package org.eclipse.hono.messaging;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import java.util.Objects;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * A {@link SenderFactory} that opens AMQP sender links on a given connection.
 * <p>
 * Sessions are cached in the connection's attachments under the endpoint name of the
 * target address, so that senders for the same endpoint share one session. The link's
 * target address is built from the endpoint and tenant of the resource identifier only.
 */
@Component
public class SenderFactoryImpl implements SenderFactory {

    private static final Logger LOG = LoggerFactory.getLogger(SenderFactoryImpl.class);

    /** Optional messaging configuration; when absent, "/" is used as the path separator. */
    private HonoMessagingConfigProperties config;
    /** Properties of the downstream connection; provides the link establishment timeout. */
    private ClientConfigProperties downstreamProperties;

    /**
     * Sets the (optional) messaging configuration.
     *
     * @param honoMessagingConfigProperties The configuration.
     * @throws NullPointerException if the configuration is {@code null}.
     */
    @Autowired(required = false)
    public void setConfiguration(HonoMessagingConfigProperties honoMessagingConfigProperties) {
        this.config = Objects.requireNonNull(honoMessagingConfigProperties);
    }

    /**
     * Sets the configuration properties of the downstream connection.
     *
     * @param clientConfigProperties The properties.
     * @throws NullPointerException if the properties are {@code null}.
     */
    @Autowired
    @Qualifier("downstream")
    public void setDownstreamProperties(ClientConfigProperties clientConfigProperties) {
        this.downstreamProperties = Objects.requireNonNull(clientConfigProperties);
    }

    /**
     * Creates a sender link for the given address, (re-)using a session per endpoint.
     *
     * @param protonConnection The connection to create the link on.
     * @param resourceIdentifier The address to derive the link's target from.
     * @param protonQoS The quality of service to use for the link.
     * @param handler The handler registered as the sender's send queue drain handler.
     * @param handler2 The hook to notify when the peer detaches an established link;
     *                 may be {@code null} if no notification is wanted.
     * @return A future that completes with the open sender, or fails if the connection
     *         is disconnected or the session/link could not be opened.
     * @throws NullPointerException if connection, address or QoS are {@code null}.
     */
    @Override
    public Future<ProtonSender> createSender(ProtonConnection protonConnection, ResourceIdentifier resourceIdentifier, ProtonQoS protonQoS, Handler<ProtonSender> handler, Handler<Void> handler2) {
        Objects.requireNonNull(protonConnection);
        Objects.requireNonNull(resourceIdentifier);
        Objects.requireNonNull(protonQoS);

        if (protonConnection.isDisconnected()) {
            return Future.failedFuture("connection is disconnected");
        }

        // Pass a null hook through unchanged: onRemoteDetach explicitly tolerates a null
        // handler, whereas unconditionally wrapping it would throw an NPE on remote detach.
        final Handler<String> closeHook;
        if (handler2 == null) {
            closeHook = null;
        } else {
            closeHook = resourceId -> handler2.handle(null);
        }

        return newSession(protonConnection, resourceIdentifier)
                .compose(session -> newSender(protonConnection, session, resourceIdentifier, protonQoS, handler, closeHook));
    }

    /**
     * Gets the session cached on the connection for the address' endpoint, or opens
     * a new one and caches it once the open handler reports success.
     *
     * @param protonConnection The connection to look up or create the session on.
     * @param resourceIdentifier The address whose endpoint is used as the cache key.
     * @return A future completing with the session, or failing with the cause reported
     *         to the session's open handler.
     */
    Future<ProtonSession> newSession(ProtonConnection protonConnection, ResourceIdentifier resourceIdentifier) {
        final Future<ProtonSession> result = Future.future();
        final String endpoint = resourceIdentifier.getEndpoint();
        final ProtonSession existingSession = protonConnection.attachments().get(endpoint, ProtonSession.class);

        if (existingSession != null) {
            LOG.debug("re-using existing session for sending {} data downstream", endpoint);
            result.complete(existingSession);
            return result;
        }

        LOG.debug("creating new session for sending {} data downstream", endpoint);
        final ProtonSession session = protonConnection.createSession();
        session.openHandler(openAttempt -> {
            if (openAttempt.succeeded()) {
                protonConnection.attachments().set(endpoint, ProtonSession.class, openAttempt.result());
                result.complete(openAttempt.result());
            } else {
                result.fail(openAttempt.cause());
            }
        });
        session.open();
        return result;
    }

