package org.eclipse.hono.adapter.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.proton.ProtonClientOptions;
import java.nio.charset.Charset;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.HonoClientConfigProperties;
import org.eclipse.hono.client.TelemetrySender;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/VertxBasedMqttProtocolAdapter.class */
public class VertxBasedMqttProtocolAdapter extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(VertxBasedMqttProtocolAdapter.class);
    private static final String CONTENT_TYPE_OCTET_STREAM = "application/octet-stream";
    private static final String NAME = "MQTT Adapter";

    @Value("${hono.mqtt.bindaddress:0.0.0.0}")
    private String bindAddress;

    @Value("${hono.mqtt.listenport:1883}")
    private int listenPort;

    @Autowired
    private HonoClientConfigProperties honoClientConfig;
    private MqttServer server;
    private HonoClient hono;

    private void bindMqttServer(Future<Void> future) {
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(this.bindAddress).setPort(this.listenPort);
        this.server = MqttServer.create(this.vertx, mqttServerOptions);
        this.server.endpointHandler(this::handleEndpointConnection).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("Hono MQTT adapter running on {}:{}", this.bindAddress, Integer.valueOf(this.server.actualPort()));
                future.complete();
            } else {
                LOG.error("error while starting up Hono MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
    }

    private void connectToHono(Handler<AsyncResult<HonoClient>> handler) {
        this.hono = HonoClient.HonoClientBuilder.newClient().vertx(this.vertx).name(NAME).host(this.honoClientConfig.getHost()).port(this.honoClientConfig.getPort()).user(this.honoClientConfig.getUsername()).password(this.honoClientConfig.getPassword()).build();
        this.hono.connect(new ProtonClientOptions().setReconnectAttempts(-1).setReconnectInterval(200L), asyncResult -> {
            if (handler != null) {
                handler.handle(asyncResult);
            }
        });
    }

    public void start(Future<Void> future) throws Exception {
        bindMqttServer(future);
        connectToHono(null);
    }

    public void stop(Future<Void> future) throws Exception {
        Future future2 = Future.future();
        future2.setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                LOG.info("MQTT adapter has been shut down successfully");
                future.complete();
            } else {
                LOG.info("error while shutting down MQTT adapter", asyncResult.cause());
                future.fail(asyncResult.cause());
            }
        });
        Future future3 = Future.future();
        if (this.server != null) {
            this.server.close(future3.completer());
        } else {
            future3.complete();
        }
        future3.compose(r5 -> {
            if (this.hono != null) {
                this.hono.shutdown(future2.completer());
            } else {
                future2.complete();
            }
        }, future2);
    }

    private void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        LOG.info("Connection request from client {}", mqttEndpoint.clientIdentifier());
        if (!isConnected()) {
            mqttEndpoint.writeConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, false);
        } else {
            mqttEndpoint.publishHandler(mqttPublishMessage -> {
                LOG.debug("Just received message on [{}] payload [{}] with QoS [{}]", new Object[]{mqttPublishMessage.topicName(), mqttPublishMessage.payload().toString(Charset.defaultCharset()), mqttPublishMessage.qosLevel()});
                ResourceIdentifier fromString = ResourceIdentifier.fromString(mqttPublishMessage.topicName());
                if (fromString.getResourceId() == null) {
                    mqttEndpoint.close();
                } else if (fromString.getResourceId().equals(mqttEndpoint.clientIdentifier())) {
                    this.hono.getOrCreateTelemetrySender(fromString.getTenantId(), asyncResult -> {
                        if (!asyncResult.succeeded()) {
                            mqttEndpoint.close();
                            return;
                        }
                        TelemetrySender telemetrySender = (TelemetrySender) asyncResult.result();
                        if (telemetrySender.sendQueueFull()) {
                            telemetrySender.sendQueueDrainHandler(r9 -> {
                                sendToHono(mqttEndpoint, telemetrySender, mqttPublishMessage);
                            });
                        } else {
                            sendToHono(mqttEndpoint, telemetrySender, mqttPublishMessage);
                        }
                    });
                } else {
                    mqttEndpoint.close();
                }
            });
            mqttEndpoint.writeConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
        }
    }

    private void sendToHono(MqttEndpoint mqttEndpoint, TelemetrySender telemetrySender, MqttPublishMessage mqttPublishMessage) {
        if (telemetrySender.send(mqttEndpoint.clientIdentifier(), mqttPublishMessage.payload().getBytes(), CONTENT_TYPE_OCTET_STREAM) && mqttPublishMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            mqttEndpoint.writePuback(mqttPublishMessage.messageId());
        }
    }

    private boolean isConnected() {
        return this.hono != null && this.hono.isConnected();
    }
}
