package org.eclipse.ditto.internal.models.acks;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.acks.ThingAcknowledgementFactory;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;

/* loaded from: input_file:org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActor.class */
public final class AcknowledgementAggregatorActor extends AbstractActorWithTimers {
    private static final Duration COMMAND_TIMEOUT = Duration.ofSeconds(60);
    private static final Duration SMART_CHANNEL_BUFFER = Duration.ofSeconds(10);
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final String correlationId;
    private final Signal<?> originatingSignal;
    private final AcknowledgementAggregator ackregator;
    private final Consumer<Object> responseSignalConsumer;
    private final Duration timeout;
    private final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer;
    private Function<Acknowledgements, ThingErrorResponse> getAsTimeoutErrorResponse;

    /* loaded from: input_file:org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActor$Control.class */
    private enum Control {
        WAITING_FOR_ACKS_TIMED_OUT
    }

    private AcknowledgementAggregatorActor(EntityId entityId, Signal<?> signal, @Nullable Duration duration, Duration duration2, HeaderTranslator headerTranslator, Consumer<Object> consumer, @Nullable Consumer<MatchingValidationResult.Failure> consumer2) {
        this.responseSignalConsumer = consumer;
        this.originatingSignal = signal;
        DittoHeaders dittoHeaders = signal.getDittoHeaders();
        this.correlationId = (String) dittoHeaders.getCorrelationId().orElseGet(() -> {
            return getSelf().path().name();
        });
        this.timeout = getTimeout(signal, duration2, duration);
        this.matchingValidationFailureConsumer = (Consumer) Objects.requireNonNullElseGet(consumer2, this::getDefaultMatchingValidationFailureConsumer);
        timers().startSingleTimer(Control.WAITING_FOR_ACKS_TIMED_OUT, Control.WAITING_FOR_ACKS_TIMED_OUT, this.timeout);
        this.getAsTimeoutErrorResponse = getDefaultGetAsTimeoutErrorResponse();
        Set acknowledgementRequests = dittoHeaders.getAcknowledgementRequests();
        this.ackregator = AcknowledgementAggregator.getInstance(entityId, this.correlationId, this.timeout, headerTranslator);
        this.ackregator.addAcknowledgementRequests(acknowledgementRequests);
        this.log.withCorrelationId(this.correlationId).info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.", acknowledgementRequests, this.timeout);
    }

    private Function<Acknowledgements, ThingErrorResponse> getDefaultGetAsTimeoutErrorResponse() {
        return acknowledgements -> {
            return ThingErrorResponse.of(ThingId.of(acknowledgements.getEntityId()), GatewayCommandTimeoutException.newBuilder(this.timeout).dittoHeaders(acknowledgements.getDittoHeaders()).build());
        };
    }

    private Consumer<MatchingValidationResult.Failure> getDefaultMatchingValidationFailureConsumer() {
        return failure -> {
            this.log.withCorrelationId(this.originatingSignal).warning("No {} consumer provided. Thus no further processing of response validation failure is going to happen.", MatchingValidationResult.Failure.class.getSimpleName());
        };
    }

