package org.eclipse.ditto.internal.models.acks;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.japi.pf.PFBuilder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AbstractCommandAckRequestSetter;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoHeaderInvalidException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.things.model.signals.commands.modify.ThingModifyCommand;
import scala.PartialFunction;

/* loaded from: input_file:org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActorStarter.class */
/**
 * Creates {@link AcknowledgementAggregatorActor} instances through an {@link ActorRefFactory} for signals
 * that need one.
 * <p>
 * Before an aggregator is started, the signal is passed through the configured
 * {@link AbstractCommandAckRequestSetter}s and its timeout header is validated.
 * <p>
 * NOTE(review): the child-name counter is a plain {@code int} field that is incremented without any
 * synchronization, so an instance is presumably meant to be confined to a single actor/thread — confirm
 * against the callers.
 */
public final class AcknowledgementAggregatorActorStarter {

    private final ActorRefFactory actorRefFactory;
    private final Duration maxTimeout;
    private final HeaderTranslator headerTranslator;
    private final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter;

    @Nullable
    private final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer;

    /** Running number that is put into the name of each started aggregator actor. */
    private int childCounter = 0;

    private AcknowledgementAggregatorActorStarter(final ActorRefFactory actorRefFactory,
            final Duration maxTimeout,
            final HeaderTranslator headerTranslator,
            @Nullable final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer,
            final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter) {

        this.actorRefFactory = ConditionChecker.checkNotNull(actorRefFactory, "actorRefFactory");
        this.maxTimeout = ConditionChecker.checkNotNull(maxTimeout, "maxTimeout");
        this.headerTranslator = ConditionChecker.checkNotNull(headerTranslator, "headerTranslator");
        this.matchingValidationFailureConsumer = matchingValidationFailureConsumer;
        this.ackRequestSetter = ackRequestSetter;
    }

    /**
     * Returns a starter whose maximum timeout is taken from
     * {@link AcknowledgementConfig#getForwarderFallbackTimeout()}.
     *
     * @param actorRefFactory the factory used to create aggregator actors.
     * @param acknowledgementConfig supplies the maximum timeout.
     * @param headerTranslator passed on to every started aggregator actor.
     * @param matchingValidationFailureConsumer passed on to every started aggregator actor; may be
     * {@code null}.
     * @param ackRequestSetters the setters applied to each signal before validation.
     * @return the new starter.
     */
    public static AcknowledgementAggregatorActorStarter of(final ActorRefFactory actorRefFactory,
            final AcknowledgementConfig acknowledgementConfig,
            final HeaderTranslator headerTranslator,
            @Nullable final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer,
            final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

        return of(actorRefFactory,
                acknowledgementConfig.getForwarderFallbackTimeout(),
                headerTranslator,
                matchingValidationFailureConsumer,
                ackRequestSetters);
    }

    /**
     * Returns a starter with an explicitly given maximum timeout.
     *
     * @param actorRefFactory the factory used to create aggregator actors.
     * @param maxTimeout the maximum timeout passed on to every started aggregator actor.
     * @param headerTranslator passed on to every started aggregator actor.
     * @param matchingValidationFailureConsumer passed on to every started aggregator actor; may be
     * {@code null}.
     * @param ackRequestSetters the setters applied to each signal before validation.
     * @return the new starter.
     * @throws NullPointerException if {@code ackRequestSetters} or one of its elements is {@code null}.
     */
    public static AcknowledgementAggregatorActorStarter of(final ActorRefFactory actorRefFactory,
            final Duration maxTimeout,
            final HeaderTranslator headerTranslator,
            @Nullable final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer,
            final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

        return new AcknowledgementAggregatorActorStarter(actorRefFactory,
                maxTimeout,
                headerTranslator,
                matchingValidationFailureConsumer,
                buildAckRequestSetter(ackRequestSetters));
    }

