package org.eclipse.hono.client;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.impl.RegistrationClientImpl;
import org.eclipse.hono.client.impl.TelemetryConsumerImpl;
import org.eclipse.hono.client.impl.TelemetrySenderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/HonoClient.class */
/**
 * A client for a Hono server.
 * <p>
 * The client establishes a single connection to the server by means of the Vert.x Proton client
 * and creates telemetry senders, telemetry consumers and registration clients on top of that
 * connection. Senders and registration clients that have been created successfully are cached
 * under the key they were created for and are handed out again by the {@code getOrCreate*} methods.
 * <p>
 * If no {@link Vertx} instance is supplied, the client creates one of its own using
 * {@link Vertx#vertx()}. This class never closes the Vert.x instance it uses.
 */
public class HonoClient {

    private static final Logger LOG = LoggerFactory.getLogger(HonoClient.class);

    /** Path separator used if none has been configured. */
    private static final String DEFAULT_PATH_SEPARATOR = "/";
    /** Failure message used whenever an operation requires an established connection. */
    private static final String MSG_NOT_CONNECTED = "client is not connected to Hono (yet)";
    /** Delay before a re-connect is attempted after the connection has been lost. */
    private static final long RECONNECT_DELAY_MILLIS = 300L;
    /** Maximum time the blocking {@link #shutdown()} waits for the connection to be closed. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private final String name;
    private final String host;
    private final int port;
    private final String pathSeparator;
    private final Map<String, TelemetrySender> activeSenders = new ConcurrentHashMap<>();
    private final Map<String, RegistrationClient> activeRegClients = new ConcurrentHashMap<>();
    private final String user;
    private final String password;
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    private final Vertx vertx;
    private ProtonClientOptions clientOptions;
    private ProtonConnection connection;
    private Context context;

    /**
     * A fluent builder for {@link HonoClient} instances.
     * <p>
     * Only the host is mandatory. A missing name is replaced by a generated one, a missing
     * path separator by {@code "/"} and a missing Vert.x instance by a newly created one.
     */
    public static class HonoClientBuilder {
        private String name;
        private Vertx vertx;
        private String host;
        private int port;
        private String user;
        private String password;
        private String pathSeparator;

        /**
         * Creates an empty builder.
         *
         * @return the new builder
         */
        public static HonoClientBuilder newClient() {
            return new HonoClientBuilder();
        }

        /**
         * Creates a builder that is pre-populated with the name, host, port, user name,
         * password and path separator of the given configuration.
         *
         * @param honoClientConfigProperties the configuration to copy the values from
         * @return the new builder
         */
        public static HonoClientBuilder newClient(HonoClientConfigProperties honoClientConfigProperties) {
            return new HonoClientBuilder()
                    .name(honoClientConfigProperties.getName())
                    .host(honoClientConfigProperties.getHost())
                    .port(honoClientConfigProperties.getPort())
                    .user(honoClientConfigProperties.getUsername())
                    .password(honoClientConfigProperties.getPassword())
                    .pathSeparator(honoClientConfigProperties.getPathSeparator());
        }

        /**
         * Sets the name that the client uses as its container name when opening the connection.
         *
         * @param str the name, or {@code null} to have a name generated
         * @return this builder
         */
        public HonoClientBuilder name(String str) {
            this.name = str;
            return this;
        }

        /**
         * Sets the Vert.x instance the client runs on.
         *
         * @param vertx the instance, or {@code null} to have the client create its own
         * @return this builder
         */
        public HonoClientBuilder vertx(Vertx vertx) {
            this.vertx = vertx;
            return this;
        }

        /**
         * Sets the host of the server to connect to (mandatory).
         *
         * @param str the host
         * @return this builder
         */
        public HonoClientBuilder host(String str) {
            this.host = str;
            return this;
        }

        /**
         * Sets the port of the server to connect to.
         *
         * @param i the port
         * @return this builder
         */
        public HonoClientBuilder port(int i) {
            this.port = i;
            return this;
        }

        /**
         * Sets the user name passed to the Proton client when connecting.
         *
         * @param str the user name
         * @return this builder
         */
        public HonoClientBuilder user(String str) {
            this.user = str;
            return this;
        }