    static Props props(EntityId entityId, Signal<?> signal, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer, @Nullable Consumer<MatchingValidationResult.Failure> consumer2) {
        return Props.create(AcknowledgementAggregatorActor.class, new Object[]{entityId, signal, null, acknowledgementConfig.getForwarderFallbackTimeout(), headerTranslator, consumer, consumer2});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(EntityId entityId, Signal<?> signal, @Nullable Duration duration, Duration duration2, HeaderTranslator headerTranslator, Consumer<Object> consumer, @Nullable Consumer<MatchingValidationResult.Failure> consumer2) {
        return Props.create(AcknowledgementAggregatorActor.class, new Object[]{entityId, signal, duration, duration2, headerTranslator, consumer, consumer2});
    }

    public AbstractActor.Receive createReceive() {
        ReceiveBuilder match = receiveBuilder().match(ThingCommandResponse.class, this::handleThingCommandResponse).match(MessageCommandResponse.class, this::handleMessageCommandResponse).match(Acknowledgement.class, this::handleAcknowledgement).match(Acknowledgements.class, this::handleAcknowledgements).match(DittoRuntimeException.class, this::handleDittoRuntimeException);
        Control control = Control.WAITING_FOR_ACKS_TIMED_OUT;
        Objects.requireNonNull(control);
        return match.match(Control.class, (v1) -> {
            return r2.equals(v1);
        }, this::handleReceiveTimeout).matchAny(obj -> {
            this.log.warning("Received unexpected message: <{}>", obj);
        }).build();
    }

    private void handleThingCommandResponse(ThingCommandResponse<?> thingCommandResponse) {
        this.log.withCorrelationId(this.correlationId).debug("Received thing command response <{}>.", thingCommandResponse);
        MatchingValidationResult validateResponse = validateResponse(thingCommandResponse);
        if (validateResponse.isSuccess()) {
            addCommandResponse(thingCommandResponse, getAcknowledgement(thingCommandResponse, getAckLabelOfResponse(this.originatingSignal)));
        } else {
            handleMatchingValidationFailure(validateResponse.asFailureOrThrow());
        }
    }

    private MatchingValidationResult validateResponse(CommandResponse<?> commandResponse) {
        return (SignalInformationPoint.isLiveCommand(this.originatingSignal) || SignalInformationPoint.isChannelSmart(this.originatingSignal)) ? tryToValidateLiveResponse((Command) this.originatingSignal, commandResponse) : MatchingValidationResult.success();
    }

    private MatchingValidationResult tryToValidateLiveResponse(Command<?> command, CommandResponse<?> commandResponse) {
        try {
            return validateLiveResponse(command, commandResponse);
        } catch (ConnectionIdInvalidException e) {
            this.log.withCorrelationId(command).error("Headers of command response contain an invalid connection ID: {}. Repeating validation without the invalid connection ID. This means that the validation failure will not appear in connection log.", e.getMessage());
            return validateLiveResponse(command, commandResponse.setDittoHeaders(DittoHeaders.newBuilder(commandResponse.getDittoHeaders()).removeHeader(DittoHeaderDefinition.CONNECTION_ID.getKey()).build()));
        }
    }

    private static MatchingValidationResult validateLiveResponse(Command<?> command, CommandResponse<?> commandResponse) {
        return CommandAndCommandResponseMatchingValidator.getInstance().apply(command, commandResponse);
    }

    private static Acknowledgement getAcknowledgement(ThingCommandResponse<?> thingCommandResponse, AcknowledgementLabel acknowledgementLabel) {
        return ThingAcknowledgementFactory.newAcknowledgement(acknowledgementLabel, thingCommandResponse.getEntityId(), thingCommandResponse.getHttpStatus(), thingCommandResponse.getDittoHeaders(), getPayload(thingCommandResponse).orElse(null));
    }

    private static Optional<JsonValue> getPayload(ThingCommandResponse<?> thingCommandResponse) {
        return thingCommandResponse instanceof WithOptionalEntity ? ((WithOptionalEntity) thingCommandResponse).getEntity(thingCommandResponse.getImplementedSchemaVersion()) : Optional.empty();
    }

    private void addCommandResponse(CommandResponse<?> commandResponse, Acknowledgement acknowledgement) {
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(commandResponse);
    }

    private void handleMatchingValidationFailure(MatchingValidationResult.Failure failure) {
        String detailMessage = failure.getDetailMessage();
        this.log.withCorrelationId(this.originatingSignal).warning("Received invalid response. Reason: {} Response: {}.", detailMessage, failure.getCommandResponse());
        this.getAsTimeoutErrorResponse = getInvalidLiveResponseReceivedGetAsTimeoutErrorResponse(detailMessage);
        this.matchingValidationFailureConsumer.accept(failure);
    }

    private Function<Acknowledgements, ThingErrorResponse> getInvalidLiveResponseReceivedGetAsTimeoutErrorResponse(String str) {
        return acknowledgements -> {
            GatewayCommandTimeoutException build = GatewayCommandTimeoutException.newBuilder(this.timeout).dittoHeaders(acknowledgements.getDittoHeaders()).description(MessageFormat.format("Received no appropriate live response within the specified timeout. An invalid response was received, though: {0}", str)).build();
            return (ThingErrorResponse) SignalInformationPoint.getEntityId(this.originatingSignal).map((v0) -> {
                return ThingId.of(v0);
            }).map(thingId -> {
                return ThingErrorResponse.of(thingId, build);
            }).orElseGet(() -> {
                return ThingErrorResponse.of(build);
            });
        };
    }

    private void handleMessageCommandResponse(MessageCommandResponse<?, ?> messageCommandResponse) {
        this.log.withCorrelationId(this.correlationId).debug("Received message command response <{}>.", messageCommandResponse);
        MatchingValidationResult validateResponse = validateResponse(messageCommandResponse);
        if (validateResponse.isSuccess()) {
            addCommandResponse(messageCommandResponse, getAcknowledgement(messageCommandResponse));
        } else {
            handleMatchingValidationFailure(validateResponse.asFailureOrThrow());
        }
    }

    private static Acknowledgement getAcknowledgement(MessageCommandResponse<?, ?> messageCommandResponse) {
        return ThingAcknowledgementFactory.newAcknowledgement(DittoAcknowledgementLabel.LIVE_RESPONSE, messageCommandResponse.getEntityId(), messageCommandResponse.getHttpStatus(), messageCommandResponse.getDittoHeaders().toBuilder().putHeaders(messageCommandResponse.getMessage().getHeaders()).build(), getPayload(messageCommandResponse).orElse(null));
    }

    private static Optional<JsonValue> getPayload(MessageCommandResponse<?, ?> messageCommandResponse) {
        return messageCommandResponse.toJson().getValue(MessageCommandResponse.JsonFields.JSON_MESSAGE.getPointer().append(MessageCommandResponse.JsonFields.JSON_MESSAGE_PAYLOAD.getPointer()));
    }

    private void handleReceiveTimeout(Control control) {
        this.log.withCorrelationId(this.correlationId).info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null);
    }

