package org.eclipse.ditto.services.models.acks;

import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdWithType;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.AcknowledgementCorrelationIdMissingException;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.commands.things.acks.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.modify.ThingModifyCommand;

/* loaded from: input_file:org/eclipse/ditto/services/models/acks/AcknowledgementAggregatorActor.class */
/**
 * Actor that collects the {@link ThingCommandResponse} and the {@link Acknowledgement}s belonging to one
 * {@link ThingModifyCommand} and hands a single resulting {@link Signal} to a consumer.
 * <p>
 * The actor completes (and stops itself) either as soon as the aggregator reports that all requested
 * acknowledgements were received, or when the receive timeout configured in the constructor elapses.
 * </p>
 */
public final class AcknowledgementAggregatorActor extends AbstractActor {

    /**
     * Prefix of the actor name built by {@link #determineActorName(DittoHeaders)}; the remainder of the name is
     * the URL-encoded correlation ID.
     */
    static final String ACTOR_NAME_PREFIX = "ackAggregator-";

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final String correlationId;
    private final DittoHeaders requestCommandHeaders;
    private final AcknowledgementAggregator ackregator;
    private final Consumer<Signal<?>> responseSignalConsumer;

    /**
     * Instantiated reflectively through {@link #props}; the parameter order must therefore match the argument
     * array built there.
     */
    private AcknowledgementAggregatorActor(ThingModifyCommand<?> thingModifyCommand, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Signal<?>> consumer) {
        this.responseSignalConsumer = consumer;
        this.requestCommandHeaders = thingModifyCommand.getDittoHeaders();
        // Fall back to the actor's own name (minus the prefix) when the command carries no correlation ID.
        // NOTE(review): the name is presumably the URL-encoded form produced by determineActorName and is not
        // decoded here - confirm whether that matters for callers.
        this.correlationId = this.requestCommandHeaders.getCorrelationId()
                .orElseGet(() -> getSelf().path().name().replace(ACTOR_NAME_PREFIX, ""));

        // Resolve the effective timeout once; it is used for the aggregator and for the receive timeout.
        final Duration timeout = this.requestCommandHeaders.getTimeout()
                .orElseGet(acknowledgementConfig::getForwarderFallbackTimeout);

        this.ackregator = createAcknowledgementAggregator(thingModifyCommand, this.correlationId, timeout, headerTranslator);
        this.ackregator.addAcknowledgementRequests(this.requestCommandHeaders.getAcknowledgementRequests());
        getContext().setReceiveTimeout(timeout);
    }

    /**
     * Creates the aggregator for the entity addressed by the given command.
     */
    private static AcknowledgementAggregator createAcknowledgementAggregator(ThingModifyCommand<?> thingModifyCommand, CharSequence correlationId, Duration timeout, HeaderTranslator headerTranslator) {
        return AcknowledgementAggregator.getInstance((NamespacedEntityIdWithType) thingModifyCommand.getEntityId(), correlationId, timeout, headerTranslator);
    }

    /**
     * Creates the Akka {@link Props} for this actor.
     * The command is first passed through {@link ThingModifyCommandAckRequestSetter} and the result of that call
     * is what the actor receives.
     *
     * @param thingModifyCommand the command whose acknowledgements are aggregated.
     * @param acknowledgementConfig provides the fallback timeout used when the command headers define none.
     * @param headerTranslator passed on to the aggregator.
     * @param consumer receives the resulting response or aggregated acknowledgements.
     * @return the Props.
     */
    public static Props props(ThingModifyCommand<?> thingModifyCommand, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Signal<?>> consumer) {
        final ThingModifyCommand<?> commandWithAckRequests = ThingModifyCommandAckRequestSetter.getInstance().apply(thingModifyCommand);
        return Props.create(AcknowledgementAggregatorActor.class, new Object[]{commandWithAckRequests, acknowledgementConfig, headerTranslator, consumer});
    }

    /**
     * Determines the actor name for the given headers: {@link #ACTOR_NAME_PREFIX} followed by the URL-encoded
     * correlation ID. UTF-8 is used for encoding so that the name does not depend on the platform's default
     * charset.
     *
     * @param dittoHeaders the headers providing the correlation ID.
     * @return the actor name.
     * @throws AcknowledgementCorrelationIdMissingException if the headers contain no correlation ID.
     */
    public static String determineActorName(DittoHeaders dittoHeaders) {
        ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders");
        final String headerCorrelationId = dittoHeaders.getCorrelationId()
                .orElseThrow(() -> AcknowledgementCorrelationIdMissingException.newBuilder()
                        .dittoHeaders(dittoHeaders)
                        .build());
        return ACTOR_NAME_PREFIX + URLEncoder.encode(headerCorrelationId, StandardCharsets.UTF_8);
    }