    /**
     * Preprocesses the command and starts an aggregator actor for it if that is required and the command has
     * an entity ID.
     *
     * @param command the command to handle.
     * @param timeout the timeout handed to the aggregator actor.
     * @param responseHandler handed to the aggregator actor as its consumer; it is also applied to the
     * {@link DittoHeaderInvalidException} if the timeout header of the command is invalid.
     * @param onStarted applied to the preprocessed signal and the started aggregator actor.
     * @param onNotStarted applied to the preprocessed signal if no aggregator actor was started.
     * @param <T> the result type of the callbacks.
     * @return the result of whichever callback was applied.
     */
    public <T> T start(final Command<?> command,
            final Duration timeout,
            final Function<Object, T> responseHandler,
            final BiFunction<Signal<?>, ActorRef, T> onStarted,
            final Function<Signal<?>, T> onNotStarted) {

        return preprocess(command, (preprocessedSignal, shouldStart) -> {
            // The entity ID is taken from the incoming command, not from the preprocessed signal.
            final Optional<EntityId> entityId = SignalInformationPoint.getEntityId(command);
            if (!shouldStart || !entityId.isPresent()) {
                return onNotStarted.apply(preprocessedSignal);
            }
            return doStart(entityId.get(),
                    preprocessedSignal,
                    timeout,
                    responseHandler::apply,
                    aggregator -> onStarted.apply(preprocessedSignal, aggregator));
        }, responseHandler);
    }

    /**
     * Applies the configured ack request setters to the signal, validates its timeout header and dispatches to
     * exactly one of the two callbacks.
     *
     * @param signal the incoming signal.
     * @param onSuccess applied to the preprocessed signal and a flag telling whether an aggregator actor should
     * be started for it.
     * @param onInvalidHeader applied to the exception describing the invalid timeout header.
     * @param <T> the result type of the callbacks.
     * @return the result of the callback that was applied; this may be {@code null} if that callback returns
     * {@code null}.
     * @throws NullPointerException if {@code onInvalidHeader} is {@code null}.
     */
    public <T> T preprocess(final Signal<?> signal,
            final BiFunction<Signal<?>, Boolean, T> onSuccess,
            final Function<? super DittoHeaderInvalidException, T> onInvalidHeader) {

        Objects.requireNonNull(onInvalidHeader, "onInvalidHeader");
        final Signal<?> preprocessedSignal = ackRequestSetter.apply(signal);
        final Optional<DittoHeaderInvalidException> invalidHeader =
                getDittoHeaderInvalidException(preprocessedSignal);

        // Deliberately an explicit branch instead of Optional.map(...).orElseGet(...): map() turns a null
        // handler result into an empty Optional, which would additionally run onSuccess for a signal that was
        // just rejected.
        if (invalidHeader.isPresent()) {
            return onInvalidHeader.apply(invalidHeader.get());
        }
        return onSuccess.apply(preprocessedSignal, shouldStartForIncoming(preprocessedSignal));
    }

    /**
     * Starts an aggregator actor unconditionally and applies the given function to its reference.
     *
     * @param entityId the entity ID handed to the aggregator actor.
     * @param signal the signal handed to the aggregator actor; its correlation ID becomes part of the actor
     * name.
     * @param timeout the timeout handed to the aggregator actor; may be {@code null}.
     * @param responseHandler the consumer handed to the aggregator actor.
     * @param onStarted applied to the reference of the started aggregator actor.
     * @param <T> the result type.
     * @return the result of {@code onStarted}.
     */
    public <T> T doStart(final EntityId entityId,
            final Signal<?> signal,
            @Nullable final Duration timeout,
            final Consumer<Object> responseHandler,
            final Function<ActorRef, T> onStarted) {

        return onStarted.apply(startAckAggregatorActor(entityId, signal, timeout, responseHandler));
    }

    /** Creates the aggregator actor under a fresh name via the actor ref factory. */
    private ActorRef startAckAggregatorActor(final EntityId entityId,
            final Signal<?> signal,
            @Nullable final Duration timeout,
            final Consumer<Object> responseHandler) {

        final var props = AcknowledgementAggregatorActor.props(entityId,
                signal,
                timeout,
                maxTimeout,
                headerTranslator,
                responseHandler,
                matchingValidationFailureConsumer);

        return actorRefFactory.actorOf(props, getNextActorName(signal.getDittoHeaders()));
    }

