package org.eclipse.hono.client.device.amqp.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.GenericSenderLink;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycleWrapper;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClient;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.client.util.ClientFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.ResourceIdentifier;

/* loaded from: input_file:org/eclipse/hono/client/device/amqp/impl/ProtonBasedAmqpAdapterClient.class */
/**
 * An {@link AmqpAdapterClient} that sends telemetry, event and command response messages and
 * creates command consumers over a single {@link HonoConnection}.
 * <p>
 * Sender links are created lazily and cached in one {@link CachingClientFactory} per message
 * kind (telemetry, event, command response). A cached link is removed from its factory when the
 * close hook passed to {@code GenericSenderLink.create} is invoked, and all three factories are
 * notified when the connection reports a disconnect.
 */
public final class ProtonBasedAmqpAdapterClient extends ConnectionLifecycleWrapper<HonoConnection> implements AmqpAdapterClient {

    /** Cache key (and address endpoint) used for the telemetry sender link. */
    private static final String KEY_TELEMETRY = "telemetry";
    /** Cache key (and address endpoint) used for the event sender link. */
    private static final String KEY_EVENT = "event";
    /** Cache key used for the command response sender link. */
    private static final String KEY_COMMAND_RESPONSE = "command_response";

    private final HonoConnection connection;
    private final CachingClientFactory<GenericSenderLink> telemetrySenderClientFactory;
    private final CachingClientFactory<GenericSenderLink> eventSenderClientFactory;
    private final CachingClientFactory<GenericSenderLink> commandResponseSenderClientFactory;
    private final ClientFactory<CommandConsumer> commandConsumerFactory;

    /**
     * Creates a client for an existing connection and registers a disconnect listener on it.
     *
     * @param honoConnection The connection to use for all links created by this client.
     * @throws NullPointerException if the connection is {@code null}.
     */
    public ProtonBasedAmqpAdapterClient(HonoConnection honoConnection) {
        super(honoConnection);
        this.connection = Objects.requireNonNull(honoConnection);
        this.connection.addDisconnectListener(con -> onDisconnect());
        this.telemetrySenderClientFactory = new CachingClientFactory<>(honoConnection.getVertx(), GenericSenderLink::isOpen);
        this.eventSenderClientFactory = new CachingClientFactory<>(honoConnection.getVertx(), GenericSenderLink::isOpen);
        this.commandResponseSenderClientFactory = new CachingClientFactory<>(honoConnection.getVertx(), GenericSenderLink::isOpen);
        this.commandConsumerFactory = new ClientFactory<>();
    }

    /**
     * Propagates a connection loss to the three sender link factories.
     */
    private void onDisconnect() {
        this.telemetrySenderClientFactory.onDisconnect();
        this.eventSenderClientFactory.onDisconnect();
        this.commandResponseSenderClientFactory.onDisconnect();
    }

    /**
     * Gets the timeout passed to {@code HonoConnection.isConnected(long)} before a sender link
     * is looked up.
     *
     * @return The link establishment timeout taken from the connection's configuration.
     */
    private long getDefaultConnectionCheckTimeout() {
        return this.connection.getConfig().getLinkEstablishmentTimeout();
    }

