package org.eclipse.hono.telemetry.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonSender;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.telemetry.SenderFactory;
import org.eclipse.hono.telemetry.TelemetryConstants;
import org.eclipse.hono.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/telemetry/impl/ForwardingTelemetryAdapter.class */
public final class ForwardingTelemetryAdapter extends BaseTelemetryAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ForwardingTelemetryAdapter.class);
    private final Map<String, ProtonSender> activeSenders;
    private final Map<String, List<String>> sendersPerConnection;
    private ProtonConnection downstreamConnection;
    private String downstreamContainerHost;
    private int downstreamContainerPort;
    private String pathSeparator;
    private SenderFactory senderFactory;

    public ForwardingTelemetryAdapter(SenderFactory senderFactory) {
        this(senderFactory, 0, 1);
    }

    public ForwardingTelemetryAdapter(SenderFactory senderFactory, int i, int i2) {
        super(i, i2);
        this.activeSenders = new HashMap();
        this.sendersPerConnection = new HashMap();
        this.pathSeparator = TelemetryConstants.PATH_SEPARATOR;
        this.senderFactory = (SenderFactory) Objects.requireNonNull(senderFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setDownstreamContainerHost(String str) {
        this.downstreamContainerHost = (String) Objects.requireNonNull(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setDownstreamContainerPort(int i) {
        if (i < 1 || i >= 65536) {
            throw new IllegalArgumentException("illegal port number");
        }
        this.downstreamContainerPort = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setPathSeparator(String str) {
        this.pathSeparator = (String) Objects.requireNonNull(str);
    }

    @Override // org.eclipse.hono.telemetry.impl.BaseTelemetryAdapter
    public void doStart(Future<Void> future) throws Exception {
        if (this.downstreamContainerHost == null) {
            throw new IllegalStateException("downstream container host is not set");
        }
        if (this.downstreamContainerPort == 0) {
            throw new IllegalStateException("downstream container port is not set");
        }
        connectToDownstream(createClientOptions(), future.completer());
    }

    @Override // org.eclipse.hono.telemetry.impl.BaseTelemetryAdapter
    protected void doStop(Future<Void> future) {
        if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
            LOG.debug("downstream connection already closed");
        } else {
            LOG.info("closing connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer());
            this.downstreamConnection.closeHandler((Handler) null).disconnectHandler((Handler) null).close();
        }
        future.complete();
    }

    private ProtonClientOptions createClientOptions() {
        ProtonClientOptions protonClientOptions = new ProtonClientOptions();
        protonClientOptions.setReconnectAttempts(-1).setReconnectInterval(200L);
        return protonClientOptions;
    }

    private void connectToDownstream(ProtonClientOptions protonClientOptions, Handler<AsyncResult<Void>> handler) {
        LOG.info("connecting to downstream container [{}:{}]...", this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort));
        ProtonClient.create(this.vertx).connect(protonClientOptions, this.downstreamContainerHost, this.downstreamContainerPort, asyncResult -> {
            if (asyncResult.failed()) {
                LOG.warn("can't connect to downstream AMQP 1.0 container [{}:{}]: {}", new Object[]{this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort), asyncResult.cause().getMessage()});
                handler.handle(Future.failedFuture(asyncResult.cause()));
            } else {
                LOG.info("connected to downstream AMQP 1.0 container [{}:{}], opening connection ...", this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort));
                ((ProtonConnection) asyncResult.result()).setContainer("Hono-TelemetryAdapter-" + this.instanceNo).setHostname("hono-internal").openHandler(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        LOG.warn("can't open connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer(), asyncResult.cause());
                        handler.handle(Future.failedFuture(asyncResult.cause()));
                        return;
                    }
                    this.downstreamConnection = (ProtonConnection) asyncResult.result();
                    LOG.info("connection to downstream container [{}] open", this.downstreamConnection.getRemoteContainer());
                    this.downstreamConnection.disconnectHandler(this::onDisconnectFromDownstreamContainer);
                    this.downstreamConnection.closeHandler(asyncResult -> {
                        LOG.info("connection to downstream container [{}] is closed", this.downstreamConnection.getRemoteContainer());
                        this.downstreamConnection.close();
                    });
                    handler.handle(Future.succeededFuture());
                }).open();
            }
        });
    }

    private void onDisconnectFromDownstreamContainer(ProtonConnection protonConnection) {
        LOG.warn("lost connection to downstream container [{}]", this.downstreamContainerHost);
        this.activeSenders.clear();
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.disconnect();
        ProtonClientOptions createClientOptions = createClientOptions();
        if (createClientOptions.getReconnectAttempts() != 0) {
            this.vertx.setTimer(300L, l -> {
                LOG.info("attempting to re-connect to downstream container [{}]", this.downstreamContainerHost);
                connectToDownstream(createClientOptions, asyncResult -> {
                });
            });
        }
    }

    void setDownstreamConnection(ProtonConnection protonConnection) {
        this.downstreamConnection = protonConnection;
    }

    @Override // org.eclipse.hono.telemetry.impl.BaseTelemetryAdapter
    protected void onLinkAttached(String str, String str2, String str3) {
        if (this.activeSenders.containsKey(str2)) {
            LOG.info("reusing existing downstream sender [con: {}, link: {}]", str, str2);
        } else {
            createSender(str3, protonSender -> {
                int availableCredit = getAvailableCredit(protonSender);
                LOG.trace("downstream sender [con:{}, link: {}] has been replenished with {} credits", new Object[]{str, str2, Integer.valueOf(availableCredit)});
                replenishUpstreamSender(str2, availableCredit);
            }, asyncResult -> {
                if (asyncResult.succeeded()) {
                    LOG.info("created downstream sender [con: {}, link: {}]", str, str2);
                    addSender(str, str2, (ProtonSender) asyncResult.result());
                } else {
                    LOG.warn("can't create downstream sender [con: {}, link: {}]", new Object[]{str, str2, asyncResult.cause()});
                    sendErrorMessage(str2, true);
                }
            });
        }
    }

    private void createSender(String str, Handler<ProtonSender> handler, Handler<AsyncResult<ProtonSender>> handler2) {
        Future<ProtonSender> future = Future.future();
        future.setHandler(handler2);
        if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
            future.fail("downstream connection must be opened before creating sender");
        } else {
            this.senderFactory.createSender(this.downstreamConnection, str.replace(TelemetryConstants.PATH_SEPARATOR, this.pathSeparator), handler, future);
        }
    }

    void addSender(String str, String str2, ProtonSender protonSender) {
        protonSender.attachments().set("CONNECTION_ID", String.class, str);
        this.activeSenders.put(str2, protonSender);
        List<String> list = this.sendersPerConnection.get(str);
        if (list == null) {
            list = new ArrayList();
            this.sendersPerConnection.put(str, list);
        }
        list.add(str2);
    }

    private static int getAvailableCredit(ProtonSender protonSender) {
        return protonSender.getCredit() - protonSender.getQueued();
    }

    @Override // org.eclipse.hono.telemetry.impl.BaseTelemetryAdapter
    protected void onLinkDetached(String str) {
        List<String> list;
        String closeSender = closeSender(str);
        if (closeSender == null || (list = this.sendersPerConnection.get(closeSender)) == null) {
            return;
        }
        list.remove(str);
    }

    private String closeSender(String str) {
        ProtonSender remove = this.activeSenders.remove(str);
        if (remove == null || !remove.isOpen()) {
            return null;
        }
        String connectionId = Constants.getConnectionId(remove);
        LOG.info("closing downstream sender [con: {}, link: {}]", connectionId, str);
        remove.close();
        return connectionId;
    }

    @Override // org.eclipse.hono.telemetry.impl.BaseTelemetryAdapter
    protected void onConnectionClosed(String str) {
        List<String> remove = this.sendersPerConnection.remove(Objects.requireNonNull(str));
        if (remove == null || remove.isEmpty()) {
            return;
        }
        LOG.info("closing {} downstream senders for connection [id: {}]", Integer.valueOf(remove.size()), str);
        Iterator<String> it = remove.iterator();
        while (it.hasNext()) {
            closeSender(it.next());
        }
    }

    @Override // org.eclipse.hono.telemetry.TelemetryAdapter
    public void processTelemetryData(Message message, String str) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(str);
        ProtonSender protonSender = this.activeSenders.get(str);
        if (protonSender == null) {
            LOG.info("no downstream sender for link [{}] available, discarding message and closing link with client", str);
            sendErrorMessage(str, true);
        } else {
            if (protonSender.isOpen()) {
                forwardMessage(protonSender, message);
                return;
            }
            LOG.warn("downstream sender for link [{}] is not open, discarding message and closing link with client", str);
            sendErrorMessage(str, true);
            onLinkDetached(str);
        }
    }

    private void forwardMessage(ProtonSender protonSender, Message message) {
        LOG.debug("forwarding message [id: {}, to: {}, content-type: {}] to downstream container [{}:{}]", new Object[]{message.getMessageId(), message.getAddress(), message.getContentType(), this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort)});
        protonSender.send(message);
    }
}
