package org.eclipse.hono.messaging;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.connection.ConnectionFactory;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/eclipse/hono/messaging/ForwardingDownstreamAdapter.class */
public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter {
    /** Vert.x instance; used by this class to schedule re-connect timers. */
    private final Vertx vertx;
    /** Sink for connection, link, credit and message counters; injected via {@code setMetrics}. */
    private MessagingMetrics metrics;
    /** Current connection to the downstream container; {@code null} until a connect attempt has succeeded. */
    private ProtonConnection downstreamConnection;
    /** Factory that creates the downstream sender link for an upstream receiver. */
    private final SenderFactory senderFactory;
    /** Factory used to establish the connection to the downstream container. */
    private ConnectionFactory downstreamConnectionFactory;
    /** Logger named after the concrete subclass. */
    protected Logger logger = LoggerFactory.getLogger(getClass());
    /** Configuration; defaults to a freshly created properties object until one is injected. */
    protected HonoMessagingConfigProperties honoConfig = new HonoMessagingConfigProperties();
    /** Downstream sender currently associated with each upstream receiver. */
    private final Map<UpstreamReceiver, ProtonSender> activeSenders = new HashMap<>();
    /** Upstream receivers grouped by the identifier of the client connection they belong to. */
    private final Map<String, List<UpstreamReceiver>> receiversPerConnection = new HashMap<>();
    /** Result handlers of client attach requests that have not been answered yet. */
    private final List<Handler<AsyncResult<Void>>> clientAttachHandlers = new ArrayList<>();
    /** {@code true} between a successful call to {@code start} and the next call to {@code stop}. */
    private boolean running = false;
    /** Whether a failed connect attempt triggers another attempt. */
    private boolean retryOnFailedConnectAttempt = true;

    /**
     * Creates a new adapter.
     *
     * @param vertx the Vert.x instance to use.
     * @param senderFactory the factory to use for creating downstream senders.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public ForwardingDownstreamAdapter(Vertx vertx, SenderFactory senderFactory) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(senderFactory);
        this.vertx = vertx;
        this.senderFactory = senderFactory;
    }

    /**
     * Sets the configuration properties to use.
     *
     * @param honoMessagingConfigProperties the properties.
     * @throws IllegalStateException if the adapter is already running.
     * @throws NullPointerException if the properties are {@code null}.
     */
    @Autowired(required = false)
    public final void setHonoConfiguration(HonoMessagingConfigProperties honoMessagingConfigProperties) {
        // the running check intentionally precedes the null check
        if (!this.running) {
            this.honoConfig = Objects.requireNonNull(honoMessagingConfigProperties);
            return;
        }
        throw new IllegalStateException("configuration can not be set on running adapter");
    }

    /**
     * Sets the factory to use for connecting to the downstream container.
     *
     * @param connectionFactory the factory.
     * @throws IllegalStateException if the adapter is already running.
     * @throws NullPointerException if the factory is {@code null}.
     */
    @Autowired
    @Qualifier("downstream")
    public final void setDownstreamConnectionFactory(ConnectionFactory connectionFactory) {
        // the running check intentionally precedes the null check
        if (!this.running) {
            this.downstreamConnectionFactory = Objects.requireNonNull(connectionFactory);
            return;
        }
        throw new IllegalStateException("downstream container host can not be set on running adapter");
    }

    /**
     * Sets the metrics object that this adapter reports its connection,
     * link, credit and message counters to.
     * <p>
     * No validation is performed: the value is stored as given, even while
     * the adapter is running.
     *
     * @param messagingMetrics the metrics to report to.
     */
    @Autowired
    public final void setMetrics(MessagingMetrics messagingMetrics) {
        this.metrics = messagingMetrics;
    }

    /**
     * Starts this adapter and triggers the connection to the downstream container.
     * <p>
     * If the configuration asks to wait for the downstream connection, the given
     * future is completed with the outcome of the connection attempt. Otherwise
     * the connection is established in the background and the future is completed
     * immediately. Calling this method on a running adapter only completes the future.
     *
     * @param future the future to notify about the outcome of the startup.
     * @throws IllegalStateException if no downstream connection factory has been set.
     */
    @Override
    public final void start(Future<Void> future) {
        if (!this.running) {
            if (this.downstreamConnectionFactory == null) {
                throw new IllegalStateException("downstream connection factory is not set");
            }
            this.running = true;
            if (this.honoConfig.isWaitForDownstreamConnectionEnabled()) {
                this.logger.info("waiting for connection to downstream container");
                connectToDownstream(createClientOptions(), attempt -> {
                    if (attempt.failed()) {
                        future.fail(attempt.cause());
                    } else {
                        future.complete();
                    }
                });
                // the future is completed by the connection attempt's outcome
                return;
            }
            connectToDownstream(createClientOptions());
        }
        future.complete();
    }

    /**
     * Stops this adapter and closes the connection to the downstream container,
     * if one is open.
     * <p>
     * The close and disconnect handlers are removed before closing so that the
     * shutdown does not trigger this adapter's connection-loss handling.
     * The given future is always completed.
     *
     * @param future the future to complete once the adapter has been stopped.
     */
    @Override
    public final void stop(Future<Void> future) {
        if (this.running) {
            final boolean connectionOpen = this.downstreamConnection != null && !this.downstreamConnection.isDisconnected();
            if (connectionOpen) {
                this.logger.info("closing connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer());
                this.downstreamConnection.closeHandler(null).disconnectHandler(null).close();
                this.metrics.decrementDownStreamConnections();
            } else {
                this.logger.debug("downstream connection already closed");
            }
            this.running = false;
        }
        future.complete();
    }

    /**
     * Gets the remote container name reported by the downstream connection.
     *
     * @return the remote container, or {@code null} if no connection has been established yet.
     */
    protected final String getDownstreamContainer() {
        return this.downstreamConnection == null ? null : this.downstreamConnection.getRemoteContainer();
    }

    /**
     * Creates the options used for every connection attempt to the downstream container:
     * connect timeout 200, one reconnect attempt, reconnect interval 500.
     *
     * @return a new options object.
     */
    private ProtonClientOptions createClientOptions() {
        final ProtonClientOptions options = new ProtonClientOptions();
        options.setConnectTimeout(200);
        options.setReconnectAttempts(1);
        options.setReconnectInterval(500L);
        return options;
    }

    /**
     * Connects to the downstream container without reporting the outcome to anybody.
     * Delegates to the two-argument variant with a {@code null} result handler.
     *
     * @param protonClientOptions the options to use for the connection attempt.
     */
    private void connectToDownstream(ProtonClientOptions protonClientOptions) {
        connectToDownstream(protonClientOptions, null);
    }

    /**
     * Connects to the downstream container and registers this adapter's
     * remote-close and disconnect callbacks on the connection.
     * <p>
     * On success the connection is stored, the connection metric is incremented
     * and the handler (if any) is notified. On failure either another attempt is
     * scheduled (if retrying is enabled) or the handler (if any) is failed.
     *
     * @param protonClientOptions the options to use for the connection attempt.
     * @param handler the handler to notify about the outcome, may be {@code null}.
     */
    private void connectToDownstream(ProtonClientOptions protonClientOptions, Handler<AsyncResult<ProtonConnection>> handler) {
        this.downstreamConnectionFactory.connect(protonClientOptions, this::onRemoteClose, this::onDisconnectFromDownstreamContainer, attempt -> {
            if (attempt.failed()) {
                this.logger.info("failed to connect to downstream container: {}", attempt.cause().getMessage());
                if (this.retryOnFailedConnectAttempt) {
                    // the handler is passed along so that a later attempt can still report to it
                    reconnect(handler);
                } else if (handler != null) {
                    handler.handle(Future.failedFuture(attempt.cause()));
                }
                return;
            }
            this.downstreamConnection = attempt.result();
            this.metrics.incrementDownStreamConnections();
            if (handler != null) {
                handler.handle(Future.succeededFuture(attempt.result()));
            }
        });
    }

    /**
     * Handles the downstream container closing the connection.
     * <p>
     * A successful close notification for a connection other than the current one
     * is logged and ignored. In all other cases the current connection is closed
     * locally and the regular connection-loss handling is run.
     *
     * @param asyncResult the close notification received from the downstream container.
     */
    private void onRemoteClose(AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            if (asyncResult.result() != this.downstreamConnection) {
                this.logger.warn("downstream container closed unknown connection");
                return;
            }
            this.logger.info("downstream container [{}] has closed connection", this.downstreamConnection.getRemoteContainer());
        } else {
            // a failed result carries no connection to compare against
            this.logger.info("downstream container [{}] has closed connection: {}", this.downstreamConnection.getRemoteContainer(), asyncResult.cause().getMessage());
        }
        this.downstreamConnection.close();
        onDisconnectFromDownstreamContainer(this.downstreamConnection);
    }

    /**
     * Handles the loss of the connection to the downstream container.
     * <p>
     * Notifications for a connection other than the current one are logged and ignored.
     * Otherwise all upstream receivers with a downstream sender are closed, the
     * bookkeeping is reset, the connection is disconnected, all pending client attach
     * handlers are failed and a re-connect is triggered.
     *
     * @param protonConnection the connection that has been lost.
     */
    private void onDisconnectFromDownstreamContainer(ProtonConnection protonConnection) {
        if (protonConnection != this.downstreamConnection) {
            this.logger.warn("unknown connection to downstream container has been disconnected");
            return;
        }
        this.logger.warn("lost connection to downstream container [{}], closing upstream receivers ...", protonConnection.getRemoteContainer());
        // every link on the lost connection is unusable, so the clients are told to go away
        for (UpstreamReceiver receiver : this.activeSenders.keySet()) {
            closeReceiver(receiver);
        }
        this.receiversPerConnection.clear();
        this.activeSenders.clear();
        this.downstreamConnection.attachments().clear();
        // remove the handler first so that the disconnect below does not re-enter this method
        this.downstreamConnection.disconnectHandler(null);
        this.downstreamConnection.disconnect();
        this.metrics.decrementDownStreamConnections();
        final Iterator<Handler<AsyncResult<Void>>> pendingAttaches = this.clientAttachHandlers.iterator();
        while (pendingAttaches.hasNext()) {
            final Handler<AsyncResult<Void>> pending = pendingAttaches.next();
            pending.handle(Future.failedFuture("connection to downstream container failed"));
            pendingAttaches.remove();
        }
        reconnect(null);
    }

    /**
     * Closes an upstream receiver with the "no downstream consumer" error condition
     * and updates the link, sender and credit metrics for its target address.
     *
     * @param upstreamReceiver the receiver to close.
     */
    private void closeReceiver(UpstreamReceiver upstreamReceiver) {
        upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
        final String targetAddress = upstreamReceiver.getTargetAddress();
        this.metrics.decrementUpstreamLinks(targetAddress);
        this.metrics.decrementDownstreamSenders(targetAddress);
        this.metrics.submitDownstreamLinkCredits(targetAddress, 0.0d);
    }

    /**
     * Schedules another attempt to connect to the downstream container.
     * <p>
     * The attempt is made after the reconnect interval of the client options has
     * elapsed. No attempt is made if the adapter has been stopped or if the client
     * options do not allow any reconnect attempts. In that case the handler (if any)
     * is failed, so that a caller waiting for the connection, e.g. the future passed
     * to {@code start}, is not left pending forever.
     *
     * @param handler the handler to notify about the outcome of the connection attempt,
     *                may be {@code null}.
     */
    private void reconnect(Handler<AsyncResult<ProtonConnection>> handler) {
        if (!this.running) {
            this.logger.info("adapter is stopped, will not re-connect to downstream container");
            if (handler != null) {
                handler.handle(Future.failedFuture("adapter is stopped, will not re-connect to downstream container"));
            }
            return;
        }
        final ProtonClientOptions options = createClientOptions();
        if (options.getReconnectAttempts() == 0) {
            if (handler != null) {
                handler.handle(Future.failedFuture("re-connecting to downstream container is disabled"));
            }
            return;
        }
        // use the interval from the options instead of repeating the constant here;
        // Vert.x rejects timer delays below 1 ms
        final long delay = Math.max(1L, options.getReconnectInterval());
        this.vertx.setTimer(delay, timerId -> {
            this.logger.info("attempting to re-connect to downstream container");
            connectToDownstream(options, handler);
        });
    }

    /**
     * Handles a client attaching an upstream receiver by providing a downstream
     * sender for it.
     * <p>
     * An existing open sender for the receiver is reused. Otherwise any stale sender
     * is removed and a new one is created for the receiver's target address. The
     * result handler is tracked as pending until the outcome is known, so that it can
     * be failed if the downstream connection is lost in the meantime.
     *
     * @param upstreamReceiver the receiver that the client has attached.
     * @param handler the handler to notify about the outcome.
     * @throws IllegalStateException if the adapter has not been started.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override
    public final void onClientAttach(UpstreamReceiver upstreamReceiver, Handler<AsyncResult<Void>> handler) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        Objects.requireNonNull(upstreamReceiver);
        Objects.requireNonNull(handler);

        final ProtonSender existingSender = this.activeSenders.get(upstreamReceiver);
        if (existingSender != null && existingSender.isOpen()) {
            this.logger.info("reusing existing downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
            handler.handle(Future.succeededFuture());
            return;
        }

        removeSender(upstreamReceiver);
        this.clientAttachHandlers.add(handler);

        final Future<Void> attachResult = Future.future();
        attachResult.setHandler(outcome -> {
            if (outcome.failed()) {
                this.logger.warn("can't create downstream sender [con: {}, link: {}]: {}", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), outcome.cause().getMessage());
            } else {
                this.logger.info("created downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
            }
            // the request has been answered, so it is no longer pending
            this.clientAttachHandlers.remove(handler);
            handler.handle(outcome);
        });

        final ResourceIdentifier targetAddress = ResourceIdentifier.fromString(upstreamReceiver.getTargetAddress());
        createSender(
                targetAddress,
                flowingSender -> handleFlow(flowingSender, upstreamReceiver),
                closed -> {
                    removeSender(upstreamReceiver);
                    closeReceiver(upstreamReceiver);
                })
            .compose(createdSender -> {
                addSender(upstreamReceiver, createdSender);
                attachResult.complete();
            }, attachResult);
    }

    /**
     * Handles a FLOW frame received from the downstream container for a sender.
     * <p>
     * If the sender is in drain mode, the upstream receiver is asked to drain and,
     * once that has succeeded, the sender is marked as drained. Otherwise the upstream
     * receiver is replenished with the credit available downstream and that credit
     * is reported to the metrics.
     *
     * @param protonSender the downstream sender the FLOW was received on.
     * @param upstreamReceiver the upstream receiver associated with the sender.
     */
    public final void handleFlow(ProtonSender protonSender, UpstreamReceiver upstreamReceiver) {
        this.logger.trace("received FLOW from downstream container [con:{}, link: {}, sendQueueFull: {}, credits: {}, queued: {}, drain: {}",
                upstreamReceiver.getConnectionId(),
                upstreamReceiver.getLinkId(),
                protonSender.sendQueueFull(),
                protonSender.getCredit(),
                protonSender.getQueued(),
                protonSender.getDrain());
        if (!protonSender.getDrain()) {
            final int credit = getAvailableDownstreamCredit(protonSender);
            upstreamReceiver.replenish(credit);
            this.metrics.submitDownstreamLinkCredits(upstreamReceiver.getTargetAddress(), credit);
            return;
        }
        // 10000L is presumably a timeout in milliseconds -- TODO confirm against UpstreamReceiver
        upstreamReceiver.drain(10000L, drainAttempt -> {
            if (drainAttempt.succeeded()) {
                protonSender.drained();
            }
        });
    }

    /**
     * Gets the credit of a downstream sender, never reporting less than zero.
     *
     * @param protonSender the sender to get the credit of.
     * @return the sender's credit, or 0 if the credit is negative.
     */
    private static int getAvailableDownstreamCredit(ProtonSender protonSender) {
        final int credit = protonSender.getCredit();
        return credit < 0 ? 0 : credit;
    }

    /**
     * Creates a downstream sender for a target address using the sender factory
     * and the quality of service defined by the subclass.
     *
     * @param resourceIdentifier the target address of the sender.
     * @param handler the handler passed to the factory together with the sender (used for FLOW handling by the caller).
     * @param handler2 the handler passed to the factory for the sender being closed.
     * @return a future for the sender; failed right away if there is no connection to the downstream container.
     */
    private Future<ProtonSender> createSender(ResourceIdentifier resourceIdentifier, Handler<ProtonSender> handler, Handler<Void> handler2) {
        if (isConnected()) {
            return this.senderFactory.createSender(this.downstreamConnection, resourceIdentifier, getDownstreamQos(), handler, handler2);
        }
        return Future.failedFuture("downstream connection must be opened before creating sender");
    }

    /**
     * Registers a downstream sender for an upstream receiver.
     * <p>
     * The receiver's connection id is attached to the sender, automatic draining is
     * switched off, the receiver is recorded under its connection id and the
     * downstream sender metric for the receiver's target address is incremented.
     *
     * @param upstreamReceiver the receiver the sender belongs to.
     * @param protonSender the sender to register.
     */
    public final void addSender(UpstreamReceiver upstreamReceiver, ProtonSender protonSender) {
        protonSender.attachments().set("CONNECTION_ID", String.class, upstreamReceiver.getConnectionId());
        protonSender.setAutoDrained(false);
        this.activeSenders.put(upstreamReceiver, protonSender);
        this.receiversPerConnection
                .computeIfAbsent(upstreamReceiver.getConnectionId(), connectionId -> new ArrayList<>())
                .add(upstreamReceiver);
        this.metrics.incrementDownstreamSenders(upstreamReceiver.getTargetAddress());
    }

    /**
     * Removes an upstream receiver from the per-connection bookkeeping and closes
     * the downstream sender associated with it, if any.
     *
     * @param upstreamReceiver the receiver whose sender is to be removed.
     */
    public final void removeSender(UpstreamReceiver upstreamReceiver) {
        final List<UpstreamReceiver> receiversOfConnection = this.receiversPerConnection.get(upstreamReceiver.getConnectionId());
        if (receiversOfConnection != null) {
            receiversOfConnection.remove(upstreamReceiver);
        }
        closeSender(upstreamReceiver);
    }

    /**
     * Handles a client detaching an upstream receiver by removing and closing the
     * downstream sender associated with it.
     *
     * @param upstreamReceiver the receiver that has been detached.
     * @throws IllegalStateException if the adapter has not been started.
     * @throws NullPointerException if the receiver is {@code null}.
     */
    @Override
    public final void onClientDetach(UpstreamReceiver upstreamReceiver) {
        if (this.running) {
            Objects.requireNonNull(upstreamReceiver);
            removeSender(upstreamReceiver);
            return;
        }
        throw new IllegalStateException("adapter must be started first");
    }

    /**
     * Handles a client connection going away by closing the downstream senders of
     * all upstream receivers recorded for that connection.
     *
     * @param str the identifier of the client connection.
     * @throws IllegalStateException if the adapter has not been started.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    @Override
    public final void onClientDisconnect(String str) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        final String connectionId = Objects.requireNonNull(str);
        final List<UpstreamReceiver> receivers = this.receiversPerConnection.remove(connectionId);
        if (receivers != null && !receivers.isEmpty()) {
            this.logger.info("closing {} downstream senders for connection [id: {}]", receivers.size(), str);
            receivers.forEach(receiver -> {
                closeSender(receiver);
                this.metrics.decrementUpstreamLinks(receiver.getTargetAddress());
            });
        }
    }

    /**
     * Forgets the downstream sender of an upstream receiver and, if that sender is
     * still open, updates the sender and credit metrics and closes it.
     *
     * @param upstreamReceiver the receiver whose sender is to be closed.
     */
    private void closeSender(UpstreamReceiver upstreamReceiver) {
        final ProtonSender sender = this.activeSenders.remove(upstreamReceiver);
        if (sender != null && sender.isOpen()) {
            this.logger.info("closing downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
            this.metrics.decrementDownstreamSenders(upstreamReceiver.getTargetAddress());
            this.metrics.submitDownstreamLinkCredits(upstreamReceiver.getTargetAddress(), 0.0d);
            sender.close();
        }
    }

    /**
     * Processes a message received from an upstream client.
     * <ul>
     * <li>Without a downstream sender for the receiver, the link with the client is closed.</li>
     * <li>If the sender is not open, the link is closed, the sender is detached and the
     * message is counted as discarded.</li>
     * <li>If the sender's send queue is full, a pre-settled message is accepted and counted
     * as discarded, any other message is released and counted as undeliverable.</li>
     * <li>Otherwise the message is handed to {@code forwardMessage} and counted as processed.</li>
     * </ul>
     *
     * @param upstreamReceiver the receiver the message was received on.
     * @param protonDelivery the delivery of the message.
     * @param message the message.
     * @throws IllegalStateException if the adapter has not been started.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override
    public final void processMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, Message message) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        Objects.requireNonNull(upstreamReceiver);
        Objects.requireNonNull(message);
        Objects.requireNonNull(protonDelivery);

        final ProtonSender sender = this.activeSenders.get(upstreamReceiver);
        if (sender == null) {
            this.logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
            return;
        }
        if (!sender.isOpen()) {
            this.logger.warn("downstream sender for link [{}] is not open, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
            onClientDetach(upstreamReceiver);
            this.metrics.incrementDiscardedMessages(sender.getTarget().getAddress());
            return;
        }
        if (sender.sendQueueFull()) {
            if (protonDelivery.remotelySettled()) {
                // the client does not expect an outcome for a pre-settled message, so it is simply dropped
                this.logger.debug("no downstream credit available for link [{}], discarding message [{}]", upstreamReceiver.getLinkId(), message.getMessageId());
                ProtonHelper.accepted(protonDelivery, true);
                this.metrics.incrementDiscardedMessages(sender.getTarget().getAddress());
            } else {
                this.logger.debug("no downstream credit available for link [{}], releasing message [{}]", upstreamReceiver.getLinkId(), message.getMessageId());
                ProtonHelper.released(protonDelivery, true);
                this.metrics.incrementUndeliverableMessages(sender.getTarget().getAddress());
            }
            return;
        }
        this.logger.trace("forwarding message [id: {}, to: {}, content-type: {}] to downstream container [{}], credit available: {}, queued: {}",
                message.getMessageId(),
                message.getAddress(),
                message.getContentType(),
                getDownstreamContainer(),
                sender.getCredit(),
                sender.getQueued());
        forwardMessage(sender, message, protonDelivery);
        this.metrics.incrementProcessedMessages(sender.getTarget().getAddress());
    }

    /**
     * Checks whether this adapter has a connection to the downstream container
     * that is not disconnected.
     *
     * @return {@code true} if a connection exists and is not disconnected.
     */
    @Override
    public final boolean isConnected() {
        return this.downstreamConnection != null && !this.downstreamConnection.isDisconnected();
    }

    /**
     * Turns off the retry that normally follows a failed connection attempt.
     * <p>
     * After this call a failed attempt is reported to the attempt's result
     * handler (if any) instead of scheduling a re-connect. Package-private;
     * presumably intended for tests -- confirm against callers.
     */
    final void disableRetryOnFailedConnectAttempt() {
        this.retryOnFailedConnectAttempt = false;
    }

    /**
     * Checks whether no downstream sender is currently registered.
     *
     * @return {@code true} if there are no active senders.
     */
    protected final boolean isActiveSendersEmpty() {
        // the map is a final field initialised at declaration, so it is never null
        return this.activeSenders.isEmpty();
    }

    /**
     * Checks whether no upstream receivers are recorded for any client connection.
     *
     * @return {@code true} if the per-connection bookkeeping contains no entries.
     */
    protected final boolean isSendersPerConnectionEmpty() {
        // the map is a final field initialised at declaration, so it is never null
        return this.receiversPerConnection.isEmpty();
    }

    /**
     * Forwards a message to the downstream container.
     * <p>
     * Called by {@code processMessage} only after it has verified that the sender
     * is open and that its send queue is not full.
     *
     * @param protonSender the downstream sender to forward the message over.
     * @param message the message to forward.
     * @param protonDelivery the upstream delivery the message was received with.
     */
    protected abstract void forwardMessage(ProtonSender protonSender, Message message, ProtonDelivery protonDelivery);

    /**
     * Gets the quality of service that is passed to the sender factory whenever
     * a downstream sender is created.
     *
     * @return the quality of service for downstream senders.
     */
    protected abstract ProtonQoS getDownstreamQos();
}
