package org.eclipse.hono.adapter.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.auth.device.DeviceCredentials;
import org.eclipse.hono.service.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.service.registration.RegistrationAssertionHelperImpl;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/VertxBasedMqttProtocolAdapter.class */
public class VertxBasedMqttProtocolAdapter extends AbstractProtocolAdapterBase<ProtocolAdapterProperties> {
    private static final Logger LOG = LoggerFactory.getLogger(VertxBasedMqttProtocolAdapter.class);
    private static final String CONTENT_TYPE_OCTET_STREAM = "application/octet-stream";
    private static final String TELEMETRY_ENDPOINT = "telemetry";
    private static final String EVENT_ENDPOINT = "event";
    private static final int IANA_MQTT_PORT = 1883;
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    private MqttServer server;
    private MqttServer insecureServer;
    private Map<MqttEndpoint, String> registrationAssertions = new HashMap();
    private MqttAdapterMetrics metrics;

    @Autowired
    public final void setMetrics(MqttAdapterMetrics mqttAdapterMetrics) {
        this.metrics = mqttAdapterMetrics;
    }

    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    public void setMqttSecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.server = mqttServer;
    }

    public void setMqttInsecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.insecureServer = mqttServer;
    }

    private Future<MqttServer> bindSecureMqttServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxMessageSize(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize());
        addTlsKeyCertOptions(mqttServerOptions);
        addTlsTrustOptions(mqttServerOptions);
        Future<MqttServer> future = Future.future();
        future.setHandler(asyncResult -> {
            this.server = (MqttServer) asyncResult.result();
        });
        bindMqttServer(mqttServerOptions, this.server, future);
        return future;
    }

    private Future<MqttServer> bindInsecureMqttServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort()).setMaxMessageSize(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize());
        Future<MqttServer> future = Future.future();
        future.setHandler(asyncResult -> {
            this.insecureServer = (MqttServer) asyncResult.result();
        });
        bindMqttServer(mqttServerOptions, this.insecureServer, future);
        return future;
    }

    private void bindMqttServer(MqttServerOptions mqttServerOptions, MqttServer mqttServer, Future<MqttServer> future) {
        MqttServer create = mqttServer == null ? MqttServer.create(this.vertx, mqttServerOptions) : mqttServer;
        create.endpointHandler(this::handleEndpointConnection).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("Hono MQTT protocol adapter running on {}:{}", ((ProtocolAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(create.actualPort()));
                future.complete(create);
            } else {
                LOG.error("error while starting up Hono MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
    }

    public void doStart(Future<Void> future) {
        LOG.info("limiting size of inbound message payload to {} bytes", Integer.valueOf(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        if (!((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            LOG.warn("authentication of devices switched off");
        }
        checkPortConfiguration().compose(r3 -> {
            return bindSecureMqttServer();
        }).compose(mqttServer -> {
            return bindInsecureMqttServer();
        }).compose(mqttServer2 -> {
            connectToMessaging(null);
            connectToDeviceRegistration(null);
            future.complete();
        }, future);
    }

    public void doStop(Future<Void> future) {
        Future future2 = Future.future();
        future2.setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("MQTT adapter has been shut down successfully");
                future.complete();
            } else {
                LOG.info("error while shutting down MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        Future future3 = Future.future();
        if (this.server != null) {
            this.server.close(future3.completer());
        } else {
            future3.complete();
        }
        Future future4 = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(future4.completer());
        } else {
            future4.complete();
        }
        CompositeFuture.all(future3, future4).compose(compositeFuture -> {
            closeClients(future2.completer());
        }, future2);
    }

    void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        LOG.debug("connection request from client [clientId: {}]", mqttEndpoint.clientIdentifier());
        if (!isConnected()) {
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            LOG.debug("connection request from client [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        } else if (((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            handleEndpointConnectionWithAuthentication(mqttEndpoint);
        } else {
            handleEndpointConnectionWithoutAuthentication(mqttEndpoint);
        }
    }

    private void handleEndpointConnectionWithoutAuthentication(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.closeHandler(r6 -> {
            LOG.debug("connection closed with client [clientId: {}]", mqttEndpoint.clientIdentifier());
            if (this.registrationAssertions.remove(mqttEndpoint) != null) {
                LOG.trace("removed registration assertion for client [clientId: {}]", mqttEndpoint.clientIdentifier());
            }
        });
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            ResourceIdentifier fromString = ResourceIdentifier.fromString(mqttPublishMessage.topicName());
            if (fromString.getResourceId() == null) {
                close(mqttEndpoint);
            } else if (fromString.getTenantId() == null) {
                close(mqttEndpoint);
            } else {
                publishMessage(mqttEndpoint, fromString.getTenantId(), fromString.getResourceId(), mqttPublishMessage, fromString);
            }
        });
        LOG.debug("successfully connected with client [clientId: {}]", mqttEndpoint.clientIdentifier());
        mqttEndpoint.accept(false);
    }

    private void handleEndpointConnectionWithAuthentication(MqttEndpoint mqttEndpoint) {
        if (mqttEndpoint.auth() == null) {
            LOG.trace("no auth information in endpoint found");
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            LOG.debug("connection request from client [clientId: {}] rejected {}", mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        } else {
            UsernamePasswordCredentials create = UsernamePasswordCredentials.create(mqttEndpoint.auth().userName(), mqttEndpoint.auth().password(), ((ProtocolAdapterProperties) getConfig()).isSingleTenant());
            if (create != null) {
                getCredentialsAuthProvider().authenticate(create, asyncResult -> {
                    handleCredentialsResult(asyncResult, mqttEndpoint, create);
                });
            } else {
                mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
                LOG.debug("connection request from client [clientId: {}] rejected [cause: {}]", mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            }
        }
    }

    private void handleCredentialsResult(AsyncResult<Device> asyncResult, MqttEndpoint mqttEndpoint, DeviceCredentials deviceCredentials) {
        if (!asyncResult.succeeded()) {
            LOG.debug("authentication failed for device [tenant-id: {}, auth-id: {}, cause: {}]", new Object[]{deviceCredentials.getTenantId(), deviceCredentials.getAuthId(), asyncResult.cause().getMessage()});
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            return;
        }
        String deviceId = ((Device) asyncResult.result()).getDeviceId();
        LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]", new Object[]{deviceCredentials.getTenantId(), deviceCredentials.getAuthId(), deviceId});
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            try {
                ResourceIdentifier fromString = ResourceIdentifier.fromString(mqttPublishMessage.topicName());
                if (validateCredentialsWithTopicStructure(fromString, deviceCredentials.getTenantId(), deviceId)) {
                    publishMessage(mqttEndpoint, deviceCredentials.getTenantId(), deviceId, mqttPublishMessage, fromString);
                } else {
                    mqttEndpoint.close();
                }
            } catch (IllegalArgumentException e) {
                LOG.debug("discarding message with malformed topic ...");
            }
        });
        mqttEndpoint.closeHandler(r11 -> {
            LOG.debug("connection closed to device [tenant-id: {}, auth-id: {}, device-id: {}]", new Object[]{deviceCredentials.getTenantId(), deviceCredentials.getAuthId(), deviceId});
            if (this.registrationAssertions.remove(mqttEndpoint) != null) {
                LOG.trace("removed registration assertion for device [tenant-id: {}, auth-id: {}, device-id: {}]", new Object[]{deviceCredentials.getTenantId(), deviceCredentials.getAuthId(), deviceId});
            }
        });
        mqttEndpoint.accept(false);
    }

    private void publishMessage(MqttEndpoint mqttEndpoint, String str, String str2, MqttPublishMessage mqttPublishMessage, ResourceIdentifier resourceIdentifier) {
        LOG.trace("received message for device [tenantId: {}, deviceId: {}, topic: {}, QoS: {}]", new Object[]{str, str2, mqttPublishMessage.topicName(), mqttPublishMessage.qosLevel()});
        try {
            Future<String> registrationAssertion = getRegistrationAssertion(mqttEndpoint, str, str2);
            Future<MessageSender> senderTracker = getSenderTracker(mqttPublishMessage, resourceIdentifier, str);
            Future future = Future.future();
            future.setHandler(asyncResult -> {
                if (!asyncResult.failed()) {
                    LOG.trace("successfully processed message for device [tenantId: {}, deviceId: {}, topic: {}, QoS: {}]", new Object[]{str, str2, resourceIdentifier, mqttPublishMessage.qosLevel()});
                    this.metrics.incrementProcessedMqttMessages(resourceIdentifier.getEndpoint(), str);
                } else {
                    LOG.debug("cannot process message for device [tenantId: {}, deviceId: {}, topic: {}, QoS: {}, cause: {}]", new Object[]{str, str2, resourceIdentifier, mqttPublishMessage.qosLevel(), asyncResult.cause().getMessage()});
                    if (ClientErrorException.class.isInstance(asyncResult.cause())) {
                        return;
                    }
                    this.metrics.incrementUndeliverableMqttMessages(resourceIdentifier.getEndpoint(), str);
                }
            });
            CompositeFuture.all(registrationAssertion, senderTracker).compose(compositeFuture -> {
                doUploadMessage(str2, (String) registrationAssertion.result(), mqttEndpoint, mqttPublishMessage, (MessageSender) senderTracker.result(), future);
            }, future);
        } catch (IllegalArgumentException e) {
            LOG.debug("client for [tenantId: {}, deviceId: {}] tries to publish on unsupported topic", str, str2);
            close(mqttEndpoint);
        }
    }

    private Future<MessageSender> getSenderTracker(MqttPublishMessage mqttPublishMessage, ResourceIdentifier resourceIdentifier, String str) {
        if (resourceIdentifier.getEndpoint().equals(TELEMETRY_ENDPOINT)) {
            return !MqttQoS.AT_MOST_ONCE.equals(mqttPublishMessage.qosLevel()) ? Future.failedFuture("Only QoS 0 supported for telemetry messages") : getTelemetrySender(str);
        }
        if (resourceIdentifier.getEndpoint().equals(EVENT_ENDPOINT)) {
            return !MqttQoS.AT_LEAST_ONCE.equals(mqttPublishMessage.qosLevel()) ? Future.failedFuture("Only QoS 1 supported for event messages") : getEventSender(str);
        }
        LOG.debug("no such endpoint [{}]", resourceIdentifier.getEndpoint());
        return Future.failedFuture("no such endpoint");
    }

    private Future<String> getRegistrationAssertion(MqttEndpoint mqttEndpoint, String str, String str2) {
        String str3 = this.registrationAssertions.get(mqttEndpoint);
        if (str3 != null && !RegistrationAssertionHelperImpl.isExpired(str3, 10)) {
            return Future.succeededFuture(str3);
        }
        this.registrationAssertions.remove(mqttEndpoint);
        return getRegistrationAssertion(str, str2).map(str4 -> {
            if (mqttEndpoint.isConnected()) {
                LOG.trace("caching registration assertion for [tenantId: {}, deviceId: {}]", str, str2);
                this.registrationAssertions.put(mqttEndpoint, str4);
            }
            return str4;
        });
    }

    private void close(MqttEndpoint mqttEndpoint) {
        this.registrationAssertions.remove(mqttEndpoint);
        if (!mqttEndpoint.isConnected()) {
            LOG.debug("client has already closed connection");
        } else {
            LOG.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier());
            mqttEndpoint.close();
        }
    }

    void doUploadMessage(String str, String str2, MqttEndpoint mqttEndpoint, MqttPublishMessage mqttPublishMessage, MessageSender messageSender, Future<Void> future) {
        if (messageSender.send(str, mqttPublishMessage.payload().getBytes(), CONTENT_TYPE_OCTET_STREAM, str2, (obj, protonDelivery) -> {
            LOG.trace("delivery state updated [message ID: {}, new remote state: {}]", obj, protonDelivery.getRemoteState());
            if (mqttPublishMessage.qosLevel() == MqttQoS.AT_MOST_ONCE) {
                future.complete();
                return;
            }
            if (mqttPublishMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                if (!Accepted.class.isInstance(protonDelivery.getRemoteState())) {
                    future.fail("message not accepted by remote");
                    return;
                }
                if (mqttEndpoint.isConnected()) {
                    mqttEndpoint.publishAcknowledge(mqttPublishMessage.messageId());
                }
                future.complete();
            }
        })) {
            return;
        }
        future.fail("no credit available for sending message");
    }
}
