package org.eclipse.ditto.internal.models.acks;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.AbstractNamespacedEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.WithThingId;
import org.eclipse.ditto.things.model.signals.acks.ThingAcknowledgementFactory;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;

/* loaded from: input_file:org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActor.class */
public final class AcknowledgementAggregatorActor extends AbstractActor {
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final String correlationId;
    private final DittoHeaders requestCommandHeaders;
    private final AcknowledgementAggregator ackregator;
    private final Consumer<Object> responseSignalConsumer;
    private final Duration timeout;

    private AcknowledgementAggregatorActor(ThingId thingId, DittoHeaders dittoHeaders, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer) {
        this.responseSignalConsumer = consumer;
        this.requestCommandHeaders = dittoHeaders;
        this.correlationId = (String) this.requestCommandHeaders.getCorrelationId().orElseGet(() -> {
            return getSelf().path().name();
        });
        Optional timeout = this.requestCommandHeaders.getTimeout();
        Objects.requireNonNull(acknowledgementConfig);
        this.timeout = (Duration) timeout.orElseGet(acknowledgementConfig::getForwarderFallbackTimeout);
        getContext().setReceiveTimeout(this.timeout);
        Set acknowledgementRequests = this.requestCommandHeaders.getAcknowledgementRequests();
        this.ackregator = AcknowledgementAggregator.getInstance((AbstractNamespacedEntityId) thingId, (CharSequence) this.correlationId, this.timeout, headerTranslator);
        this.ackregator.addAcknowledgementRequests(acknowledgementRequests);
        this.log.withCorrelationId(this.correlationId).info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.", acknowledgementRequests, this.timeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(ThingId thingId, DittoHeaders dittoHeaders, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer) {
        return Props.create(AcknowledgementAggregatorActor.class, new Object[]{thingId, dittoHeaders, acknowledgementConfig, headerTranslator, consumer});
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(ThingCommandResponse.class, this::handleThingCommandResponse).match(MessageCommandResponse.class, this::handleMessageCommandResponse).match(Acknowledgement.class, this::handleAcknowledgement).match(Acknowledgements.class, this::handleAcknowledgements).match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(ReceiveTimeout.class, this::handleReceiveTimeout).matchAny(obj -> {
            this.log.warning("Received unexpected message: <{}>", obj);
        }).build();
    }

    private void handleThingCommandResponse(ThingCommandResponse<?> thingCommandResponse) {
        Stream stream = thingCommandResponse.getDittoHeaders().getChannel().stream();
        String name = TopicPath.Channel.LIVE.getName();
        Objects.requireNonNull(name);
        addCommandResponse(thingCommandResponse, thingCommandResponse, stream.anyMatch((v1) -> {
            return r1.equals(v1);
        }));
    }

    private void handleMessageCommandResponse(MessageCommandResponse<?, ?> messageCommandResponse) {
        addCommandResponse(messageCommandResponse, messageCommandResponse, true);
    }

    private void addCommandResponse(CommandResponse<?> commandResponse, WithThingId withThingId, boolean z) {
        this.log.withCorrelationId(this.correlationId).debug("Received command response <{}>.", commandResponse);
        this.ackregator.addReceivedAcknowledgment(z ? toLiveResponseAcknowledgement(commandResponse, withThingId) : toTwinPersistedAcknowledgement(commandResponse, withThingId));
        potentiallyCompleteAcknowledgements(commandResponse);
    }

    private static Acknowledgement toLiveResponseAcknowledgement(CommandResponse<?> commandResponse, WithThingId withThingId) {
        return ThingAcknowledgementFactory.newAcknowledgement(DittoAcknowledgementLabel.LIVE_RESPONSE, withThingId.getEntityId(), commandResponse.getHttpStatus(), commandResponse instanceof MessageCommandResponse ? commandResponse.getDittoHeaders().toBuilder().putHeaders(((MessageCommandResponse) commandResponse).getMessage().getHeaders()).build() : commandResponse.getDittoHeaders(), getPayload(commandResponse).orElse(null));
    }

    private static Acknowledgement toTwinPersistedAcknowledgement(CommandResponse<?> commandResponse, WithThingId withThingId) {
        return ThingAcknowledgementFactory.newAcknowledgement(DittoAcknowledgementLabel.TWIN_PERSISTED, withThingId.getEntityId(), commandResponse.getHttpStatus(), commandResponse.getDittoHeaders(), getPayload(commandResponse).orElse(null));
    }

