package org.eclipse.ditto.services.thingsearch.persistence.write.streaming;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicyRevision;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicyRevisionResponse;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.Metadata;
import org.eclipse.ditto.services.utils.akka.controlflow.MergeSortedAsPair;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BackgroundSyncStream.class */
public final class BackgroundSyncStream {
    private final ActorRef policiesShardRegion;
    private final Duration policiesAskTimeout;
    private final Duration toleranceWindow;
    private final int throttleThroughput;
    private final Duration throttlePeriod;

    private BackgroundSyncStream(ActorRef actorRef, Duration duration, Duration duration2, int i, Duration duration3) {
        this.policiesShardRegion = actorRef;
        this.policiesAskTimeout = duration;
        this.toleranceWindow = duration2;
        this.throttleThroughput = i;
        this.throttlePeriod = duration3;
    }

    public static BackgroundSyncStream of(ActorRef actorRef, Duration duration, Duration duration2, int i, Duration duration3) {
        return new BackgroundSyncStream(actorRef, duration, duration2, i, duration3);
    }

    public Source<Metadata, NotUsed> filterForInconsistencies(Source<Metadata, ?> source, Source<Metadata, ?> source2) {
        return MergeSortedAsPair.merge(dummyMetadata(), BackgroundSyncStream::compareMetadata, source, source2).throttle(this.throttleThroughput, this.throttlePeriod).flatMapConcat(this::filterForInconsistency).withAttributes(Attributes.logLevels(Attributes.logLevelWarning(), Attributes.logLevelDebug(), Attributes.logLevelError()));
    }

    private boolean isInsideToleranceWindow(Metadata metadata, Instant instant) {
        return ((Boolean) metadata.getModified().map(instant2 -> {
            return Boolean.valueOf(instant2.isAfter(instant));
        }).orElse(false)).booleanValue();
    }

    private static Metadata dummyMetadata() {
        return Metadata.of(ThingId.dummy(), 0L, PolicyId.dummy(), 0L);
    }

    private Source<Metadata, NotUsed> filterForInconsistency(Pair<Metadata, Metadata> pair) {
        Metadata metadata = (Metadata) pair.first();
        Metadata metadata2 = (Metadata) pair.second();
        int compareMetadata = compareMetadata(metadata, metadata2);
        Instant minus = Instant.now().minus((TemporalAmount) this.toleranceWindow);
        return compareMetadata < 0 ? isInsideToleranceWindow(metadata, minus) ? Source.empty() : Source.single(metadata).log("PersistedAndNotIndexed") : compareMetadata > 0 ? isInsideToleranceWindow(metadata2, minus) ? Source.empty() : Source.single(metadata2).log("NotPersistedAndIndexed") : metadata2.getThingId().isDummy() ? Source.failed(new IllegalStateException("Unexpected double-dummy entry: " + pair)) : isInsideToleranceWindow(metadata2, minus) ? Source.empty() : emitUnlessConsistent(metadata, metadata2);
    }

    private Source<Metadata, NotUsed> emitUnlessConsistent(Metadata metadata, Metadata metadata2) {
        if (metadata.getThingRevision() > metadata2.getThingRevision()) {
            return Source.single(metadata2).log("RevisionMismatch");
        }
        Optional<PolicyId> policyId = metadata.getPolicyId();
        return !policyId.equals(metadata2.getPolicyId()) ? Source.single(metadata2).log("PolicyIdMismatch") : policyId.isPresent() ? retrievePolicyRevisionAndEmitMismatch(policyId.get(), metadata2) : Source.empty();
    }

    private Source<Metadata, NotUsed> retrievePolicyRevisionAndEmitMismatch(PolicyId policyId, Metadata metadata) {
        return Source.fromSourceCompletionStage(Patterns.ask(this.policiesShardRegion, SudoRetrievePolicyRevision.of(policyId, DittoHeaders.empty()), this.policiesAskTimeout).handle((obj, th) -> {
            if (th != null) {
                return Source.single(th).log("ErrorRetrievingPolicyRevision " + policyId).map(th -> {
                    return metadata;
                });
            }
            if (!(obj instanceof SudoRetrievePolicyRevisionResponse)) {
                return Source.single(obj).log("UnexpectedPolicyResponse").map(obj -> {
                    return metadata;
                });
            }
            long revision = ((SudoRetrievePolicyRevisionResponse) obj).getRevision();
            return (Source) metadata.getPolicyRevision().filter(l -> {
                return l.equals(Long.valueOf(revision));
            }).map(l2 -> {
                return Source.empty();
            }).orElseGet(() -> {
                return Source.single(metadata).log("PolicyRevisionMismatch");
            });
        })).mapMaterializedValue(completionStage -> {
            return NotUsed.getInstance();
        });
    }

    private static int compareMetadata(Metadata metadata, Metadata metadata2) {
        return compareThingIds(metadata.getThingId(), metadata2.getThingId());
    }

    public static int compareThingIds(ThingId thingId, ThingId thingId2) {
        int compare = Boolean.compare(thingId.isDummy(), thingId2.isDummy());
        return compare != 0 ? compare : thingId.compareTo(thingId2);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 239348002:
                if (implMethodName.equals("filterForInconsistency")) {
                    z = 2;
                    break;
                }
                break;
            case 758469732:
                if (implMethodName.equals("lambda$retrievePolicyRevisionAndEmitMismatch$1cf7a66c$1")) {
                    z = true;
                    break;
                }
                break;
            case 1553592247:
                if (implMethodName.equals("lambda$retrievePolicyRevisionAndEmitMismatch$c1719b07$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1553592248:
                if (implMethodName.equals("lambda$retrievePolicyRevisionAndEmitMismatch$c1719b07$2")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BackgroundSyncStream") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;Ljava/lang/Object;)Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;")) {
                    Metadata metadata = (Metadata) serializedLambda.getCapturedArg(0);
                    return obj -> {
                        return metadata;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BackgroundSyncStream") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CompletionStage;)Lakka/NotUsed;")) {
                    return completionStage -> {
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BackgroundSyncStream") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/Pair;)Lakka/stream/javadsl/Source;")) {
                    BackgroundSyncStream backgroundSyncStream = (BackgroundSyncStream) serializedLambda.getCapturedArg(0);
                    return backgroundSyncStream::filterForInconsistency;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/BackgroundSyncStream") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;Ljava/lang/Throwable;)Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;")) {
                    Metadata metadata2 = (Metadata) serializedLambda.getCapturedArg(0);
                    return th -> {
                        return metadata2;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