        /**
         * Sets the password passed to the Proton client when connecting.
         *
         * @param str the password
         * @return this builder
         */
        public HonoClientBuilder password(String str) {
            this.password = str;
            return this;
        }

        /**
         * Sets the path separator handed to telemetry consumers created by the client.
         *
         * @param str the separator, or {@code null} to use {@code "/"}
         * @return this builder
         */
        public HonoClientBuilder pathSeparator(String str) {
            this.pathSeparator = str;
            return this;
        }

        /**
         * Creates the client from the values set on this builder.
         *
         * @return the new client
         * @throws NullPointerException if no host has been set
         */
        public HonoClient build() {
            return new HonoClient(this);
        }
    }

    /**
     * Creates a client from the given configuration that runs on the given Vert.x instance.
     *
     * @param vertx the Vert.x instance to use, or {@code null} to have the client create its own
     * @param honoClientConfigProperties the configuration to take the connection settings from
     * @throws NullPointerException if the configuration is {@code null} or contains no host
     */
    public HonoClient(Vertx vertx, HonoClientConfigProperties honoClientConfigProperties) {
        this(HonoClientBuilder.newClient(honoClientConfigProperties).vertx(vertx));
    }

    private HonoClient(HonoClientBuilder honoClientBuilder) {
        // validate before (potentially) creating a Vert.x instance so that an
        // invalid configuration does not leave a running Vert.x instance behind
        this.host = Objects.requireNonNull(honoClientBuilder.host, "host");
        this.port = honoClientBuilder.port;
        this.user = honoClientBuilder.user;
        this.password = honoClientBuilder.password;
        this.pathSeparator = honoClientBuilder.pathSeparator == null
                ? DEFAULT_PATH_SEPARATOR
                : honoClientBuilder.pathSeparator;
        this.name = honoClientBuilder.name != null
                ? honoClientBuilder.name
                : String.format("Hono-Client-%s", UUID.randomUUID());
        this.vertx = honoClientBuilder.vertx != null ? honoClientBuilder.vertx : Vertx.vertx();
    }

    /**
     * Checks whether this client currently holds a connection that is not disconnected.
     *
     * @return {@code true} if a connection has been opened and is not disconnected
     */
    public boolean isConnected() {
        final ProtonConnection con = this.connection;
        return con != null && !con.isDisconnected();
    }