    private void handleAcknowledgement(Acknowledgement acknowledgement) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgement <{}>.", acknowledgement);
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(null);
    }

    private void handleAcknowledgements(Acknowledgements acknowledgements) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgements <{}>.", acknowledgements);
        Stream stream = acknowledgements.stream();
        AcknowledgementAggregator acknowledgementAggregator = this.ackregator;
        Objects.requireNonNull(acknowledgementAggregator);
        stream.forEach(acknowledgementAggregator::addReceivedAcknowledgment);
        potentiallyCompleteAcknowledgements(null);
    }

    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.log.withCorrelationId(this.correlationId).info("Stopped waiting for acknowledgements because of ditto runtime exception <{}>.", dittoRuntimeException);
        handleSignal(dittoRuntimeException);
        getContext().stop(getSelf());
    }

    private void potentiallyCompleteAcknowledgements(@Nullable CommandResponse<?> commandResponse) {
        if (this.ackregator.receivedAllRequestedAcknowledgements()) {
            completeAcknowledgements(commandResponse);
        }
    }

    private void completeAcknowledgements(@Nullable CommandResponse<?> commandResponse) {
        Acknowledgements aggregatedAcknowledgements = this.ackregator.getAggregatedAcknowledgements(this.originatingSignal.getDittoHeaders());
        boolean containsOnlyTwinPersistedOrLiveResponse = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
        if (null != commandResponse && containsOnlyTwinPersistedOrLiveResponse) {
            handleSignal(commandResponse);
        } else if (!containsOnlyTwinPersistedOrLiveResponse || this.ackregator.receivedAllRequestedAcknowledgements()) {
            this.log.withCorrelationId(this.originatingSignal).debug("Completing with collected acknowledgements: {}", aggregatedAcknowledgements);
            handleSignal(aggregatedAcknowledgements);
        } else {
            handleSignal((DittoHeadersSettable) this.getAsTimeoutErrorResponse.apply(aggregatedAcknowledgements));
        }
        getContext().stop(getSelf());
    }

    private void handleSignal(DittoHeadersSettable<?> dittoHeadersSettable) {
        this.responseSignalConsumer.accept(CommandHeaderRestoration.restoreCommandConnectivityHeaders(dittoHeadersSettable, this.originatingSignal.getDittoHeaders()));
    }

    private static boolean containsOnlyTwinPersistedOrLiveResponse(Acknowledgements acknowledgements) {
        return acknowledgements.getSize() == 1 && acknowledgements.stream().anyMatch(acknowledgement -> {
            AcknowledgementLabel label = acknowledgement.getLabel();
            return DittoAcknowledgementLabel.TWIN_PERSISTED.equals(label) || DittoAcknowledgementLabel.LIVE_RESPONSE.equals(label);
        });
    }

    private static AcknowledgementLabel getAckLabelOfResponse(Signal<?> signal) {
        return (SignalInformationPoint.isChannelLive(signal) || SignalInformationPoint.isChannelSmart(signal)) ? DittoAcknowledgementLabel.LIVE_RESPONSE : DittoAcknowledgementLabel.TWIN_PERSISTED;
    }

    private static Duration getTimeout(Signal<?> signal, Duration duration, @Nullable Duration duration2) {
        return duration2 != null ? duration2 : SignalInformationPoint.isChannelSmart(signal) ? ((Duration) signal.getDittoHeaders().getTimeout().orElse(COMMAND_TIMEOUT)).plus(SMART_CHANNEL_BUFFER) : (Duration) signal.getDittoHeaders().getTimeout().filter(duration3 -> {
            return duration3.minus(duration).isNegative();
        }).orElse(duration);
    }
}