    /**
     * Delegates to {@code AcknowledgementAggregatorActorStarter} with the given arguments and returns its result.
     *
     * @param actorContext the context passed to the starter.
     * @param thingModifyCommand the command whose acknowledgements should be aggregated.
     * @param acknowledgementConfig the acknowledgement configuration.
     * @param headerTranslator the header translator.
     * @param consumer receives the resulting response or aggregated acknowledgements.
     * @return the Optional returned by the starter.
     */
    public static Optional<ActorRef> startAcknowledgementAggregator(ActorContext actorContext, ThingModifyCommand<?> thingModifyCommand, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Signal<?>> consumer) {
        return AcknowledgementAggregatorActorStarter.getInstance(actorContext, thingModifyCommand, acknowledgementConfig, headerTranslator, consumer).get();
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ThingCommandResponse.class, this::handleThingCommandResponse)
                .match(Acknowledgement.class, this::handleAcknowledgement)
                .match(ReceiveTimeout.class, this::handleReceiveTimeout)
                .matchAny(unexpected -> this.log.warning("Received unexpected message: <{}>", unexpected))
                .build();
    }

    /**
     * Registers the command response with the aggregator and completes if nothing else is outstanding.
     */
    private void handleThingCommandResponse(ThingCommandResponse<?> thingCommandResponse) {
        this.ackregator.addReceivedTwinPersistedAcknowledgment(thingCommandResponse);
        potentiallyCompleteAcknowledgements(thingCommandResponse, thingCommandResponse.getDittoHeaders());
    }

    /**
     * Completes with whatever the aggregator holds at the time the receive timeout fires, using the headers of
     * the original command.
     */
    private void handleReceiveTimeout(ReceiveTimeout receiveTimeout) {
        this.log.withCorrelationId(this.correlationId).info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null, this.requestCommandHeaders);
    }

    /**
     * Registers the acknowledgement with the aggregator and completes if nothing else is outstanding.
     */
    private void handleAcknowledgement(Acknowledgement acknowledgement) {
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        // No command response is available on this path, so completion always yields aggregated acknowledgements.
        potentiallyCompleteAcknowledgements(null, acknowledgement.getDittoHeaders());
    }

    /**
     * Completes only when the aggregator reports that all requested acknowledgements were received.
     */
    private void potentiallyCompleteAcknowledgements(@Nullable ThingCommandResponse<?> thingCommandResponse, DittoHeaders dittoHeaders) {
        if (this.ackregator.receivedAllRequestedAcknowledgements()) {
            completeAcknowledgements(thingCommandResponse, dittoHeaders);
        }
    }

    /**
     * Hands the result to the consumer and stops this actor.
     * The plain command response is forwarded when the aggregation was successful, a response is available and
     * the only aggregated acknowledgement is the twin-persisted one; otherwise the aggregated acknowledgements
     * are forwarded.
     */
    private void completeAcknowledgements(@Nullable ThingCommandResponse<?> thingCommandResponse, DittoHeaders dittoHeaders) {
        final Acknowledgements aggregatedAcknowledgements = this.ackregator.getAggregatedAcknowledgements(dittoHeaders);
        final boolean forwardPlainResponse = this.ackregator.isSuccessful()
                && null != thingCommandResponse
                && containsOnlyTwinPersisted(aggregatedAcknowledgements);
        if (forwardPlainResponse) {
            handleSignal(thingCommandResponse);
        } else {
            this.log.withCorrelationId(dittoHeaders).debug("Completing with collected acknowledgements: {}", aggregatedAcknowledgements);
            handleSignal(aggregatedAcknowledgements);
        }
        getContext().stop(getSelf());
    }

    private void handleSignal(Signal<?> signal) {
        this.responseSignalConsumer.accept(signal);
    }

    /**
     * Tells whether the given acknowledgements consist of exactly one entry carrying the
     * {@link DittoAcknowledgementLabel#TWIN_PERSISTED} label.
     */
    private static boolean containsOnlyTwinPersisted(Acknowledgements acknowledgements) {
        return acknowledgements.getSize() == 1
                && acknowledgements.stream()
                        .anyMatch(ack -> DittoAcknowledgementLabel.TWIN_PERSISTED.equals(ack.getLabel()));
    }
}