    /**
     * Gets the cached sender link for a key from the given factory or creates a new one.
     * <p>
     * The close hook of a newly created link removes the entry from the <em>same</em> factory
     * and key it was registered under, so that a closed link is never left behind in the cache.
     *
     * @param factory The factory that caches the link.
     * @param key The key under which the link is cached in the factory.
     * @return A future that is completed with the sender link, or failed if the connection check
     *         or the link creation fails.
     */
    private Future<GenericSenderLink> getOrCreateSender(
            CachingClientFactory<GenericSenderLink> factory,
            String key) {

        return this.connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(result -> {
                    factory.getOrCreateClient(
                            key,
                            () -> GenericSenderLink.create(this.connection, closeInfo -> {
                                factory.removeClient(key);
                            }),
                            result);
                }));
    }

    private Future<GenericSenderLink> getOrCreateGenericTelemetrySender() {
        return getOrCreateSender(this.telemetrySenderClientFactory, KEY_TELEMETRY);
    }

    private Future<GenericSenderLink> getOrCreateGenericEventSender() {
        // uses the event factory for both lookup and removal; the removal previously
        // targeted the telemetry factory by mistake
        return getOrCreateSender(this.eventSenderClientFactory, KEY_EVENT);
    }

    private Future<GenericSenderLink> getOrCreateCommandResponseSender() {
        return getOrCreateSender(this.commandResponseSenderClientFactory, KEY_COMMAND_RESPONSE);
    }

    /**
     * Creates and starts a new span for a send operation.
     * <p>
     * The span is tagged with the peer's host name, port and remote container ID as well as
     * the given device coordinates.
     *
     * @param operationName The name of the operation that the span represents.
     * @param tenantId The tenant identifier passed to {@code TracingHelper.setDeviceTags}.
     * @param deviceId The device identifier passed to {@code TracingHelper.setDeviceTags}.
     * @param parentContext The context to create the span as a child of.
     * @return The started span.
     * @throws NullPointerException if the operation name is {@code null}.
     */
    private Span createSpan(String operationName, String tenantId, String deviceId, SpanContext parentContext) {
        Objects.requireNonNull(operationName);
        Span span = TracingHelper
                .buildChildSpan(this.connection.getTracer(), parentContext, operationName, getClass().getSimpleName())
                .ignoreActiveSpan()
                .withTag(Tags.PEER_HOSTNAME.getKey(), this.connection.getConfig().getHost())
                .withTag(Tags.PEER_PORT.getKey(), Integer.valueOf(this.connection.getConfig().getPort()))
                .withTag(TracingHelper.TAG_PEER_CONTAINER.getKey(), this.connection.getRemoteContainerId())
                .start();
        TracingHelper.setDeviceTags(span, tenantId, deviceId);
        return span;
    }

    /**
     * Creates an AMQP message with the given address and a creation time.
     *
     * @param payload The data to put into the message body or {@code null} to leave the body unset.
     * @param contentType The content type to set or {@code null} to leave it unset.
     * @param address The address to set on the message.
     * @return The message.
     */
    private Message createAmqpMessage(Buffer payload, String contentType, String address) {
        Message message = ProtonHelper.message();
        message.setAddress(address);
        AmqpUtils.setCreationTime(message);
        if (contentType != null) {
            message.setContentType(contentType);
        }
        if (payload != null) {
            message.setBody(new Data(new Binary(payload.getBytes())));
        }
        return message;
    }

    /**
     * Verifies that a device ID is given whenever a tenant ID is given.
     *
     * @param tenantId The tenant identifier, may be {@code null}.
     * @param deviceId The device identifier, may be {@code null} only if the tenant ID is {@code null}.
     * @throws IllegalArgumentException if the tenant ID is not {@code null} but the device ID is.
     */
    private void checkDeviceSpec(String tenantId, String deviceId) {
        if (tenantId != null && deviceId == null) {
            throw new IllegalArgumentException("device ID is required if tenant ID is not null");
        }
    }

    /**
     * Sends a telemetry message.
     * <p>
     * With {@link QoS#AT_MOST_ONCE} the message is handed to {@code GenericSenderLink.send},
     * with any other QoS to {@code GenericSenderLink.sendAndWaitForOutcome}.
     *
     * @param qoS The delivery semantics to use.
     * @param buffer The payload or {@code null} for a message without body.
     * @param str The content type or {@code null}.
     * @param str2 The tenant ID used for the message address and tracing tags, may be {@code null}.
     * @param str3 The device ID used for the message address and tracing tags; required if the
     *             tenant ID is not {@code null}.
     * @param spanContext The tracing context to create the send span in.
     * @return A future carrying the delivery of the sent message.
     * @throws NullPointerException if the QoS is {@code null}.
     * @throws IllegalArgumentException if a tenant ID but no device ID is given.
     */
    @Override // org.eclipse.hono.client.device.amqp.TelemetrySender
    public Future<ProtonDelivery> sendTelemetry(QoS qoS, Buffer buffer, String str, String str2, String str3, SpanContext spanContext) {
        Objects.requireNonNull(qoS);
        checkDeviceSpec(str2, str3);
        return getOrCreateGenericTelemetrySender().compose(sender -> {
            Span span = createSpan("send telemetry", str2, str3, spanContext);
            String address = ResourceIdentifier.fromPath(new String[]{KEY_TELEMETRY, str2, str3}).toString();
            Message message = createAmqpMessage(buffer, str, address);
            if (qoS == QoS.AT_MOST_ONCE) {
                return sender.send(message, span);
            }
            return sender.sendAndWaitForOutcome(message, span);
        });
    }

    /**
     * Sends an event message using {@code GenericSenderLink.sendAndWaitForOutcome}.
     *
     * @param buffer The payload or {@code null} for a message without body.
     * @param str The content type or {@code null}.
     * @param str2 The tenant ID used for the message address and tracing tags, may be {@code null}.
     * @param str3 The device ID used for the message address and tracing tags; required if the
     *             tenant ID is not {@code null}.
     * @param spanContext The tracing context to create the send span in.
     * @return A future carrying the delivery of the sent message.
     * @throws IllegalArgumentException if a tenant ID but no device ID is given.
     */
    @Override // org.eclipse.hono.client.device.amqp.EventSender
    public Future<ProtonDelivery> sendEvent(Buffer buffer, String str, String str2, String str3, SpanContext spanContext) {
        checkDeviceSpec(str2, str3);
        return getOrCreateGenericEventSender().compose(sender -> {
            String address = ResourceIdentifier.fromPath(new String[]{KEY_EVENT, str2, str3}).toString();
            Message message = createAmqpMessage(buffer, str, address);
            Span span = createSpan("send event", str2, str3, spanContext);
            return sender.sendAndWaitForOutcome(message, span);
        });
    }

    /**
     * Creates a command consumer via {@code AmqpAdapterClientCommandConsumer.create} using the
     * given identifiers.
     *
     * @param str The first identifier handed to the consumer factory method, not checked for
     *            {@code null} here (presumably the tenant ID - confirm against
     *            {@code AmqpAdapterClientCommandConsumer}).
     * @param str2 The second identifier handed to the consumer factory method (presumably the
     *             device ID - confirm against {@code AmqpAdapterClientCommandConsumer}).
     * @param consumer The handler to invoke with each received message.
     * @return A future carrying the created consumer.
     * @throws NullPointerException if the second identifier or the consumer is {@code null}.
     */
    @Override // org.eclipse.hono.client.device.amqp.AmqpAdapterClient
    public Future<CommandConsumer> createDeviceSpecificCommandConsumer(String str, String str2, Consumer<Message> consumer) {
        Objects.requireNonNull(str2);
        Objects.requireNonNull(consumer);
        return this.connection.executeOnContext(result -> {
            this.commandConsumerFactory.createClient(
                    () -> AmqpAdapterClientCommandConsumer.create(
                            this.connection,
                            str,
                            str2,
                            (delivery, message) -> consumer.accept(message)),
                    result);
        });
    }

    /**
     * Creates a command consumer via {@code AmqpAdapterClientCommandConsumer.create} without
     * explicit identifiers.
     *
     * @param consumer The handler to invoke with each received message.
     * @return A future carrying the created consumer.
     * @throws NullPointerException if the consumer is {@code null}.
     */
    @Override // org.eclipse.hono.client.device.amqp.AmqpAdapterClient
    public Future<CommandConsumer> createCommandConsumer(Consumer<Message> consumer) {
        Objects.requireNonNull(consumer);
        return this.connection.executeOnContext(result -> {
            this.commandConsumerFactory.createClient(
                    () -> AmqpAdapterClientCommandConsumer.create(
                            this.connection,
                            (delivery, message) -> consumer.accept(message)),
                    result);
        });
    }

    /**
     * Sends a command response message using {@code GenericSenderLink.sendAndWaitForOutcome}.
     *
     * @param str The address to send the response to.
     * @param str2 The correlation ID to set on the response message.
     * @param i The status value added to the message via {@code AmqpUtils.addStatus}.
     * @param buffer The payload or {@code null} for a message without body.
     * @param str3 The content type or {@code null}.
     * @param spanContext The tracing context to create the send span in.
     * @return A future carrying the delivery of the sent message.
     * @throws NullPointerException if the address or the correlation ID is {@code null}.
     */
    @Override // org.eclipse.hono.client.device.amqp.CommandResponder
    public Future<ProtonDelivery> sendCommandResponse(String str, String str2, int i, Buffer buffer, String str3, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return getOrCreateCommandResponseSender().compose(sender -> {
            // no device coordinates are available here, so the span carries no device tags
            Span span = createSpan("send command response", null, null, spanContext);
            Message response = createAmqpMessage(buffer, str3, str);
            response.setCorrelationId(str2);
            AmqpUtils.addStatus(response, i);
            return sender.sendAndWaitForOutcome(response, span);
        });
    }
}
