package org.eclipse.ditto.edge.service.acknowledgements;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.CommandResponseAcknowledgementProvider;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.CommandTimeoutException;
import org.eclipse.ditto.base.service.acknowledgements.AcknowledgementAggregator;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActor.class */
/**
 * Actor which waits for the acknowledgements requested in the headers of an originating command.
 * <p>
 * Received {@link Acknowledgement}s, {@link Acknowledgements} and {@link CommandResponse}s are fed into an
 * {@link AcknowledgementAggregator}. As soon as all requested acknowledgements were received, or the wait timed
 * out, or a {@link DittoRuntimeException} arrived, exactly one result is handed to the response signal consumer
 * and the actor stops itself. The result is either the command response itself, the aggregated
 * acknowledgements, or an error.
 *
 * @param <C> the type of the command this actor collects acknowledgements for.
 */
public final class AcknowledgementAggregatorActor<C extends Command<C>> extends AbstractActorWithTimers {

    /** Timeout assumed for smart-channel signals whose headers do not carry a timeout. */
    private static final Duration COMMAND_TIMEOUT = Duration.ofSeconds(60);

    /** Additional waiting time put on top of the command timeout for smart-channel signals. */
    private static final Duration SMART_CHANNEL_BUFFER = Duration.ofSeconds(10);

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final String correlationId;
    private final C originatingSignal;
    private final AcknowledgementAggregator ackregator;
    private final Consumer<Object> responseSignalConsumer;
    private final Duration timeout;
    private final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer;
    private final CommandResponseAcknowledgementProvider<C> acknowledgementProvider;

    /**
     * Produces the error which is emitted when the wait ends without the expected response.
     * Not final: it is replaced once an invalid response was received, so that the error can mention it.
     */
    private BiFunction<Acknowledgements, DittoHeaders, DittoRuntimeException> getAsTimeoutError;

    /** Internal messages this actor sends to itself via its timers. */
    private enum Control {
        WAITING_FOR_ACKS_TIMED_OUT
    }

