package org.eclipse.hono.server;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.config.HonoConfigProperties;
import org.eclipse.hono.connection.ConnectionFactory;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/eclipse/hono/server/ForwardingDownstreamAdapter.class */
public abstract class ForwardingDownstreamAdapter implements DownstreamAdapter {
    protected Logger logger = LoggerFactory.getLogger(getClass());
    protected HonoConfigProperties honoConfig = new HonoConfigProperties();
    private final Map<UpstreamReceiver, ProtonSender> activeSenders = new HashMap();
    private final Map<String, List<UpstreamReceiver>> sendersPerConnection = new HashMap();
    private boolean running = false;
    private final Vertx vertx;
    private ProtonConnection downstreamConnection;
    private SenderFactory senderFactory;
    private ConnectionFactory downstreamConnectionFactory;

    /* JADX INFO: Access modifiers changed from: protected */
    public ForwardingDownstreamAdapter(Vertx vertx, SenderFactory senderFactory) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.senderFactory = (SenderFactory) Objects.requireNonNull(senderFactory);
    }

    @Autowired(required = false)
    public final void setHonoConfiguration(HonoConfigProperties honoConfigProperties) {
        if (this.running) {
            throw new IllegalStateException("configuration can not be set on running adapter");
        }
        this.honoConfig = (HonoConfigProperties) Objects.requireNonNull(honoConfigProperties);
    }

    @Autowired
    public final void setDownstreamConnectionFactory(ConnectionFactory connectionFactory) {
        if (this.running) {
            throw new IllegalStateException("downstream container host can not be set on running adapter");
        }
        this.downstreamConnectionFactory = (ConnectionFactory) Objects.requireNonNull(connectionFactory);
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void start(Future<Void> future) {
        if (this.running) {
            future.complete();
            return;
        }
        if (this.downstreamConnectionFactory == null) {
            throw new IllegalStateException("downstream connection factory is not set");
        }
        this.running = true;
        if (this.honoConfig.isWaitForDownstreamConnectionEnabled()) {
            this.logger.info("waiting for connection to downstream container");
            connectToDownstream(createClientOptions(), asyncResult -> {
                if (asyncResult.succeeded()) {
                    future.complete();
                } else {
                    future.fail(asyncResult.cause());
                }
            });
        } else {
            connectToDownstream(createClientOptions());
            future.complete();
        }
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void stop(Future<Void> future) {
        if (this.running) {
            if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
                this.logger.debug("downstream connection already closed");
            } else {
                this.logger.info("closing connection to downstream container [{}]", this.downstreamConnection.getRemoteContainer());
                this.downstreamConnection.closeHandler((Handler) null).disconnectHandler((Handler) null).close();
            }
            this.running = false;
        }
        future.complete();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getDownstreamContainer() {
        if (this.downstreamConnection != null) {
            return this.downstreamConnection.getRemoteContainer();
        }
        return null;
    }

    private ProtonClientOptions createClientOptions() {
        return new ProtonClientOptions().setConnectTimeout(100).setReconnectAttempts(-1).setReconnectInterval(200L);
    }

    private void connectToDownstream(ProtonClientOptions protonClientOptions) {
        connectToDownstream(protonClientOptions, asyncResult -> {
        });
    }

    private void connectToDownstream(ProtonClientOptions protonClientOptions, Handler<AsyncResult<ProtonConnection>> handler) {
        this.downstreamConnectionFactory.connect(protonClientOptions, this::onRemoteClose, this::onDisconnectFromDownstreamContainer, asyncResult -> {
            if (asyncResult.succeeded()) {
                this.downstreamConnection = (ProtonConnection) asyncResult.result();
                handler.handle(Future.succeededFuture(asyncResult.result()));
            } else {
                this.logger.info("failed to connect to downstream container", asyncResult.cause());
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
    }

    private void onRemoteClose(AsyncResult<ProtonConnection> asyncResult) {
        this.logger.info("connection to downstream container [{}] is closed", this.downstreamConnection.getRemoteContainer());
        this.downstreamConnection.close();
    }

    private void onDisconnectFromDownstreamContainer(ProtonConnection protonConnection) {
        this.logger.warn("lost connection to downstream container [{}], closing upstream receivers ...", protonConnection.getRemoteContainer());
        Iterator<UpstreamReceiver> it = this.activeSenders.keySet().iterator();
        while (it.hasNext()) {
            it.next().close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
        }
        this.activeSenders.clear();
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.disconnect();
        ProtonClientOptions createClientOptions = createClientOptions();
        if (createClientOptions.getReconnectAttempts() != 0) {
            this.vertx.setTimer(300L, l -> {
                this.logger.info("attempting to re-connect to downstream container");
                connectToDownstream(createClientOptions);
            });
        }
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientAttach(UpstreamReceiver upstreamReceiver, Handler<AsyncResult<Void>> handler) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        Objects.requireNonNull(upstreamReceiver);
        Objects.requireNonNull(handler);
        if (!this.activeSenders.containsKey(upstreamReceiver.getLinkId())) {
            createSender(upstreamReceiver.getTargetAddress(), protonSender -> {
                handleFlow(protonSender, upstreamReceiver);
            }, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    this.logger.warn("can't create downstream sender [con: {}, link: {}]", new Object[]{upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), asyncResult.cause()});
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                } else {
                    this.logger.info("created downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
                    addSender(upstreamReceiver, (ProtonSender) asyncResult.result());
                    handler.handle(Future.succeededFuture());
                }
            });
        } else {
            this.logger.info("reusing existing downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
            handler.handle(Future.succeededFuture());
        }
    }

    public final void handleFlow(ProtonSender protonSender, UpstreamReceiver upstreamReceiver) {
        int availableCredit = getAvailableCredit(protonSender);
        this.logger.trace("received FLOW from downstream sender for upstream client [con:{}, link: {}, credits: {}, drain: {}", new Object[]{upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId(), Integer.valueOf(availableCredit), Boolean.valueOf(protonSender.getDrain())});
        if (protonSender.getDrain()) {
            upstreamReceiver.drain(10000L, asyncResult -> {
                if (asyncResult.succeeded()) {
                    protonSender.drained();
                }
            });
        } else {
            upstreamReceiver.replenish(availableCredit);
        }
    }

    private void createSender(String str, Handler<ProtonSender> handler, Handler<AsyncResult<ProtonSender>> handler2) {
        Future<ProtonSender> future = Future.future();
        future.setHandler(handler2);
        if (this.downstreamConnection == null || this.downstreamConnection.isDisconnected()) {
            future.fail("downstream connection must be opened before creating sender");
        } else {
            this.senderFactory.createSender(this.downstreamConnection, getTenantOnlyTargetAddress(str).replace("/", this.honoConfig.getPathSeparator()), getDownstreamQos(), handler, future);
        }
    }

    private static String getTenantOnlyTargetAddress(String str) {
        ResourceIdentifier fromString = ResourceIdentifier.fromString(str);
        return String.format("%s/%s", fromString.getEndpoint(), fromString.getTenantId());
    }

    public final void addSender(UpstreamReceiver upstreamReceiver, ProtonSender protonSender) {
        protonSender.attachments().set("CONNECTION_ID", String.class, upstreamReceiver.getConnectionId());
        protonSender.setAutoDrained(false);
        this.activeSenders.put(upstreamReceiver, protonSender);
        List<UpstreamReceiver> list = this.sendersPerConnection.get(upstreamReceiver.getConnectionId());
        if (list == null) {
            list = new ArrayList();
            this.sendersPerConnection.put(upstreamReceiver.getConnectionId(), list);
        }
        list.add(upstreamReceiver);
    }

    private static int getAvailableCredit(ProtonSender protonSender) {
        return protonSender.getCredit() - protonSender.getQueued();
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientDetach(UpstreamReceiver upstreamReceiver) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        Objects.requireNonNull(upstreamReceiver);
        closeSender(upstreamReceiver);
        List<UpstreamReceiver> list = this.sendersPerConnection.get(upstreamReceiver.getConnectionId());
        if (list != null) {
            list.remove(upstreamReceiver);
        }
    }

    private void closeSender(UpstreamReceiver upstreamReceiver) {
        ProtonSender remove = this.activeSenders.remove(upstreamReceiver);
        if (remove == null || !remove.isOpen()) {
            return;
        }
        this.logger.info("closing downstream sender [con: {}, link: {}]", upstreamReceiver.getConnectionId(), upstreamReceiver.getLinkId());
        remove.close();
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void onClientDisconnect(String str) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        List<UpstreamReceiver> remove = this.sendersPerConnection.remove(Objects.requireNonNull(str));
        if (remove == null || remove.isEmpty()) {
            return;
        }
        this.logger.info("closing {} downstream senders for connection [id: {}]", Integer.valueOf(remove.size()), str);
        Iterator<UpstreamReceiver> it = remove.iterator();
        while (it.hasNext()) {
            closeSender(it.next());
        }
    }

    @Override // org.eclipse.hono.server.DownstreamAdapter
    public final void processMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, Message message) {
        if (!this.running) {
            throw new IllegalStateException("adapter must be started first");
        }
        Objects.requireNonNull(upstreamReceiver);
        Objects.requireNonNull(message);
        Objects.requireNonNull(protonDelivery);
        ProtonSender protonSender = this.activeSenders.get(upstreamReceiver);
        if (protonSender == null) {
            this.logger.info("no downstream sender for link [{}] available, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
        } else {
            if (protonSender.isOpen()) {
                forwardMessage(protonSender, message, protonDelivery);
                return;
            }
            this.logger.warn("downstream sender for link [{}] is not open, discarding message and closing link with client", upstreamReceiver.getLinkId());
            upstreamReceiver.close(ErrorConditions.ERROR_NO_DOWNSTREAM_CONSUMER);
            onClientDetach(upstreamReceiver);
        }
    }

    protected abstract void forwardMessage(ProtonSender protonSender, Message message, ProtonDelivery protonDelivery);

    protected abstract ProtonQoS getDownstreamQos();
}
