package org.eclipse.hono.server;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.config.HonoConfigProperties;
import org.eclipse.hono.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/eclipse/hono/server/ForwardingDownstreamAdapter.class */
public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter {
    protected String downstreamContainerHost;
    protected int downstreamContainerPort;
    private final Vertx vertx;
    private ProtonConnection downstreamConnection;
    private SenderFactory senderFactory;
    protected Logger logger = LoggerFactory.getLogger(getClass());
    protected HonoConfigProperties honoConfig = new HonoConfigProperties();
    private final Map<String, ProtonSender> activeSenders = new HashMap();
    private final Map<String, List<String>> sendersPerConnection = new HashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    public ForwardingDownstreamAdapter(Vertx vertx, SenderFactory senderFactory) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.senderFactory = (SenderFactory) Objects.requireNonNull(senderFactory);
    }

    @Autowired(required = false)
    public void setHonoConfiguration(HonoConfigProperties honoConfigProperties) {
        this.honoConfig = (HonoConfigProperties) Objects.requireNonNull(honoConfigProperties);
    }

    @Value("${hono.telemetry.downstream.host:localhost}")
    public final void setDownstreamContainerHost(String str) {
        this.downstreamContainerHost = (String) Objects.requireNonNull(str);
    }

    @Value("${hono.telemetry.downstream.port:15672}")
    public final void setDownstreamContainerPort(int i) {
        if (i < 1 || i >= 65536) {
            throw new IllegalArgumentException("illegal port number");
        }
        this.downstreamContainerPort = i;
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void start(Future<Void> future) {
        if (this.downstreamContainerHost == null) {
            throw new IllegalStateException("downstream container host is not set");
        }
        if (this.downstreamContainerPort == 0) {
            throw new IllegalStateException("downstream container port is not set");
        }
        if (this.honoConfig.isWaitForDownstreamConnectionEnabled()) {
            this.logger.info("waiting for connection to downstream container");
            connectToDownstream(createClientOptions(), future);
        } else {
            connectToDownstream(createClientOptions());
            future.complete();
        }
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void stop(Future<Void> future) {
        if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
            this.logger.debug("downstream connection already closed");
        } else {
            this.logger.info("closing connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer());
            this.downstreamConnection.closeHandler((Handler) null).disconnectHandler((Handler) null).close();
        }
        future.complete();
    }

    protected ProtonClientOptions createClientOptions() {
        return new ProtonClientOptions().setConnectTimeout(100).setReconnectAttempts(-1).setReconnectInterval(200L);
    }

    private void connectToDownstream(ProtonClientOptions protonClientOptions) {
        connectToDownstream(protonClientOptions, Future.future());
    }

    private void connectToDownstream(ProtonClientOptions protonClientOptions, Future<Void> future) {
        this.logger.info("connecting to downstream container [{}:{}]...", this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort));
        ProtonClient.create(this.vertx).connect(protonClientOptions, this.downstreamContainerHost, this.downstreamContainerPort, asyncResult -> {
            if (asyncResult.failed()) {
                this.logger.warn("can't connect to downstream AMQP 1.0 container [{}:{}]: {}", new Object[]{this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort), asyncResult.cause().getMessage()});
            } else {
                this.logger.info("connected to downstream AMQP 1.0 container [{}:{}], opening connection ...", this.downstreamContainerHost, Integer.valueOf(this.downstreamContainerPort));
                ((ProtonConnection) asyncResult.result()).setContainer("Hono-TelemetryAdapter" + UUID.randomUUID()).setHostname("hono-internal").openHandler(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        this.logger.warn("can't open connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer(), asyncResult.cause());
                        future.fail(asyncResult.cause());
                        return;
                    }
                    this.downstreamConnection = (ProtonConnection) asyncResult.result();
                    this.logger.info("connection to downstream container [{}] open", this.downstreamConnection.getRemoteContainer());
                    this.downstreamConnection.disconnectHandler(this::onDisconnectFromDownstreamContainer);
                    this.downstreamConnection.closeHandler(asyncResult -> {
                        this.logger.info("connection to downstream container [{}] is closed", this.downstreamConnection.getRemoteContainer());
                        this.downstreamConnection.close();
                    });
                    future.complete();
                }).open();
            }
        });
    }

    private void onDisconnectFromDownstreamContainer(ProtonConnection protonConnection) {
        this.logger.warn("lost connection to downstream container [{}]", this.downstreamContainerHost);
        this.activeSenders.clear();
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.disconnect();
        ProtonClientOptions createClientOptions = createClientOptions();
        if (createClientOptions.getReconnectAttempts() != 0) {
            this.vertx.setTimer(300L, l -> {
                this.logger.info("attempting to re-connect to downstream container [{}]", this.downstreamContainerHost);
                connectToDownstream(createClientOptions);
            });
        }
    }

    public final void setDownstreamConnection(ProtonConnection protonConnection) {
        this.downstreamConnection = protonConnection;
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientAttach(UpstreamReceiver upstreamReceiver, Handler<AsyncResult<Void>> handler) {
        if (this.activeSenders.containsKey(upstreamReceiver.getLinkId())) {
            this.logger.info("reusing existing downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
        } else {
            createSender(upstreamReceiver.getTargetAddress(), protonSender -> {
                handleFlow(protonSender, upstreamReceiver);
            }, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                    this.logger.warn("can't create downstream sender [con: {}, link: {}]", new Object[]{upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), asyncResult.cause()});
                } else {
                    this.logger.info("created downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
                    addSender(upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), (ProtonSender) asyncResult.result());
                    handler.handle(Future.succeededFuture());
                }
            });
        }
    }

    public final void handleFlow(ProtonSender protonSender, UpstreamReceiver upstreamReceiver) {
        int availableCredit = getAvailableCredit(protonSender);
        this.logger.trace("received FLOW from downstream sender for upstream client [con:{}, link: {}, credits: {}, drain: {}", new Object[]{upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), Integer.valueOf(availableCredit), Boolean.valueOf(protonSender.getDrain())});
        if (protonSender.getDrain()) {
            upstreamReceiver.drain(10000L, asyncResult -> {
                if (asyncResult.succeeded()) {
                    protonSender.drained();
                }
            });
        } else {
            upstreamReceiver.replenish(availableCredit);
        }
    }

    private void createSender(String str, Handler<ProtonSender> handler, Handler<AsyncResult<ProtonSender>> handler2) {
        Future<ProtonSender> future = Future.future();
        future.setHandler(handler2);
        if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
            future.fail("downstream connection must be opened before creating sender");
        } else {
            this.senderFactory.createSender(this.downstreamConnection, str.replace("/", this.honoConfig.getPathSeparator()), getDownstreamQos(), handler, future);
        }
    }

    public final void addSender(String str, String str2, ProtonSender protonSender) {
        protonSender.attachments().set("CONNECTION_ID", String.class, str);
        protonSender.setAutoDrained(false);
        this.activeSenders.put(str2, protonSender);
        List<String> list = this.sendersPerConnection.get(str);
        if (list == null) {
            list = new ArrayList();
            this.sendersPerConnection.put(str, list);
        }
        list.add(str2);
    }

    private static int getAvailableCredit(ProtonSender protonSender) {
        return protonSender.getCredit() - protonSender.getQueued();
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientDetach(UpstreamReceiver upstreamReceiver) {
        List<String> list;
        String closeSender = closeSender(upstreamReceiver.getLinkId());
        if (closeSender == null || (list = this.sendersPerConnection.get(closeSender)) == null) {
            return;
        }
        list.remove(upstreamReceiver.getLinkId());
    }

    private String closeSender(String str) {
        ProtonSender remove = this.activeSenders.remove(str);
        if (remove == null || !remove.isOpen()) {
            return null;
        }
        String connectionId = Constants.getConnectionId(remove);
        this.logger.info("closing downstream sender [con: {}, link: {}]", connectionId, str);
        remove.close();
        return connectionId;
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientDisconnect(ProtonConnection protonConnection) {
        String str = (String) protonConnection.attachments().get("CONNECTION_ID", String.class);
        List<String> remove = this.sendersPerConnection.remove(Objects.requireNonNull(str));
        if (remove == null || remove.isEmpty()) {
            return;
        }
        this.logger.info("closing {} downstream senders for connection [id: {}]", Integer.valueOf(remove.size()), str);
        Iterator<String> it = remove.iterator();
        while (it.hasNext()) {
            closeSender(it.next());
        }
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void processMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, Message message) {
        Objects.requireNonNull(upstreamReceiver);
        Objects.requireNonNull(message);
        Objects.requireNonNull(protonDelivery);
        ProtonSender protonSender = this.activeSenders.get(upstreamReceiver.getLinkId());
        if (protonSender == null) {
            this.logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
        } else {
            if (protonSender.isOpen()) {
                forwardMessage(protonSender, message, protonDelivery);
                return;
            }
            this.logger.warn("downstream sender for link [{}] is not open, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
            onClientDetach(upstreamReceiver);
        }
    }

    protected abstract void forwardMessage(ProtonSender protonSender, Message message, ProtonDelivery protonDelivery);

    protected abstract ProtonQoS getDownstreamQos();
}
