package org.eclipse.hono.adapter.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.registration.RegistrationAssertionHelperImpl;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/VertxBasedMqttProtocolAdapter.class */
public class VertxBasedMqttProtocolAdapter extends AbstractProtocolAdapterBase<ServiceConfigProperties> {
    private static final Logger LOG = LoggerFactory.getLogger(VertxBasedMqttProtocolAdapter.class);
    private static final String CONTENT_TYPE_OCTET_STREAM = "application/octet-stream";
    private static final String TELEMETRY_ENDPOINT = "telemetry";
    private static final String EVENT_ENDPOINT = "event";
    private static final int IANA_MQTT_PORT = 1883;
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    private MqttServer server;
    private MqttServer insecureServer;
    private Map<MqttEndpoint, String> registrationAssertions = new HashMap();

    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    private Future<MqttServer> bindSecureMqttServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ServiceConfigProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxMessageSize(((ServiceConfigProperties) getConfig()).getMaxPayloadSize());
        addTlsKeyCertOptions(mqttServerOptions);
        addTlsTrustOptions(mqttServerOptions);
        return bindMqttServer(mqttServerOptions);
    }

    private Future<MqttServer> bindInsecureMqttServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ServiceConfigProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort()).setMaxMessageSize(((ServiceConfigProperties) getConfig()).getMaxPayloadSize());
        return bindMqttServer(mqttServerOptions);
    }

    private Future<MqttServer> bindMqttServer(MqttServerOptions mqttServerOptions) {
        Future<MqttServer> future = Future.future();
        MqttServer create = MqttServer.create(this.vertx, mqttServerOptions);
        create.endpointHandler(this::handleEndpointConnection).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("Hono MQTT protocol adapter running on {}:{}", ((ServiceConfigProperties) getConfig()).getBindAddress(), Integer.valueOf(create.actualPort()));
                future.complete(create);
            } else {
                LOG.error("error while starting up Hono MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    public void doStart(Future<Void> future) {
        LOG.info("limiting size of inbound message payload to {} bytes", Integer.valueOf(((ServiceConfigProperties) getConfig()).getMaxPayloadSize()));
        checkPortConfiguration().compose(r3 -> {
            return bindSecureMqttServer();
        }).compose(mqttServer -> {
            this.server = mqttServer;
            return bindInsecureMqttServer();
        }).compose(mqttServer2 -> {
            this.insecureServer = mqttServer2;
            connectToMessaging(null);
            connectToDeviceRegistration(null);
            connectToCredentialsService(null);
            future.complete();
        }, future);
    }

    public void doStop(Future<Void> future) {
        Future future2 = Future.future();
        future2.setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("MQTT adapter has been shut down successfully");
                future.complete();
            } else {
                LOG.info("error while shutting down MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        Future future3 = Future.future();
        if (this.server != null) {
            this.server.close(future3.completer());
        } else {
            future3.complete();
        }
        Future future4 = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(future4.completer());
        } else {
            future4.complete();
        }
        CompositeFuture.all(future3, future4).compose(compositeFuture -> {
            closeClients(future2.completer());
        }, future2);
    }

    private void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        LOG.info("connection request from client {}", mqttEndpoint.clientIdentifier());
        if (!isConnected()) {
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            return;
        }
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            LOG.trace("received message [client ID: {}, topic: {}, QoS: {}, payload {}]", new Object[]{mqttEndpoint.clientIdentifier(), mqttPublishMessage.topicName(), mqttPublishMessage.qosLevel(), mqttPublishMessage.payload().toString(Charset.defaultCharset())});
            try {
                ResourceIdentifier fromString = ResourceIdentifier.fromString(mqttPublishMessage.topicName());
                if (fromString.getResourceId() == null) {
                    close(mqttEndpoint);
                } else {
                    Future future = Future.future();
                    future.setHandler(asyncResult -> {
                        if (!asyncResult.failed()) {
                            LOG.trace("successfully processed message [client ID: {}, topic: {}, QoS: {}]", new Object[]{mqttEndpoint.clientIdentifier(), fromString, mqttPublishMessage.qosLevel()});
                        } else {
                            LOG.debug("cannot process message [client ID: {}, topic: {}, QoS: {}]: {}", new Object[]{mqttEndpoint.clientIdentifier(), fromString, mqttPublishMessage.qosLevel(), asyncResult.cause().getMessage()});
                            close(mqttEndpoint);
                        }
                    });
                    if (fromString.getResourceId().equals(mqttEndpoint.clientIdentifier())) {
                        Future<String> registrationAssertion = getRegistrationAssertion(mqttEndpoint, fromString);
                        Future<MessageSender> senderTracker = getSenderTracker(mqttPublishMessage, fromString);
                        CompositeFuture.all(registrationAssertion, senderTracker).compose(compositeFuture -> {
                            doUploadMessage(fromString.getTenantId(), (String) registrationAssertion.result(), mqttEndpoint, mqttPublishMessage, (MessageSender) senderTracker.result(), future);
                        }, future);
                    } else {
                        future.fail("client not authorized");
                    }
                }
            } catch (IllegalArgumentException e) {
                LOG.debug("client [ID: {}] tries to publish on unsupported topic", mqttEndpoint.clientIdentifier());
                close(mqttEndpoint);
            }
        });
        mqttEndpoint.closeHandler(r6 -> {
            LOG.debug("connection closed with client [{}]", mqttEndpoint.clientIdentifier());
            if (this.registrationAssertions.remove(mqttEndpoint) != null) {
                LOG.trace("removed registration assertion for client [{}]", mqttEndpoint.clientIdentifier());
            }
        });
        mqttEndpoint.accept(false);
    }

    private Future<MessageSender> getSenderTracker(MqttPublishMessage mqttPublishMessage, ResourceIdentifier resourceIdentifier) {
        if (resourceIdentifier.getEndpoint().equals(TELEMETRY_ENDPOINT)) {
            return !MqttQoS.AT_MOST_ONCE.equals(mqttPublishMessage.qosLevel()) ? Future.failedFuture("Only QoS 0 supported for telemetry messages") : getTelemetrySender(resourceIdentifier.getTenantId());
        }
        if (resourceIdentifier.getEndpoint().equals(EVENT_ENDPOINT)) {
            return !MqttQoS.AT_LEAST_ONCE.equals(mqttPublishMessage.qosLevel()) ? Future.failedFuture("Only QoS 1 supported for event messages") : getEventSender(resourceIdentifier.getTenantId());
        }
        LOG.debug("no such endpoint [{}]", resourceIdentifier.getEndpoint());
        return Future.failedFuture("no such endpoint");
    }

    private Future<String> getRegistrationAssertion(MqttEndpoint mqttEndpoint, ResourceIdentifier resourceIdentifier) {
        String str = this.registrationAssertions.get(mqttEndpoint);
        if (str != null && !RegistrationAssertionHelperImpl.isExpired(str, 10)) {
            return Future.succeededFuture(str);
        }
        this.registrationAssertions.remove(mqttEndpoint);
        Future<String> future = Future.future();
        getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId()).compose(str2 -> {
            if (mqttEndpoint.isConnected()) {
                LOG.trace("caching registration assertion for client [{}]", mqttEndpoint.clientIdentifier());
                this.registrationAssertions.put(mqttEndpoint, str2);
            }
            future.complete(str2);
        }, future);
        return future;
    }

    private void close(MqttEndpoint mqttEndpoint) {
        this.registrationAssertions.remove(mqttEndpoint);
        if (!mqttEndpoint.isConnected()) {
            LOG.debug("client has already closed connection");
        } else {
            LOG.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier());
            mqttEndpoint.close();
        }
    }

    private void doUploadMessage(String str, String str2, MqttEndpoint mqttEndpoint, MqttPublishMessage mqttPublishMessage, MessageSender messageSender, Future<Void> future) {
        if (messageSender.send(mqttEndpoint.clientIdentifier(), mqttPublishMessage.payload().getBytes(), CONTENT_TYPE_OCTET_STREAM, str2, (obj, protonDelivery) -> {
            LOG.trace("delivery state updated [message ID: {}, new remote state: {}]", obj, protonDelivery.getRemoteState());
            if (mqttPublishMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE && mqttEndpoint.isConnected()) {
                if (!Accepted.class.isInstance(protonDelivery.getRemoteState())) {
                    future.fail("message not accepted by remote");
                } else {
                    mqttEndpoint.publishAcknowledge(mqttPublishMessage.messageId());
                    future.complete();
                }
            }
        })) {
            return;
        }
        future.fail("no credit available for sending message");
    }
}
