package org.eclipse.hono.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.connection.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/impl/HonoClientImpl.class */
public final class HonoClientImpl implements HonoClient {
    private static final Logger LOG = LoggerFactory.getLogger(HonoClientImpl.class);
    private final Map<String, MessageSender> activeSenders = new ConcurrentHashMap();
    private final Map<String, RegistrationClient> activeRegClients = new ConcurrentHashMap();
    private final Map<String, Boolean> senderCreationLocks = new ConcurrentHashMap();
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    private ProtonClientOptions clientOptions;
    private ProtonConnection connection;
    private Vertx vertx;
    private Context context;
    private ConnectionFactory connectionFactory;

    public HonoClientImpl(Vertx vertx, ConnectionFactory connectionFactory) {
        if (vertx != null) {
            this.vertx = vertx;
        } else {
            this.vertx = Vertx.vertx();
        }
        this.connectionFactory = connectionFactory;
    }

    void setConnection(ProtonConnection protonConnection) {
        this.connection = protonConnection;
    }

    void setContext(Context context) {
        this.context = context;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public boolean isConnected() {
        return (this.connection == null || this.connection.isDisconnected()) ? false : true;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public Map<String, Object> getConnectionStatus() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", this.connectionFactory.getName());
        hashMap.put("connected", Boolean.valueOf(isConnected()));
        hashMap.put("Hono server", String.format("%s:%d", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort())));
        hashMap.put("#regClients", Integer.valueOf(this.activeRegClients.size()));
        hashMap.put("senders", getSenderStatus());
        return hashMap;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public JsonArray getSenderStatus() {
        JsonArray jsonArray = new JsonArray();
        for (Map.Entry<String, MessageSender> entry : this.activeSenders.entrySet()) {
            MessageSender value = entry.getValue();
            jsonArray.add(new JsonObject().put("address", entry.getKey()).put("open", Boolean.valueOf(value.isOpen())).put("credit", Integer.valueOf(value.getCredit())));
        }
        return jsonArray;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoClient>> handler) {
        return connect(protonClientOptions, handler, null);
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> handler2) {
        Objects.requireNonNull(handler);
        if (isConnected()) {
            LOG.debug("already connected to server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            handler.handle(Future.succeededFuture(this));
        } else if (this.connecting.compareAndSet(false, true)) {
            setConnection(null);
            if (protonClientOptions == null) {
                this.clientOptions = new ProtonClientOptions();
            } else {
                this.clientOptions = protonClientOptions;
            }
            this.connectionFactory.connect(this.clientOptions, this::onRemoteClose, handler2 != null ? handler2 : this::onRemoteDisconnect, asyncResult -> {
                this.connecting.compareAndSet(true, false);
                if (asyncResult.failed()) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                    return;
                }
                setConnection((ProtonConnection) asyncResult.result());
                setContext(Vertx.currentContext());
                handler.handle(Future.succeededFuture(this));
            });
        } else {
            LOG.debug("already trying to connect to Hono server ...");
        }
        return this;
    }

    private void onRemoteClose(AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.failed()) {
            LOG.info("Hono server [{}:{}] closed connection with error condition: {}", new Object[]{this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), asyncResult.cause().getMessage()});
        }
        this.connection.close();
        onRemoteDisconnect(this.connection);
    }

    private void onRemoteDisconnect(ProtonConnection protonConnection) {
        LOG.warn("lost connection to Hono server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
        protonConnection.disconnect();
        this.activeSenders.clear();
        this.activeRegClients.clear();
        if (this.clientOptions.getReconnectAttempts() != 0) {
            this.vertx.setTimer(300L, l -> {
                LOG.info("attempting to re-connect to Hono server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                connect(this.clientOptions, asyncResult -> {
                });
            });
        }
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient getOrCreateTelemetrySender(String str, Handler<AsyncResult<MessageSender>> handler) {
        return getOrCreateTelemetrySender(str, null, handler);
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient getOrCreateTelemetrySender(String str, String str2, Handler<AsyncResult<MessageSender>> handler) {
        Objects.requireNonNull(str);
        getOrCreateSender(TelemetrySenderImpl.getTargetAddress(str, str2), handler2 -> {
            createTelemetrySender(str, str2, handler2);
        }, handler);
        return this;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient getOrCreateEventSender(String str, Handler<AsyncResult<MessageSender>> handler) {
        return getOrCreateEventSender(str, null, handler);
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient getOrCreateEventSender(String str, String str2, Handler<AsyncResult<MessageSender>> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        getOrCreateSender(EventSenderImpl.getTargetAddress(str, str2), handler2 -> {
            createEventSender(str, str2, handler2);
        }, handler);
        return this;
    }

    void getOrCreateSender(String str, Consumer<Handler<AsyncResult<MessageSender>>> consumer, Handler<AsyncResult<MessageSender>> handler) {
        MessageSender messageSender = this.activeSenders.get(str);
        if (messageSender != null && messageSender.isOpen()) {
            LOG.debug("reusing existing message sender [target: {}, credit: {}]", str, Integer.valueOf(messageSender.getCredit()));
            handler.handle(Future.succeededFuture(messageSender));
        } else if (this.senderCreationLocks.computeIfAbsent(str, str2 -> {
            return Boolean.FALSE;
        }).booleanValue()) {
            LOG.debug("already trying to create a message sender for {}", str);
            handler.handle(Future.failedFuture("sender link not established yet"));
        } else {
            this.senderCreationLocks.put(str, Boolean.TRUE);
            LOG.debug("creating new message sender for {}", str);
            consumer.accept(asyncResult -> {
                if (asyncResult.succeeded()) {
                    MessageSender messageSender2 = (MessageSender) asyncResult.result();
                    LOG.debug("successfully created new message sender for {}", str);
                    this.activeSenders.put(str, messageSender2);
                } else {
                    LOG.debug("failed to create new message sender for {}", str, asyncResult.cause());
                    this.activeSenders.remove(str);
                }
                this.senderCreationLocks.remove(str);
                handler.handle(asyncResult);
            });
        }
    }

    private HonoClient createTelemetrySender(String str, String str2, Handler<AsyncResult<MessageSender>> handler) {
        Future future = Future.future();
        future.setHandler(handler);
        checkConnection().compose(obj -> {
            TelemetrySenderImpl.create(this.context, this.connection, str, str2, str3 -> {
                this.activeSenders.remove(TelemetrySenderImpl.getTargetAddress(str, str2));
            }, future.completer());
        }, future);
        return this;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient createTelemetryConsumer(String str, Consumer<Message> consumer, Handler<AsyncResult<MessageConsumer>> handler) {
        Future future = Future.future();
        future.setHandler(handler);
        checkConnection().compose(obj -> {
            TelemetryConsumerImpl.create(this.context, this.connection, str, this.connectionFactory.getPathSeparator(), consumer, future.completer());
        }, future);
        return this;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient createEventConsumer(String str, Consumer<Message> consumer, Handler<AsyncResult<MessageConsumer>> handler) {
        Future future = Future.future();
        future.setHandler(handler);
        checkConnection().compose(obj -> {
            EventConsumerImpl.create(this.context, this.connection, str, this.connectionFactory.getPathSeparator(), consumer, future.completer());
        }, future);
        return this;
    }

    private HonoClient createEventSender(String str, String str2, Handler<AsyncResult<MessageSender>> handler) {
        Future future = Future.future();
        future.setHandler(handler);
        checkConnection().compose(obj -> {
            EventSenderImpl.create(this.context, this.connection, str, str2, str3 -> {
                this.activeSenders.remove(EventSenderImpl.getTargetAddress(str, str2));
            }, future.completer());
        }, future);
        return this;
    }

    private <T> Future<T> checkConnection() {
        return (this.connection == null || this.connection.isDisconnected()) ? Future.failedFuture("client is not connected to Hono (yet)") : Future.succeededFuture();
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient getOrCreateRegistrationClient(String str, Handler<AsyncResult<RegistrationClient>> handler) {
        RegistrationClient registrationClient = this.activeRegClients.get(Objects.requireNonNull(str));
        if (registrationClient == null || !registrationClient.isOpen()) {
            createRegistrationClient(str, handler);
        } else {
            LOG.debug("reusing existing registration client for [{}]", str);
            handler.handle(Future.succeededFuture(registrationClient));
        }
        return this;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public HonoClient createRegistrationClient(String str, Handler<AsyncResult<RegistrationClient>> handler) {
        Objects.requireNonNull(str);
        if (this.connection == null || this.connection.isDisconnected()) {
            handler.handle(Future.failedFuture("client is not connected to Hono (yet)"));
        } else {
            LOG.debug("creating new registration client for [{}]", str);
            RegistrationClientImpl.create(this.context, this.connection, str, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    LOG.debug("failed to create registration client for [{}]", str, asyncResult.cause());
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                } else {
                    this.activeRegClients.put(str, asyncResult.result());
                    LOG.debug("successfully created registration client for [{}]", str);
                    handler.handle(Future.succeededFuture(asyncResult.result()));
                }
            });
        }
        return this;
    }

    @Override // org.eclipse.hono.client.HonoClient
    public void shutdown() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        shutdown(asyncResult -> {
            if (asyncResult.succeeded()) {
                countDownLatch.countDown();
            } else {
                LOG.error("could not close connection to server", asyncResult.cause());
            }
        });
        try {
            if (!countDownLatch.await(5L, TimeUnit.SECONDS)) {
                LOG.error("shutdown of client timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.eclipse.hono.client.HonoClient
    public void shutdown(Handler<AsyncResult<Void>> handler) {
        if (this.connection != null && !this.connection.isDisconnected()) {
            this.context.runOnContext(r7 -> {
                LOG.info("closing connection to server [{}:{}]...", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                this.connection.disconnectHandler((Handler) null);
                this.connection.closeHandler(asyncResult -> {
                    if (asyncResult.succeeded()) {
                        LOG.info("closed connection to server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                    } else {
                        LOG.info("could not close connection to server [{}:{}]", new Object[]{this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), asyncResult.cause()});
                    }
                    this.connection.disconnect();
                    if (handler != null) {
                        handler.handle(Future.succeededFuture());
                    }
                }).close();
            });
        } else {
            LOG.info("connection to server [{}:{}] already closed", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            handler.handle(Future.succeededFuture());
        }
    }
}