    /**
     * Opens a new auto-settling sender link within the given session.
     * <p>
     * If invoked on a Vert.x context, a timer is scheduled that closes and frees the
     * link and fails the returned future if the link has been opened locally but has
     * not been established after the configured link establishment timeout.
     *
     * @param protonConnection The connection the session belongs to (used for logging the peer).
     * @param protonSession The session to create the link in.
     * @param resourceIdentifier The address to derive the link's target from.
     * @param protonQoS The quality of service to use for the link.
     * @param handler The handler registered as the sender's send queue drain handler.
     * @param handler2 The hook to notify with the resource ID when the peer detaches an
     *                 established link; may be {@code null}.
     * @return A future completing with the open sender or failing with the reason why
     *         the link could not be opened.
     */
    Future<ProtonSender> newSender(ProtonConnection protonConnection, ProtonSession protonSession, ResourceIdentifier resourceIdentifier, ProtonQoS protonQoS, Handler<ProtonSender> handler, Handler<String> handler2) {
        final Future<ProtonSender> result = Future.future();
        final ProtonSender sender = protonSession.createSender(getTenantOnlyTargetAddress(resourceIdentifier));
        sender.setQoS(protonQoS);
        sender.setAutoSettle(true);
        sender.sendQueueDrainHandler(handler);
        HonoProtonHelper.setCloseHandler(sender, remoteClose ->
                onRemoteDetach(sender, resourceIdentifier, protonConnection.getRemoteContainer(), true, handler2));
        HonoProtonHelper.setDetachHandler(sender, remoteDetach ->
                onRemoteDetach(sender, resourceIdentifier, protonConnection.getRemoteContainer(), false, handler2));
        sender.openHandler(openAttempt -> {
            // try* variants: the timeout below may already have failed the future, in which
            // case complete()/fail() would throw an IllegalStateException.
            if (openAttempt.succeeded()) {
                LOG.debug("sender [{}] for container [{}] open", resourceIdentifier, protonConnection.getRemoteContainer());
                result.tryComplete(openAttempt.result());
            } else {
                // the trailing Throwable is logged by SLF4J as the exception of the event
                LOG.debug("could not open sender [{}] for container [{}]", resourceIdentifier, protonConnection.getRemoteContainer(), openAttempt.cause());
                result.tryFail(openAttempt.cause());
            }
        });
        sender.open();

        final Context currentContext = Vertx.currentContext();
        if (currentContext != null) {
            // read the timeout once so that the logged value always matches the scheduled delay
            final long timeout = this.downstreamProperties.getLinkEstablishmentTimeout();
            currentContext.owner().setTimer(timeout, timerId -> onTimeOut(sender, timeout, result));
        }
        return result;
    }

    /**
     * Builds the link target address from endpoint and tenant only, i.e. without the
     * resource ID, joined by the configured path separator ("/" if not configured).
     */
    private String getTenantOnlyTargetAddress(ResourceIdentifier resourceIdentifier) {
        final String separator = this.config == null ? "/" : this.config.getPathSeparator();
        return String.format("%s%s%s", resourceIdentifier.getEndpoint(), separator, resourceIdentifier.getTenantId());
    }

    /**
     * Handles a detach/close frame received from the peer: logs the event, closes the
     * link locally and, if the link is considered established, notifies the hook.
     * <p>
     * NOTE(review): the decompiler reports that this method was originally {@code private};
     * the visibility is kept as found to avoid breaking any potential caller - confirm.
     *
     * @param protonLink The link that has been detached.
     * @param resourceIdentifier The address whose resource ID is passed to the hook.
     * @param str The container name of the peer (used for logging only).
     * @param z {@code true} if the peer closed the link, {@code false} if it only detached.
     * @param handler The hook to notify; may be {@code null}.
     */
    public static void onRemoteDetach(ProtonLink<?> protonLink, ResourceIdentifier resourceIdentifier, String str, boolean z, Handler<String> handler) {
        final boolean isSender = protonLink instanceof ProtonSender;
        final String linkType = isSender ? "sender" : "receiver";
        final String address = isSender ? protonLink.getTarget().getAddress() : protonLink.getSource().getAddress();
        final ErrorCondition error = protonLink.getRemoteCondition();

        if (error == null) {
            LOG.debug("{} [{}] detached (with closed={}) by peer [{}]", linkType, address, z, str);
        } else {
            LOG.debug("{} [{}] detached (with closed={}) by peer [{}]: {} - {}", linkType, address, z, str, error.getCondition(), error.getDescription());
        }
        protonLink.close();
        if (handler != null && HonoProtonHelper.isLinkEstablished(protonLink)) {
            handler.handle(resourceIdentifier.getResourceId());
        }
    }

    /**
     * Fails the given future with a 503 (service unavailable) error and releases the link
     * if it has been opened locally but has not been established when the timer fires.
     *
     * @param protonLink The link to check.
     * @param j The timeout in milliseconds (used for logging only).
     * @param future The future to fail on timeout.
     */
    private static void onTimeOut(ProtonLink<?> protonLink, long j, Future<?> future) {
        if (protonLink.isOpen() && !HonoProtonHelper.isLinkEstablished(protonLink)) {
            LOG.debug("sender link establishment timed out after {}ms", j);
            protonLink.close();
            protonLink.free();
            // tryFail: the open handler may have completed the future in the meantime
            future.tryFail(new ServerErrorException(503));
        }
    }
}
