package org.eclipse.hono.adapter.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.service.auth.device.DeviceCredentials;
import org.eclipse.hono.service.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.class */
public abstract class AbstractVertxBasedMqttProtocolAdapter<T extends ProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    private static final int IANA_MQTT_PORT = 1883;
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    protected final Logger LOG = LoggerFactory.getLogger(getClass());
    private MqttAdapterMetrics metrics;
    private MqttServer server;
    private MqttServer insecureServer;

    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    @Autowired
    public final void setMetrics(MqttAdapterMetrics mqttAdapterMetrics) {
        this.metrics = mqttAdapterMetrics;
    }

    public void setMqttSecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.server = mqttServer;
    }

    public void setMqttInsecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalArgumentException("MQTT server must not be started already");
        }
        this.insecureServer = mqttServer;
    }

    private Future<Void> bindSecureMqttServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxMessageSize(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize());
        addTlsKeyCertOptions(mqttServerOptions);
        addTlsTrustOptions(mqttServerOptions);
        return bindMqttServer(mqttServerOptions, this.server).map(mqttServer -> {
            this.server = mqttServer;
            return (Void) null;
        }).recover(th -> {
            return Future.failedFuture(th);
        });
    }

    private Future<Void> bindInsecureMqttServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort()).setMaxMessageSize(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize());
        return bindMqttServer(mqttServerOptions, this.insecureServer).map(mqttServer -> {
            this.insecureServer = mqttServer;
            return (Void) null;
        }).recover(th -> {
            return Future.failedFuture(th);
        });
    }

    private Future<MqttServer> bindMqttServer(MqttServerOptions mqttServerOptions, MqttServer mqttServer) {
        Future<MqttServer> future = Future.future();
        MqttServer create = mqttServer == null ? MqttServer.create(this.vertx, mqttServerOptions) : mqttServer;
        create.endpointHandler(this::handleEndpointConnection).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.LOG.info("MQTT server running on {}:{}", ((ProtocolAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(create.actualPort()));
                future.complete(create);
            } else {
                this.LOG.error("error while starting up MQTT server", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        return future;
    }

    public void doStart(Future<Void> future) {
        this.LOG.info("limiting size of inbound message payload to {} bytes", Integer.valueOf(((ProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        if (!((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            this.LOG.warn("authentication of devices turned off");
        }
        checkPortConfiguration().compose(r3 -> {
            return bindSecureMqttServer();
        }).compose(r32 -> {
            return bindInsecureMqttServer();
        }).compose(r33 -> {
            future.complete();
        }, future);
    }

    public void doStop(Future<Void> future) {
        Future future2 = Future.future();
        if (this.server != null) {
            this.server.close(future2.completer());
        } else {
            future2.complete();
        }
        Future future3 = Future.future();
        if (this.insecureServer != null) {
            this.insecureServer.close(future3.completer());
        } else {
            future3.complete();
        }
        CompositeFuture.all(future2, future3).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    final void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        this.LOG.debug("connection request from client [clientId: {}]", mqttEndpoint.clientIdentifier());
        isConnected().map(bool -> {
            if (!bool.booleanValue()) {
                mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                this.LOG.debug("connection request from client [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                return null;
            }
            if (((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
                handleEndpointConnectionWithAuthentication(mqttEndpoint);
                return null;
            }
            handleEndpointConnectionWithoutAuthentication(mqttEndpoint);
            return null;
        });
    }

    private void handleEndpointConnectionWithoutAuthentication(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.closeHandler(r6 -> {
            close(mqttEndpoint);
            this.LOG.debug("connection to unauthenticated device [clientId: {}] closed", mqttEndpoint.clientIdentifier());
        });
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            onPublishedMessage(new MqttContext(mqttPublishMessage, mqttEndpoint));
        });
        this.LOG.debug("unauthenticated device [clientId: {}] connected", mqttEndpoint.clientIdentifier());
        mqttEndpoint.accept(false);
    }

    private void handleEndpointConnectionWithAuthentication(MqttEndpoint mqttEndpoint) {
        if (mqttEndpoint.auth() == null) {
            this.LOG.debug("connection request from device [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), "device did not provide credentials in CONNECT packet");
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }
        DeviceCredentials credentials = getCredentials(mqttEndpoint.auth());
        if (credentials != null) {
            getCredentialsAuthProvider().authenticate(credentials, asyncResult -> {
                if (!asyncResult.failed()) {
                    Device device = (Device) asyncResult.result();
                    this.LOG.debug("successfully authenticated device [tenant-id: {}, auth-id: {}, device-id: {}]", new Object[]{device.getTenantId(), credentials.getAuthId(), device.getDeviceId()});
                    onAuthenticationSuccess(mqttEndpoint, device);
                } else {
                    this.LOG.debug("cannot authenticate device [tenant-id: {}, auth-id: {}]: {}", new Object[]{credentials.getTenantId(), credentials.getAuthId(), asyncResult.cause().getMessage()});
                    if (ServerErrorException.class.isInstance(asyncResult.cause())) {
                        mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                    } else {
                        mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
                    }
                }
            });
        } else {
            this.LOG.debug("connection request from device [clientId: {}] rejected: {}", mqttEndpoint.clientIdentifier(), "device provided malformed credentials in CONNECT packet");
            mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }

    private void onAuthenticationSuccess(MqttEndpoint mqttEndpoint, Device device) {
        mqttEndpoint.closeHandler(r8 -> {
            close(mqttEndpoint);
            this.LOG.debug("connection to device [tenant-id: {}, device-id: {}] closed", device.getTenantId(), device.getDeviceId());
            this.metrics.decrementMqttConnections(device.getTenantId());
        });
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            onPublishedMessage(new MqttContext(mqttPublishMessage, mqttEndpoint, device));
        });
        mqttEndpoint.accept(false);
        this.metrics.incrementMqttConnections(device.getTenantId());
    }

    public final Future<Void> uploadMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier, Buffer buffer) {
        Objects.requireNonNull(mqttContext);
        Objects.requireNonNull(resourceIdentifier);
        Objects.requireNonNull(buffer);
        return resourceIdentifier.getEndpoint().equals("telemetry") ? uploadTelemetryMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), buffer) : resourceIdentifier.getEndpoint().equals("event") ? uploadEventMessage(mqttContext, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), buffer) : Future.failedFuture(new ClientErrorException(400, "unsupported endpoint"));
    }

    public final Future<Void> uploadTelemetryMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        return uploadMessage((MqttContext) Objects.requireNonNull(mqttContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), (Buffer) Objects.requireNonNull(buffer), getTelemetrySender(str), "telemetry");
    }

    public final Future<Void> uploadEventMessage(MqttContext mqttContext, String str, String str2, Buffer buffer) {
        return uploadMessage((MqttContext) Objects.requireNonNull(mqttContext), (String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), (Buffer) Objects.requireNonNull(buffer), getEventSender(str), "event");
    }

    private Future<Void> uploadMessage(MqttContext mqttContext, String str, String str2, Buffer buffer, Future<MessageSender> future, String str3) {
        if (buffer.length() == 0) {
            return Future.failedFuture(new ClientErrorException(400, "payload must not be empty"));
        }
        Future registrationAssertion = getRegistrationAssertion(str, str2, mqttContext.authenticatedDevice());
        return CompositeFuture.all(registrationAssertion, future).compose(compositeFuture -> {
            Message newMessage = newMessage(String.format("%s/%s", str3, str), str2, mqttContext.message().topicName(), mqttContext.contentType(), buffer, (JsonObject) registrationAssertion.result());
            customizeDownstreamMessage(newMessage, mqttContext);
            return ((MessageSender) future.result()).send(newMessage);
        }).compose(protonDelivery -> {
            this.LOG.trace("successfully processed message [topic: {}, QoS: {}] for device [tenantId: {}, deviceId: {}]", new Object[]{mqttContext.message().topicName(), mqttContext.message().qosLevel(), str, str2});
            this.metrics.incrementProcessedMqttMessages(str3, str);
            onMessageSent(mqttContext);
            if ("event".equals(str3) && mqttContext.deviceEndpoint().isConnected() && mqttContext.message().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                mqttContext.deviceEndpoint().publishAcknowledge(mqttContext.message().messageId());
            }
            return Future.succeededFuture();
        }).recover(th -> {
            if (ClientErrorException.class.isInstance(th)) {
                ClientErrorException clientErrorException = (ClientErrorException) th;
                this.LOG.debug("cannot process message for device [tenantId: {}, deviceId: {}, endpoint: {}]: {} - {}", new Object[]{str, str2, str3, Integer.valueOf(clientErrorException.getErrorCode()), clientErrorException.getMessage()});
            } else {
                this.LOG.debug("cannot process message for device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{str, str2, str3, th});
                this.metrics.incrementUndeliverableMqttMessages(str3, str);
                onMessageUndeliverable(mqttContext);
            }
            return Future.failedFuture(th);
        });
    }

    protected final void close(MqttEndpoint mqttEndpoint) {
        onClose(mqttEndpoint);
        if (!mqttEndpoint.isConnected()) {
            this.LOG.trace("client has already closed connection");
        } else {
            this.LOG.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier());
            mqttEndpoint.close();
        }
    }

    protected void onClose(MqttEndpoint mqttEndpoint) {
    }

    protected DeviceCredentials getCredentials(MqttAuth mqttAuth) {
        return UsernamePasswordCredentials.create(mqttAuth.userName(), mqttAuth.password(), ((ProtocolAdapterProperties) getConfig()).isSingleTenant());
    }

    protected abstract Future<Void> onPublishedMessage(MqttContext mqttContext);

    protected void customizeDownstreamMessage(Message message, MqttContext mqttContext) {
    }

    protected void onMessageSent(MqttContext mqttContext) {
    }

    protected void onMessageUndeliverable(MqttContext mqttContext) {
    }
}