    private static Optional<JsonValue> getPayload(CommandResponse<?> commandResponse) {
        return commandResponse instanceof WithOptionalEntity ? ((WithOptionalEntity) commandResponse).getEntity(commandResponse.getImplementedSchemaVersion()) : commandResponse instanceof MessageCommandResponse ? commandResponse.toJson().getValue(MessageCommandResponse.JsonFields.JSON_MESSAGE.getPointer().append(MessageCommandResponse.JsonFields.JSON_MESSAGE_PAYLOAD.getPointer())) : Optional.empty();
    }

    private void handleReceiveTimeout(ReceiveTimeout receiveTimeout) {
        this.log.withCorrelationId(this.correlationId).info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null);
    }

    private void handleAcknowledgement(Acknowledgement acknowledgement) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgement <{}>.", acknowledgement);
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(null);
    }

    private void handleAcknowledgements(Acknowledgements acknowledgements) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgements <{}>.", acknowledgements);
        Stream stream = acknowledgements.stream();
        AcknowledgementAggregator acknowledgementAggregator = this.ackregator;
        Objects.requireNonNull(acknowledgementAggregator);
        stream.forEach(acknowledgementAggregator::addReceivedAcknowledgment);
        potentiallyCompleteAcknowledgements(null);
    }

    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.log.withCorrelationId(this.correlationId).info("Stopped waiting for acknowledgements because of ditto runtime exception <{}>.", dittoRuntimeException);
        handleSignal(dittoRuntimeException);
        getContext().stop(getSelf());
    }

    private void potentiallyCompleteAcknowledgements(@Nullable CommandResponse<?> commandResponse) {
        if (this.ackregator.receivedAllRequestedAcknowledgements()) {
            completeAcknowledgements(commandResponse);
        }
    }

    private void completeAcknowledgements(@Nullable CommandResponse<?> commandResponse) {
        Acknowledgements aggregatedAcknowledgements = this.ackregator.getAggregatedAcknowledgements(this.requestCommandHeaders);
        boolean containsOnlyTwinPersistedOrLiveResponse = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
        if (null != commandResponse && containsOnlyTwinPersistedOrLiveResponse) {
            handleSignal(commandResponse);
        } else if (!containsOnlyTwinPersistedOrLiveResponse || this.ackregator.receivedAllRequestedAcknowledgements()) {
            this.log.withCorrelationId(this.requestCommandHeaders).debug("Completing with collected acknowledgements: {}", aggregatedAcknowledgements);
            handleSignal(aggregatedAcknowledgements);
        } else {
            handleSignal(asThingErrorResponse(aggregatedAcknowledgements));
        }
        getContext().stop(getSelf());
    }

    public static DittoHeadersSettable<?> restoreCommandConnectivityHeaders(DittoHeadersSettable<?> dittoHeadersSettable, DittoHeaders dittoHeaders) {
        DittoHeadersBuilder removeHeader = dittoHeadersSettable.getDittoHeaders().toBuilder().removeHeader(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey()).removeHeader(DittoHeaderDefinition.INBOUND_PAYLOAD_MAPPER.getKey()).removeHeader(DittoHeaderDefinition.REPLY_TARGET.getKey());
        if (dittoHeaders.containsKey(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())) {
            removeHeader.expectedResponseTypes(dittoHeaders.getExpectedResponseTypes());
        }
        Optional inboundPayloadMapper = dittoHeaders.getInboundPayloadMapper();
        Objects.requireNonNull(removeHeader);
        inboundPayloadMapper.ifPresent(removeHeader::inboundPayloadMapper);
        Optional replyTarget = dittoHeaders.getReplyTarget();
        Objects.requireNonNull(removeHeader);
        replyTarget.ifPresent(removeHeader::replyTarget);
        return dittoHeadersSettable.setDittoHeaders(removeHeader.build());
    }

    private void handleSignal(DittoHeadersSettable<?> dittoHeadersSettable) {
        this.responseSignalConsumer.accept(restoreCommandConnectivityHeaders(dittoHeadersSettable, this.requestCommandHeaders));
    }

    private ThingErrorResponse asThingErrorResponse(Acknowledgements acknowledgements) {
        return ThingErrorResponse.of(ThingId.of(acknowledgements.getEntityId()), GatewayCommandTimeoutException.newBuilder(this.timeout).dittoHeaders(acknowledgements.getDittoHeaders()).build());
    }

    private static boolean containsOnlyTwinPersistedOrLiveResponse(Acknowledgements acknowledgements) {
        return acknowledgements.getSize() == 1 && acknowledgements.stream().anyMatch(acknowledgement -> {
            AcknowledgementLabel label = acknowledgement.getLabel();
            return DittoAcknowledgementLabel.TWIN_PERSISTED.equals(label) || DittoAcknowledgementLabel.LIVE_RESPONSE.equals(label);
        });
    }
}