    /**
     * Builds the name {@code ackr<counter in hex>-<URL-encoded correlation ID>} and advances the counter.
     * If the headers contain no correlation ID, {@code "_"} is used in its place.
     */
    private String getNextActorName(final DittoHeaders dittoHeaders) {
        final String encodedCorrelationId = dittoHeaders.getCorrelationId()
                .map(correlationId -> URLEncoder.encode(correlationId, StandardCharsets.UTF_8))
                .orElse("_");

        return String.format("ackr%x-%s", childCounter++, encodedCorrelationId);
    }

    /**
     * Combines the given setters into one function: each setter handles the signals of its matched class for
     * which it is applicable, in the given order; every other signal is returned unchanged.
     */
    // Raw types are kept exactly as they were: the matched class of each setter is only known through a
    // wildcard here. NOTE(review): tighten the typing only after checking it against PFBuilder#match.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(
            final AbstractCommandAckRequestSetter<?>... ackRequestSetters) {

        PFBuilder builder = new PFBuilder();
        for (final AbstractCommandAckRequestSetter<?> ackRequestSetter : ackRequestSetters) {
            final Class matchedClass = ackRequestSetter.getMatchedClass();
            builder = builder.match(matchedClass,
                    ackRequestSetter::isApplicable,
                    dittoHeadersSettable -> ackRequestSetter.apply(dittoHeadersSettable));
        }
        return builder.matchAny(signal -> signal).build();
    }

    /**
     * Returns the exception to report if the timeout header of the signal is invalid, or an empty Optional if
     * the header is fine.
     */
    private static Optional<DittoHeaderInvalidException> getDittoHeaderInvalidException(final Signal<?> signal) {
        final DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (!isTimeoutHeaderInvalid(dittoHeaders)) {
            return Optional.empty();
        }

        final String timeoutKey = DittoHeaderDefinition.TIMEOUT.getKey();
        final String message = String.format(
                "The value of the header '%s' must not be zero if response or acknowledgements are requested.",
                timeoutKey);

        return Optional.of(DittoHeaderInvalidException.newBuilder()
                .withInvalidHeaderKey(timeoutKey)
                .message(message)
                .description("Please provide a positive timeout.")
                .dittoHeaders(dittoHeaders)
                .build());
    }

    /**
     * A timeout is invalid if it is zero while a response is required or at least one acknowledgement is
     * requested.
     */
    private static boolean isTimeoutHeaderInvalid(final DittoHeaders dittoHeaders) {
        final boolean timeoutIsZero = dittoHeaders.getTimeout().filter(Duration::isZero).isPresent();

        return timeoutIsZero
                && (dittoHeaders.isResponseRequired() || !dittoHeaders.getAcknowledgementRequests().isEmpty());
    }

    /**
     * Decides whether an aggregator actor should be started for the signal:
     * <ul>
     * <li>a {@link ThingModifyCommand} that is not on the live channel: if any requested acknowledgement
     * satisfies {@code AcknowledgementForwarderActorStarter.isNotLiveResponse};</li>
     * <li>a message command, or a thing command on the live channel: if any requested acknowledgement
     * satisfies {@code AcknowledgementForwarderActorStarter.isNotTwinPersisted};</li>
     * <li>anything else: if the signal is on the smart channel.</li>
     * </ul>
     */
    private static boolean shouldStartForIncoming(final Signal<?> signal) {
        final boolean isChannelLive = SignalInformationPoint.isChannelLive(signal);
        final boolean isChannelSmart = SignalInformationPoint.isChannelSmart(signal);
        // 'var' because the element type of the requested acknowledgements is not imported in this file.
        final var ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();

        if (signal instanceof ThingModifyCommand && !isChannelLive) {
            return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
        }
        if (SignalInformationPoint.isMessageCommand(signal)
                || (isChannelLive && SignalInformationPoint.isThingCommand(signal))) {
            return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
        }
        return isChannelSmart;
    }
}
