package org.eclipse.hono.telemetry.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import java.util.Objects;
import org.eclipse.hono.telemetry.SenderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * A {@link SenderFactory} that opens a new session on a given connection and
 * then creates and opens a sender link on that session.
 * <p>
 * Senders are configured with {@link ProtonQoS#AT_MOST_ONCE}.
 */
@Component
public class SenderFactoryImpl implements SenderFactory {
    private static final Logger LOG = LoggerFactory.getLogger(SenderFactoryImpl.class);

    /**
     * Creates a sender for the given address using a newly opened session on the connection.
     * <p>
     * If the connection is reported as disconnected, the future is failed immediately.
     * If the session cannot be opened, the future is failed with the cause reported for the session.
     * Otherwise the outcome of opening the sender is reported via the future.
     *
     * @param protonConnection the connection to create the session and sender on.
     * @param str the address passed to the session when creating the sender.
     * @param handler the handler registered as the sender's send queue drain handler.
     * @param future the future to complete with the open sender or to fail with the cause of the failure.
     * @throws NullPointerException if the connection, the address or the future is {@code null}.
     */
    @Override // org.eclipse.hono.telemetry.SenderFactory
    public void createSender(ProtonConnection protonConnection, String str, Handler<ProtonSender> handler, Future<ProtonSender> future) {
        Objects.requireNonNull(protonConnection, "connection must not be null");
        Objects.requireNonNull(str, "address must not be null");
        Objects.requireNonNull(future, "result future must not be null");
        if (protonConnection.isDisconnected()) {
            future.fail("no connection to downstream container");
            return;
        }
        newSession(protonConnection, sessionOpen -> {
            if (sessionOpen.succeeded()) {
                newSender(protonConnection, sessionOpen.result(), str, handler, future);
            } else {
                future.fail(sessionOpen.cause());
            }
        });
    }

    /**
     * Creates a session on the connection and opens it.
     *
     * @param protonConnection the connection to create the session on.
     * @param handler the handler registered as the session's open handler.
     */
    private void newSession(ProtonConnection protonConnection, Handler<AsyncResult<ProtonSession>> handler) {
        protonConnection.createSession().openHandler(handler).open();
    }

    /**
     * Creates a sender on the given session, registers its handlers and opens it.
     *
     * @param protonConnection the connection the session belongs to; only used for logging the remote container.
     * @param protonSession the session to create the sender on.
     * @param str the address to create the sender for.
     * @param handler the handler registered as the sender's send queue drain handler.
     * @param future completed with the sender if opening succeeds, failed with the cause otherwise.
     */
    private void newSender(ProtonConnection protonConnection, ProtonSession protonSession, String str, Handler<ProtonSender> handler, Future<ProtonSender> future) {
        ProtonSender sender = protonSession.createSender(str);
        sender.setQoS(ProtonQoS.AT_MOST_ONCE);
        sender.sendQueueDrainHandler(handler);
        sender.openHandler(senderOpen -> {
            if (senderOpen.succeeded()) {
                LOG.debug("sender [{}] for downstream container [{}] open", str, protonConnection.getRemoteContainer());
                future.complete(senderOpen.result());
            } else {
                LOG.debug("could not open sender for downstream container [{}]", protonConnection.getRemoteContainer(), senderOpen.cause());
                future.fail(senderOpen.cause());
            }
        });
        sender.closeHandler(senderClose -> {
            if (senderClose.succeeded()) {
                LOG.debug("sender [{}] for downstream container [{}] closed", str, protonConnection.getRemoteContainer());
            } else {
                // Do not swallow a failed close silently: record the reported cause so the
                // loss of the downstream link can be diagnosed. The trailing Throwable is
                // logged by SLF4J as the exception of the log statement.
                LOG.debug("sender [{}] for downstream container [{}] closed with error", str,
                        protonConnection.getRemoteContainer(), senderClose.cause());
            }
        });
        sender.open();
    }
}
