package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.CommandAlreadyProcessedException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.CommandToBeReprocessedException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessagingType;

/* loaded from: input_file:org/eclipse/hono/client/command/kafka/KafkaBasedCommandContext.class */
public class KafkaBasedCommandContext extends AbstractCommandContext<KafkaBasedCommand> implements CommandContext {
    private static final String PROPERTY_NAME_DELIVERY_FAILURE_RESPONSES_DISABLED = "HONO_DISABLE_KAFKA_COMMAND_DELIVERY_FAILURE_RESPONSES";
    private static final boolean DELIVERY_FAILURE_RESPONSES_DISABLED = Boolean.parseBoolean(getProperty(PROPERTY_NAME_DELIVERY_FAILURE_RESPONSES_DISABLED));

    public KafkaBasedCommandContext(KafkaBasedCommand kafkaBasedCommand, CommandResponseSender commandResponseSender, Span span) {
        super(span, kafkaBasedCommand, commandResponseSender);
    }

    public final void release(Throwable th) {
        Objects.requireNonNull(th);
        if (setCompleted("released")) {
            Span tracingSpan = getTracingSpan();
            TracingHelper.logError(tracingSpan, "command could not be delivered or processed", th);
            ServiceInvocationException serverError = StatusCodeMapper.toServerError(th);
            int errorCode = serverError.getErrorCode();
            Tags.HTTP_STATUS.set(tracingSpan, Integer.valueOf(errorCode));
            if (DELIVERY_FAILURE_RESPONSES_DISABLED || !isRequestResponseCommand() || (th instanceof CommandAlreadyProcessedException) || (th instanceof CommandToBeReprocessedException)) {
                tracingSpan.finish();
            } else {
                sendDeliveryFailureCommandResponseMessage(errorCode, (String) Optional.ofNullable(ServiceInvocationException.getErrorMessageForExternalClient(serverError)).orElse("Temporarily unavailable"), tracingSpan, th, getCorrelationId(), MessagingType.kafka).onComplete(asyncResult -> {
                    tracingSpan.finish();
                });
            }
        }
    }

    public final void modify(boolean z, boolean z2) {
        if (setCompleted("modified")) {
            Span tracingSpan = getTracingSpan();
            TracingHelper.logError(tracingSpan, "command for device handled with outcome 'modified'" + (z ? "; delivery failed" : "") + (z2 ? "; undeliverable here" : ""));
            int i = z2 ? 404 : 503;
            Tags.HTTP_STATUS.set(tracingSpan, Integer.valueOf(i));
            if (DELIVERY_FAILURE_RESPONSES_DISABLED || !isRequestResponseCommand()) {
                tracingSpan.finish();
            } else {
                sendDeliveryFailureCommandResponseMessage(i, "command not processed" + (z ? "; delivery failed" : "") + (z2 ? "; undeliverable here" : ""), tracingSpan, null, getCorrelationId(), MessagingType.kafka).onComplete(asyncResult -> {
                    tracingSpan.finish();
                });
            }
        }
    }

    public final void reject(String str) {
        TracingHelper.logError(getTracingSpan(), "client error trying to deliver or process command: " + str);
        reject(400, str);
    }

    public final void reject(Throwable th) {
        int errorCode = th instanceof ClientErrorException ? ((ClientErrorException) th).getErrorCode() : 400;
        TracingHelper.logError(getTracingSpan(), "client error trying to deliver or process command", th);
        reject(errorCode, th.getMessage());
    }

    private void reject(int i, String str) {
        if (setCompleted("rejected")) {
            Span tracingSpan = getTracingSpan();
            Tags.HTTP_STATUS.set(tracingSpan, Integer.valueOf(i));
            if (DELIVERY_FAILURE_RESPONSES_DISABLED || !isRequestResponseCommand()) {
                tracingSpan.finish();
            } else {
                sendDeliveryFailureCommandResponseMessage(i, (String) Optional.ofNullable(str).orElse("Command message rejected"), tracingSpan, null, getCorrelationId(), MessagingType.kafka).onComplete(asyncResult -> {
                    tracingSpan.finish();
                });
            }
        }
    }

    private String getCorrelationId() {
        return (String) KafkaRecordHelper.getCorrelationId(((KafkaBasedCommand) getCommand()).getRecord().headers()).orElse(null);
    }

    private static String getProperty(String str) {
        return System.getProperty(str, System.getenv(str));
    }

    static {
        if (DELIVERY_FAILURE_RESPONSES_DISABLED) {
            LOG.info("sending of command delivery failure response messages is disabled");
        }
    }
}