    /**
     * Instantiated reflectively through {@link Props}; the parameter order therefore has to stay in sync with
     * the argument order used by the {@code props} factory methods.
     */
    private AcknowledgementAggregatorActor(final EntityId entityId,
            final C originatingSignal,
            @Nullable final Duration specificTimeout,
            final Duration maxTimeout,
            final HeaderTranslator headerTranslator,
            final Consumer<Object> responseSignalConsumer,
            @Nullable final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer,
            final CommandResponseAcknowledgementProvider<C> acknowledgementProvider) {

        this.responseSignalConsumer = responseSignalConsumer;
        this.originatingSignal = originatingSignal;
        final DittoHeaders dittoHeaders = originatingSignal.getDittoHeaders();
        // Without a correlation ID in the headers, the name of this actor is used instead.
        this.correlationId = dittoHeaders.getCorrelationId().orElseGet(() -> getSelf().path().name());
        this.timeout = getTimeout(originatingSignal, maxTimeout, specificTimeout);
        this.matchingValidationFailureConsumer = Objects.requireNonNullElseGet(matchingValidationFailureConsumer,
                this::getDefaultMatchingValidationFailureConsumer);

        timers().startSingleTimer(Control.WAITING_FOR_ACKS_TIMED_OUT, Control.WAITING_FOR_ACKS_TIMED_OUT,
                this.timeout);

        this.getAsTimeoutError = getDefaultGetAsTimeoutError(originatingSignal instanceof WithEntityId
                ? ((WithEntityId) originatingSignal).getEntityId()
                : null);

        this.ackregator =
                AcknowledgementAggregator.getInstance(entityId, this.correlationId, this.timeout, headerTranslator);
        this.ackregator.addAcknowledgementRequests(dittoHeaders.getAcknowledgementRequests());
        this.log.withCorrelationId(this.correlationId)
                .info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.",
                        dittoHeaders.getAcknowledgementRequests(), this.timeout);
        this.acknowledgementProvider = acknowledgementProvider;
    }

    /**
     * Returns the function creating the plain {@link CommandTimeoutException} used as long as no invalid
     * response was received.
     *
     * @param entityId the entity ID to put into the headers of the error or {@code null} to omit it.
     */
    private BiFunction<Acknowledgements, DittoHeaders, DittoRuntimeException> getDefaultGetAsTimeoutError(
            @Nullable final EntityId entityId) {

        return (aggregatedAcks, additionalHeaders) -> CommandTimeoutException.newBuilder(this.timeout)
                .dittoHeaders(calculateHeadersWithEntityId(entityId, aggregatedAcks, additionalHeaders))
                .build();
    }

    /**
     * Merges {@code dittoHeaders} into the headers of {@code withDittoHeaders} and, if an entity ID is given,
     * additionally sets the entity ID header to {@code <entityType>:<entityId>}.
     */
    private static DittoHeaders calculateHeadersWithEntityId(@Nullable final EntityId entityId,
            final WithDittoHeaders withDittoHeaders,
            final DittoHeaders dittoHeaders) {

        final DittoHeadersBuilder<?, ?> headersBuilder = withDittoHeaders.getDittoHeaders()
                .toBuilder()
                .putHeaders(dittoHeaders);
        if (null != entityId) {
            return headersBuilder.putHeader(DittoHeaderDefinition.ENTITY_ID.getKey(),
                            entityId.getEntityType() + ":" + entityId)
                    .build();
        }
        return headersBuilder.build();
    }

    /**
     * Returns the consumer used when none was supplied: it only logs that the validation failure is not
     * processed any further.
     */
    private Consumer<MatchingValidationResult.Failure> getDefaultMatchingValidationFailureConsumer() {
        return failure -> this.log.withCorrelationId(this.originatingSignal)
                .warning("No {} consumer provided. Thus no further processing of response validation failure is going to happen.",
                        MatchingValidationResult.Failure.class.getSimpleName());
    }

    /**
     * Creates the {@code Props} of this actor without a specific timeout; the forwarder fallback timeout of the
     * given config acts as upper bound of the waiting time.
     *
     * @param entityId the entity ID handed to the acknowledgement aggregator.
     * @param c the originating command whose headers contain the requested acknowledgements.
     * @param acknowledgementConfig provides the forwarder fallback timeout.
     * @param headerTranslator handed to the acknowledgement aggregator.
     * @param consumer receives the final response signal.
     * @param consumer2 receives failures of the response validation; may be {@code null}, in which case such
     * failures are only logged.
     * @param commandResponseAcknowledgementProvider converts command responses into acknowledgements.
     * @param <C> the type of the originating command.
     * @return the Props.
     */
    static <C extends Command<?>> Props props(EntityId entityId, C c, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer, @Nullable Consumer<MatchingValidationResult.Failure> consumer2, CommandResponseAcknowledgementProvider<C> commandResponseAcknowledgementProvider) {
        return props(entityId, c, null, acknowledgementConfig.getForwarderFallbackTimeout(), headerTranslator,
                consumer, consumer2, commandResponseAcknowledgementProvider);
    }

    /**
     * Creates the {@code Props} of this actor.
     * <p>
     * NOTE(review): a decompiler marker in the source states that this method was package-private originally;
     * the visibility is left as found to stay compatible with existing callers.
     *
     * @param entityId the entity ID handed to the acknowledgement aggregator.
     * @param c the originating command whose headers contain the requested acknowledgements.
     * @param duration the timeout to use as is; if {@code null} the timeout is derived from the headers of
     * {@code c} and {@code duration2}.
     * @param duration2 the maximum timeout applied to commands which are not sent via smart channel.
     * @param headerTranslator handed to the acknowledgement aggregator.
     * @param consumer receives the final response signal.
     * @param consumer2 receives failures of the response validation; may be {@code null}, in which case such
     * failures are only logged.
     * @param commandResponseAcknowledgementProvider converts command responses into acknowledgements.
     * @param <C> the type of the originating command.
     * @return the Props.
     */
    public static <C extends Command<?>> Props props(EntityId entityId, C c, @Nullable Duration duration, Duration duration2, HeaderTranslator headerTranslator, Consumer<Object> consumer, @Nullable Consumer<MatchingValidationResult.Failure> consumer2, CommandResponseAcknowledgementProvider<C> commandResponseAcknowledgementProvider) {
        return Props.create(AcknowledgementAggregatorActor.class,
                entityId,
                c,
                duration,
                duration2,
                headerTranslator,
                consumer,
                consumer2,
                commandResponseAcknowledgementProvider);
    }

    /**
     * Defines the message handling of this actor. The order of the matchers matters: the more specific types
     * ({@code Acknowledgement}, {@code CommandTimeoutException}) are matched before the more general ones.
     */
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(Acknowledgement.class, this::handleAcknowledgement)
                .match(Acknowledgements.class, this::handleAcknowledgements)
                .match(CommandResponse.class, this::handleCommandResponse)
                .match(CommandTimeoutException.class, this::handleCommandTimeoutException)
                .match(DittoRuntimeException.class, this::handleDittoRuntimeException)
                .matchEquals(Control.WAITING_FOR_ACKS_TIMED_OUT, this::handleReceiveTimeout)
                .matchAny(message -> this.log.withCorrelationId(this.correlationId)
                        .warning("Received unexpected message: <{}>", message))
                .build();
    }

    /** Records the acknowledgement derived from a command response and completes if nothing is missing. */
    private void addCommandResponse(final CommandResponse<?> commandResponse, final Acknowledgement acknowledgement) {
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(commandResponse);
    }

    /**
     * Handles a response which does not match the originating command: the failure is logged and passed to the
     * failure consumer. The actor keeps waiting, but a later timeout error will mention the invalid response.
     */
    private void handleMatchingValidationFailure(final MatchingValidationResult.Failure failure) {
        final String detailMessage = failure.getDetailMessage();
        this.log.withCorrelationId(this.originatingSignal)
                .warning("Received invalid response. Reason: {} Response: {}.", detailMessage,
                        failure.getCommandResponse());
        this.getAsTimeoutError = getInvalidLiveResponseReceivedGetAsTimeoutError(detailMessage);
        this.matchingValidationFailureConsumer.accept(failure);
    }

    /**
     * Returns the function creating a {@link CommandTimeoutException} whose description contains the reason why
     * a previously received response was rejected.
     */
    private BiFunction<Acknowledgements, DittoHeaders, DittoRuntimeException> getInvalidLiveResponseReceivedGetAsTimeoutError(
            final String detailMessage) {

        return (aggregatedAcks, additionalHeaders) -> CommandTimeoutException.newBuilder(this.timeout)
                .dittoHeaders(calculateHeadersWithEntityId(aggregatedAcks.getEntityId(), aggregatedAcks,
                        additionalHeaders))
                .description(MessageFormat.format(
                        "Received no appropriate live response within the specified timeout. An invalid response was received, though: {0}",
                        detailMessage))
                .build();
    }

    /** Validates a received command response and either records it as acknowledgement or reports the failure. */
    private void handleCommandResponse(final CommandResponse<?> commandResponse) {
        this.log.withCorrelationId(this.correlationId).debug("Received command response <{}>.", commandResponse);
        final MatchingValidationResult validationResult = validateResponse(commandResponse);
        if (validationResult.isSuccess()) {
            addCommandResponse(commandResponse, provideAcknowledgement(commandResponse));
        } else {
            handleMatchingValidationFailure(validationResult.asFailureOrThrow());
        }
    }

    /** Only responses to live or smart-channel commands are validated; all others are accepted as they are. */
    private MatchingValidationResult validateResponse(final CommandResponse<?> commandResponse) {
        if (Command.isLiveCommand(this.originatingSignal) || Signal.isChannelSmart(this.originatingSignal)) {
            return validateLiveResponse(this.originatingSignal, commandResponse);
        }
        return MatchingValidationResult.success();
    }

    private static MatchingValidationResult validateLiveResponse(final Command<?> command,
            final CommandResponse<?> commandResponse) {

        return CommandAndCommandResponseMatchingValidator.getInstance().apply(command, commandResponse);
    }

    /** Invoked by the timer started in the constructor once the waiting time has elapsed. */
    private void handleReceiveTimeout(final Control timeoutTrigger) {
        this.log.withCorrelationId(this.correlationId)
                .info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null, DittoHeaders.empty());
    }

    private void handleAcknowledgement(final Acknowledgement acknowledgement) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgement <{}>.", acknowledgement);
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(null);
    }

    private void handleAcknowledgements(final Acknowledgements acknowledgements) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgements <{}>.", acknowledgements);
        acknowledgements.stream().forEach(this.ackregator::addReceivedAcknowledgment);
        potentiallyCompleteAcknowledgements(null);
    }

    /** Any Ditto runtime exception other than a command timeout ends the wait: it is forwarded as the result. */
    private void handleDittoRuntimeException(final DittoRuntimeException dittoRuntimeException) {
        this.log.withCorrelationId(this.correlationId)
                .info("Stopped waiting for acknowledgements because of ditto runtime exception <{}>.",
                        dittoRuntimeException);
        handleSignal(dittoRuntimeException);
        getContext().stop(getSelf());
    }

    /** A received command timeout is treated like the own timeout, except that its headers are passed on. */
    private void handleCommandTimeoutException(final CommandTimeoutException commandTimeoutException) {
        this.log.withCorrelationId(this.correlationId)
                .info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null, commandTimeoutException.getDittoHeaders());
    }

    /**
     * Converts a command response into an acknowledgement, preferably via the configured provider.
     *
     * @throws DittoInternalErrorException if the response is neither applicable to the provider nor an
     * {@code Acknowledgement} itself.
     */
    private Acknowledgement provideAcknowledgement(final CommandResponse<?> commandResponse) {
        if (this.acknowledgementProvider.isApplicable(commandResponse)) {
            return this.acknowledgementProvider.provideAcknowledgement(this.originatingSignal, commandResponse);
        }
        if (commandResponse instanceof Acknowledgement) {
            return (Acknowledgement) commandResponse;
        }
        this.log.withCorrelationId(this.originatingSignal)
                .error("Unknown response to transform to Acknowledgement: {}", commandResponse.getType());
        throw DittoInternalErrorException.newBuilder()
                .dittoHeaders(this.originatingSignal.getDittoHeaders())
                .build();
    }

    /** Completes only if every requested acknowledgement was received; otherwise keeps waiting. */
    private void potentiallyCompleteAcknowledgements(@Nullable final CommandResponse<?> commandResponse) {
        if (this.ackregator.receivedAllRequestedAcknowledgements()) {
            completeAcknowledgements(commandResponse, DittoHeaders.empty());
        }
    }

    /**
     * Emits the final result and stops this actor.
     *
     * @param commandResponse the response which triggered the completion or {@code null} if there is none.
     * @param dittoHeaders additional headers which are merged into the headers of a timeout error.
     */
    private void completeAcknowledgements(@Nullable final CommandResponse<?> commandResponse,
            final DittoHeaders dittoHeaders) {

        final Acknowledgements aggregatedAcknowledgements =
                this.ackregator.getAggregatedAcknowledgements(this.originatingSignal.getDittoHeaders());
        final boolean builtInAcknowledgementOnly = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
        if (null != commandResponse && builtInAcknowledgementOnly) {
            // Only the built-in acknowledgement is involved: answer with the response itself.
            handleSignal(commandResponse);
        } else if (builtInAcknowledgementOnly && !this.ackregator.receivedAllRequestedAcknowledgements()) {
            // Only the built-in acknowledgement is involved, but it is still outstanding: answer with an error.
            handleSignal(this.getAsTimeoutError.apply(aggregatedAcknowledgements, dittoHeaders));
        } else {
            this.log.withCorrelationId(this.originatingSignal)
                    .debug("Completing with collected acknowledgements: {}", aggregatedAcknowledgements);
            handleSignal(aggregatedAcknowledgements);
        }
        getContext().stop(getSelf());
    }

    /** Restores the connectivity headers of the originating command on the signal and hands it to the consumer. */
    private void handleSignal(final DittoHeadersSettable<?> dittoHeadersSettable) {
        this.responseSignalConsumer.accept(CommandHeaderRestoration.restoreCommandConnectivityHeaders(
                dittoHeadersSettable, this.originatingSignal.getDittoHeaders()));
    }

    /**
     * Tells whether the given acknowledgements consist of exactly one acknowledgement whose label is either
     * {@code TWIN_PERSISTED} or {@code LIVE_RESPONSE}.
     */
    private static boolean containsOnlyTwinPersistedOrLiveResponse(final Acknowledgements acknowledgements) {
        if (acknowledgements.getSize() != 1) {
            return false;
        }
        return acknowledgements.stream().anyMatch(acknowledgement -> {
            final AcknowledgementLabel label = acknowledgement.getLabel();
            return DittoAcknowledgementLabel.TWIN_PERSISTED.equals(label)
                    || DittoAcknowledgementLabel.LIVE_RESPONSE.equals(label);
        });
    }

    /**
     * Determines how long this actor waits.
     * <ol>
     * <li>A specific timeout, if given, is used as is.</li>
     * <li>For smart-channel signals the timeout header (default: {@link #COMMAND_TIMEOUT}) plus
     * {@link #SMART_CHANNEL_BUFFER} is used; the maximum timeout is not applied.</li>
     * <li>Otherwise the timeout header is used if it is shorter than the maximum timeout, else the maximum
     * timeout.</li>
     * </ol>
     *
     * @param signal the originating signal providing the timeout header.
     * @param maxTimeout the upper bound for signals which are not sent via smart channel.
     * @param specificTimeout the timeout which overrules everything else or {@code null}.
     */
    private static Duration getTimeout(final Signal<?> signal, final Duration maxTimeout,
            @Nullable final Duration specificTimeout) {

        if (specificTimeout != null) {
            return specificTimeout;
        }
        if (Signal.isChannelSmart(signal)) {
            return signal.getDittoHeaders().getTimeout().orElse(COMMAND_TIMEOUT).plus(SMART_CHANNEL_BUFFER);
        }
        return signal.getDittoHeaders()
                .getTimeout()
                .filter(headerTimeout -> headerTimeout.minus(maxTimeout).isNegative())
                .orElse(maxTimeout);
    }
}
