package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.vertx.core.Future;
import io.vertx.proton.ProtonHelper;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.amqp.SenderCachingServiceClient;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/amqp/ProtonBasedInternalCommandSender.class */
/**
 * Sends commands to an address of the form {@code command_internal/<id>} using sender links
 * obtained via {@code getOrCreateSenderLink}.
 * <p>
 * The outcome that the downstream peer reports for a delivery is mirrored onto the
 * {@link CommandContext} of the command that was sent.
 */
public class ProtonBasedInternalCommandSender extends SenderCachingServiceClient implements InternalCommandSender {

    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedInternalCommandSender.class);

    /** Prefix of the address that commands are sent to. */
    private static final String TARGET_ADDRESS_PREFIX = "command_internal/";
    /** First segment of the address set on newly created command messages. */
    private static final String COMMAND_ENDPOINT = "command";
    /** First segment of the reply-to address set on request/response command messages. */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";
    /** Name of the message property that carries the gateway identifier. */
    private static final String PROPERTY_VIA = "via";

    /**
     * Creates a new sender for a connection.
     * <p>
     * The sender is set up with a no-op {@link SendMessageSampler} factory.
     *
     * @param honoConnection The connection to pass to the super class.
     */
    public ProtonBasedInternalCommandSender(HonoConnection honoConnection) {
        super(honoConnection, SendMessageSampler.Factory.noop(), false);
    }

    /**
     * Sends the command of the given context to the address derived from the given identifier.
     * <p>
     * Once the downstream peer has reported an outcome for the delivery, the corresponding
     * operation (accept, reject, release or modify) is invoked on the command context.
     * If the link cannot be obtained or sending fails, the command context is released
     * with the failure cause.
     *
     * @param commandContext The context of the command to send.
     * @param str The identifier that the target address {@code command_internal/<str>} is built from.
     * @return A future that completes once the outcome has been applied to the command context,
     *         or fails with the cause of the failure.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public Future<Void> sendCommand(CommandContext commandContext, String str) {
        Objects.requireNonNull(commandContext);
        Objects.requireNonNull(str);
        return getOrCreateSenderLink(getTargetAddress(str))
                .recover(th -> Future.failedFuture(StatusCodeMapper.toServerError(th)))
                .compose(genericSenderLink -> {
                    final Span span = newChildSpan(commandContext.getTracingContext(), "delegate Command request");
                    final Command command = commandContext.getCommand();
                    final Message message = adoptOrCreateMessage(command);
                    TracingHelper.setDeviceTags(span, command.getTenant(), command.getDeviceId());
                    if (command.isTargetedAtGateway()) {
                        MessageHelper.addProperty(message, PROPERTY_VIA, command.getGatewayId());
                        TracingHelper.TAG_GATEWAY_ID.set(span, command.getGatewayId());
                    }
                    return genericSenderLink.sendAndWaitForRawOutcome(message, span);
                })
                .map(protonDelivery -> {
                    // The state may be any outcome type, so it must not be bound to one concrete class.
                    final Object remoteState = protonDelivery.getRemoteState();
                    LOG.trace("command [{}] sent to downstream peer; remote state of delivery: {}",
                            commandContext.getCommand(), remoteState);
                    applyRemoteState(commandContext, remoteState);
                    return (Void) null;
                })
                .onFailure(th2 -> {
                    LOG.debug("failed to send command [{}] to downstream peer", commandContext.getCommand(), th2);
                    if (th2 instanceof NoConsumerException) {
                        TracingHelper.logError(commandContext.getTracingSpan(), "no credit - target adapter instance '" + str + "' may be offline in which case the device hasn't subscribed again yet");
                    }
                    commandContext.release(th2);
                });
    }

    /**
     * Invokes the command context operation that corresponds to a delivery's remote state.
     * <p>
     * A state that is {@code null} or not one of accepted, rejected, released or modified
     * leaves the command context untouched.
     *
     * @param commandContext The context to apply the outcome to.
     * @param remoteState The remote state of the delivery, may be {@code null}.
     */
    private static void applyRemoteState(CommandContext commandContext, Object remoteState) {
        if (remoteState instanceof Accepted) {
            commandContext.accept();
        } else if (remoteState instanceof Rejected) {
            final Rejected rejected = (Rejected) remoteState;
            // The error condition is optional; reject with a null description if it is missing.
            final String description = Optional.ofNullable(rejected.getError())
                    .map(error -> error.getDescription())
                    .orElse(null);
            commandContext.reject(description);
        } else if (remoteState instanceof Released) {
            commandContext.release();
        } else if (remoteState instanceof Modified) {
            final Modified modified = (Modified) remoteState;
            // Both flags are optional Boolean fields; an absent flag is treated as false
            // instead of failing with a NullPointerException on unboxing.
            commandContext.modify(
                    Boolean.TRUE.equals(modified.getDeliveryFailed()),
                    Boolean.TRUE.equals(modified.getUndeliverableHere()));
        }
    }

    /**
     * Gets the address that commands for the given identifier are sent to.
     *
     * @param str The identifier to append to the {@code command_internal/} prefix.
     * @return The target address.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    static String getTargetAddress(String str) {
        return TARGET_ADDRESS_PREFIX + Objects.requireNonNull(str);
    }

    /**
     * Gets the AMQP message to send for a command.
     * <p>
     * For a {@code ProtonBasedCommand} a shallow copy of the command's message is used.
     * Otherwise a new message is created from the command's payload, content type, name
     * and address. In both cases the correlation ID (if set) and, for request/response
     * commands, the reply-to address are set on the message.
     *
     * @param command The command to get the message for.
     * @return The message.
     */
    private Message adoptOrCreateMessage(Command command) {
        final Message message;
        if (command instanceof ProtonBasedCommand) {
            message = MessageHelper.getShallowCopy(((ProtonBasedCommand) command).getMessage());
        } else {
            message = ProtonHelper.message();
            MessageHelper.setPayload(message, command.getContentType(), command.getPayload() != null ? command.getPayload().getBytes() : null);
            message.setAddress(getCommandMessageAddress(command));
            message.setSubject(command.getName());
            if (command.getContentType() != null) {
                message.setContentType(command.getContentType());
            }
        }
        if (command.getCorrelationId() != null) {
            message.setCorrelationId(command.getCorrelationId());
        }
        if (!command.isOneWay()) {
            message.setReplyTo(getReplyToAddress(command));
        }
        return message;
    }

    /**
     * Builds the message address {@code command/<tenant>/<deviceId>} for a command.
     *
     * @param command The command to build the address for.
     * @return The address.
     */
    private String getCommandMessageAddress(Command command) {
        return String.format("%s/%s/%s", COMMAND_ENDPOINT, command.getTenant(), command.getDeviceId());
    }

    /**
     * Builds the reply-to address {@code command_response/<tenant>/<replyToId>} for a command.
     *
     * @param command The command to build the address for.
     * @return The address or {@code null} if the command is a one-way command.
     */
    private String getReplyToAddress(Command command) {
        if (command.isOneWay()) {
            return null;
        }
        return String.format("%s/%s/%s", COMMAND_RESPONSE_ENDPOINT, command.getTenant(), command.getReplyToId());
    }

    /**
     * Returns the class name followed by a note that AMQP 1.0 is used.
     *
     * @return The string representation of this sender.
     */
    @Override
    public String toString() {
        return ProtonBasedInternalCommandSender.class.getName() + " via AMQP 1.0 Messaging Network";
    }
}