    /**
     * Connects to the server using this client's default handling of a lost connection
     * (cached senders and registration clients are dropped and, unless the options specify
     * zero re-connect attempts, a re-connect is scheduled).
     *
     * @param protonClientOptions the options to connect with, or {@code null} for default options
     * @param handler the handler to notify about the outcome of the connection attempt
     * @return this client
     * @throws NullPointerException if the handler is {@code null}
     */
    public HonoClient connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoClient>> handler) {
        return connect(protonClientOptions, handler, null);
    }

    /**
     * Connects to the server.
     * <p>
     * If the client is already connected, the handler is notified of success immediately.
     * If another connection attempt is still in progress, this method only logs that fact
     * and returns <em>without</em> notifying the handler.
     *
     * @param protonClientOptions the options to connect with, or {@code null} for default options
     * @param handler the handler to notify about the outcome of the connection attempt
     * @param handler2 the handler to register on the connection for being notified when the
     *                 connection is lost, or {@code null} to use this client's default handling.
     *                 If a handler is given, the default handling is not performed.
     * @return this client
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public HonoClient connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> handler2) {
        Objects.requireNonNull(handler);
        if (isConnected()) {
            LOG.debug("already connected to server [{}:{}]", host, port);
            handler.handle(Future.succeededFuture(this));
        } else if (connecting.compareAndSet(false, true)) {
            connection = null;
            clientOptions = protonClientOptions == null ? new ProtonClientOptions() : protonClientOptions;
            LOG.debug("connecting to server [{}:{}] as user [{}]...", host, port, user);
            ProtonClient.create(vertx).connect(clientOptions, host, port, user, password, conAttempt -> {
                if (conAttempt.succeeded()) {
                    LOG.info("connected to server [{}:{}]", host, port);
                    conAttempt.result()
                            .setHostname("hono")
                            .setContainer(name)
                            .openHandler(openAttempt -> onConnectionOpened(openAttempt, handler, handler2))
                            .open();
                } else {
                    // the attempt is over: allow subsequent invocations to try again
                    connecting.set(false);
                    LOG.warn("connection to server [{}:{}] failed", host, port, conAttempt.cause());
                    handler.handle(Future.failedFuture(conAttempt.cause()));
                }
            });
        } else {
            LOG.debug("already trying to connect to Hono server ...");
        }
        return this;
    }

    /**
     * Completes a connection attempt once the peer has answered the request to open the connection.
     */
    private void onConnectionOpened(AsyncResult<ProtonConnection> openAttempt, Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> disconnectHandler) {
        connecting.compareAndSet(true, false);
        if (!openAttempt.succeeded()) {
            LOG.warn("cannot open connection to container [{}:{}]", host, port, openAttempt.cause());
            handler.handle(Future.failedFuture(openAttempt.cause()));
            return;
        }
        final ProtonConnection openedConnection = openAttempt.result();
        LOG.info("connection to [{}] open", openedConnection.getRemoteContainer());
        connection = openedConnection;
        // remember the context the connection has been opened on; it is handed
        // to the senders, consumers and registration clients created later on
        context = Vertx.currentContext();
        if (disconnectHandler != null) {
            openedConnection.disconnectHandler(disconnectHandler);
        } else {
            openedConnection.disconnectHandler(this::onRemoteDisconnect);
        }
        handler.handle(Future.succeededFuture(this));
    }

    /**
     * Default handling of a lost connection: drops all cached senders and registration
     * clients and schedules a re-connect unless the options specify zero re-connect attempts.
     */
    private void onRemoteDisconnect(ProtonConnection protonConnection) {
        LOG.warn("lost connection to Hono server [{}:{}]", host, port);
        protonConnection.disconnectHandler(null);
        protonConnection.disconnect();
        activeSenders.clear();
        activeRegClients.clear();
        if (clientOptions.getReconnectAttempts() != 0) {
            vertx.setTimer(RECONNECT_DELAY_MILLIS, timerId -> {
                LOG.info("attempting to re-connect to Hono server [{}:{}]", host, port);
                // the outcome is logged by connect() itself, nobody else is interested in it
                connect(clientOptions, reconnectAttempt -> { });
            });
        }
    }

    /**
     * Gets the cached telemetry sender for the given key or creates a new one if none is cached.
     *
     * @param str the key identifying the sender; also passed to the sender on creation
     * @param handler the handler to notify with the sender or the cause of the failure
     * @return this client
     * @throws NullPointerException if any of the parameters is {@code null}
     */
    public HonoClient getOrCreateTelemetrySender(String str, Handler<AsyncResult<TelemetrySender>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        final TelemetrySender cachedSender = activeSenders.get(str);
        if (cachedSender == null) {
            createTelemetrySender(str, handler);
        } else {
            handler.handle(Future.succeededFuture(cachedSender));
        }
        return this;
    }

    /**
     * Creates a new telemetry sender and caches it under the given key, replacing any
     * sender cached for that key before.
     * <p>
     * The handler is notified of a failure if the client is not connected.
     *
     * @param str the key identifying the sender; also passed to the sender on creation
     * @param handler the handler to notify with the sender or the cause of the failure
     * @return this client
     * @throws NullPointerException if any of the parameters is {@code null}
     */
    public HonoClient createTelemetrySender(String str, Handler<AsyncResult<TelemetrySender>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        if (!isConnected()) {
            handler.handle(Future.failedFuture(MSG_NOT_CONNECTED));
        } else {
            TelemetrySenderImpl.create(context, connection, str, creationAttempt -> {
                if (creationAttempt.succeeded()) {
                    activeSenders.put(str, creationAttempt.result());
                    handler.handle(Future.succeededFuture(creationAttempt.result()));
                } else {
                    handler.handle(Future.failedFuture(creationAttempt.cause()));
                }
            });
        }
        return this;
    }

    /**
     * Creates a new telemetry consumer. Consumers are not cached by this client.
     * <p>
     * The handler is notified of a failure if the client is not connected.
     *
     * @param str the key passed to the consumer on creation
     * @param consumer the consumer passed on to the telemetry consumer for processing messages
     * @param handler the handler to notify with the consumer or the cause of the failure
     * @return this client
     * @throws NullPointerException if {@code str} or {@code handler} is {@code null}
     */
    public HonoClient createTelemetryConsumer(String str, Consumer<Message> consumer, Handler<AsyncResult<TelemetryConsumer>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        if (!isConnected()) {
            handler.handle(Future.failedFuture(MSG_NOT_CONNECTED));
        } else {
            TelemetryConsumerImpl.create(context, connection, str, pathSeparator, consumer, handler);
        }
        return this;
    }

    /**
     * Gets the cached registration client for the given key or creates a new one if none is cached.
     *
     * @param str the key identifying the registration client; also passed to it on creation
     * @param handler the handler to notify with the registration client or the cause of the failure
     * @return this client
     * @throws NullPointerException if any of the parameters is {@code null}
     */
    public HonoClient getOrCreateRegistrationClient(String str, Handler<AsyncResult<RegistrationClient>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        final RegistrationClient cachedClient = activeRegClients.get(str);
        if (cachedClient == null) {
            createRegistrationClient(str, handler);
        } else {
            handler.handle(Future.succeededFuture(cachedClient));
        }
        return this;
    }

    /**
     * Creates a new registration client and caches it under the given key, replacing any
     * registration client cached for that key before.
     * <p>
     * The handler is notified of a failure if the client is not connected.
     *
     * @param str the key identifying the registration client; also passed to it on creation
     * @param handler the handler to notify with the registration client or the cause of the failure
     * @return this client
     * @throws NullPointerException if any of the parameters is {@code null}
     */
    public HonoClient createRegistrationClient(String str, Handler<AsyncResult<RegistrationClient>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        if (!isConnected()) {
            handler.handle(Future.failedFuture(MSG_NOT_CONNECTED));
        } else {
            RegistrationClientImpl.create(context, connection, str, creationAttempt -> {
                if (creationAttempt.succeeded()) {
                    activeRegClients.put(str, creationAttempt.result());
                    handler.handle(Future.succeededFuture(creationAttempt.result()));
                } else {
                    handler.handle(Future.failedFuture(creationAttempt.cause()));
                }
            });
        }
        return this;
    }

    /**
     * Closes the connection to the server, blocking the calling thread until the connection
     * has been closed or 5 seconds have elapsed, whichever happens first.
     */
    public void shutdown() {
        final CountDownLatch done = new CountDownLatch(1);
        shutdown(closeAttempt -> {
            if (!closeAttempt.succeeded()) {
                LOG.error("could not close connection to server", closeAttempt.cause());
            }
            // release the waiting thread regardless of the outcome; otherwise a
            // failure would be reported a second time as a (bogus) timeout
            done.countDown();
        });
        try {
            if (!done.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.error("shutdown of client timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the connection to the server and drops all cached senders and registration clients.
     * <p>
     * The handler is always notified of success, either immediately if there is no open
     * connection or once the peer has answered the request to close the connection
     * (regardless of whether that answer indicates success or failure).
     *
     * @param handler the handler to notify once the connection is closed, may be {@code null}
     */
    public void shutdown(Handler<AsyncResult<Void>> handler) {
        // work on a snapshot: the field may be reset by a subsequent connect()
        // before the close handler below is invoked
        final ProtonConnection con = this.connection;
        // cached senders and registration clients are bound to the connection being closed
        activeSenders.clear();
        activeRegClients.clear();
        if (con == null || con.isDisconnected()) {
            LOG.info("connection to server [{}:{}] already closed", host, port);
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
        } else {
            LOG.info("closing connection to server [{}:{}]...", host, port);
            // a deliberate shutdown must not trigger the handling of a lost connection
            con.disconnectHandler(null);
            con.closeHandler(closeAttempt -> {
                if (closeAttempt.succeeded()) {
                    LOG.info("closed connection to server [{}:{}]", host, port);
                } else {
                    LOG.info("could not close connection to server [{}:{}]", host, port, closeAttempt.cause());
                }
                con.disconnect();
                if (handler != null) {
                    handler.handle(Future.succeededFuture());
                }
            }).close();
        }
    }
}
