package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThing;
import org.eclipse.ditto.services.thingsearch.common.config.BackgroundSyncConfig;
import org.eclipse.ditto.services.thingsearch.common.config.DefaultBackgroundSyncConfig;
import org.eclipse.ditto.services.thingsearch.persistence.read.ThingsSearchPersistence;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.Metadata;
import org.eclipse.ditto.services.thingsearch.persistence.write.streaming.BackgroundSyncStream;
import org.eclipse.ditto.services.utils.akka.controlflow.ResumeSource;
import org.eclipse.ditto.services.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.services.utils.health.AbstractBackgroundStreamingActorWithConfigWithStatusReport;
import org.eclipse.ditto.services.utils.health.StatusDetailMessage;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor.class */
public final class BackgroundSyncActor extends AbstractBackgroundStreamingActorWithConfigWithStatusReport<BackgroundSyncConfig> {
    public static final String ACTOR_NAME = "backgroundSync";
    private final ThingsMetadataSource thingsMetadataSource;
    private final ThingsSearchPersistence thingsSearchPersistence;
    private final TimestampPersistence backgroundSyncPersistence;
    private final BackgroundSyncStream backgroundSyncStream;
    private final ActorRef thingsUpdater;
    private ThingId progressPersisted;
    private ThingId progressIndexed;

    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor$Control.class */
    /**
     * Internal control messages of this actor.
     */
    private enum Control {

        /**
         * Tick registered with the actor's periodic timer in the constructor. It is logged and ignored by the
         * sleeping behavior and triggers {@code bookmarkThingId} in the streaming behavior.
         */
        BOOKMARK_THING_ID;
    }

    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor$ProgressReport.class */
    /**
     * Message the metadata streams send to this actor to report the thing ID they have reached.
     */
    private static final class ProgressReport {

        /** ID of the thing the reporting stream has just emitted. */
        private final ThingId thingId;

        /**
         * {@code true} when reported by the persisted-metadata source, {@code false} when reported by the
         * indexed-metadata source.
         */
        private final boolean persisted;

        private ProgressReport(final ThingId thingId, final boolean persisted) {
            this.persisted = persisted;
            this.thingId = thingId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor$SyncEvent.class */
    /**
     * Event recording that a thing's metadata was found inconsistent by the background sync stream.
     * Instances are created only through {@link #inconsistency(Metadata)} and
     * {@link #inconsistencyAgain(Metadata)}.
     */
    public static final class SyncEvent implements AbstractBackgroundStreamingActorWithConfigWithStatusReport.Event {

        /** Human-readable text; returned by both {@link #name()} and {@link #toString()}. */
        private final String description;

        /** ID of the inconsistent thing. */
        private final ThingId thingId;

        /** Revision of the thing at the time the inconsistency was seen. */
        private final long thingRevision;

        /** Level reported for this event. */
        private final StatusDetailMessage.Level level;

        private SyncEvent(final String description, final ThingId thingId, final long thingRevision,
                final StatusDetailMessage.Level level) {
            this.description = description;
            this.thingId = thingId;
            this.thingRevision = thingRevision;
            this.level = level;
        }

        /**
         * Creates the event for an inconsistency of level {@code DEFAULT}.
         *
         * @param metadata metadata of the inconsistent thing.
         * @return the event.
         */
        private static AbstractBackgroundStreamingActorWithConfigWithStatusReport.Event inconsistency(Metadata metadata) {
            final String text = "Inconsistent: " + metadata;
            return new SyncEvent(text, metadata.getThingId(), metadata.getThingRevision(),
                    StatusDetailMessage.Level.DEFAULT);
        }

        /**
         * Creates the event for a repeated inconsistency of level {@code WARN}.
         *
         * @param metadata metadata of the inconsistent thing.
         * @return the event.
         */
        private static AbstractBackgroundStreamingActorWithConfigWithStatusReport.Event inconsistencyAgain(Metadata metadata) {
            final String text = "Inconsistent again: " + metadata;
            return new SyncEvent(text, metadata.getThingId(), metadata.getThingRevision(),
                    StatusDetailMessage.Level.WARN);
        }

        /**
         * @return the description of this event.
         */
        public String name() {
            return description;
        }

        /**
         * @return the description of this event.
         */
        @Override
        public String toString() {
            return description;
        }

        /**
         * @return the level of this event.
         */
        public StatusDetailMessage.Level level() {
            return level;
        }
    }

    private BackgroundSyncActor(BackgroundSyncConfig backgroundSyncConfig, ThingsMetadataSource thingsMetadataSource, ThingsSearchPersistence thingsSearchPersistence, TimestampPersistence timestampPersistence, BackgroundSyncStream backgroundSyncStream, ActorRef actorRef) {
        super(backgroundSyncConfig);
        this.progressPersisted = ThingId.dummy();
        this.progressIndexed = ThingId.dummy();
        this.thingsMetadataSource = thingsMetadataSource;
        this.thingsSearchPersistence = thingsSearchPersistence;
        this.backgroundSyncPersistence = timestampPersistence;
        this.backgroundSyncStream = backgroundSyncStream;
        this.thingsUpdater = actorRef;
        getTimers().startPeriodicTimer(Control.BOOKMARK_THING_ID, Control.BOOKMARK_THING_ID, this.config.getQuietPeriod());
    }

    /**
     * Creates the Props of this actor.
     *
     * @param backgroundSyncConfig the background sync configuration.
     * @param actorRef actor handed to {@code ThingsMetadataSource.of} together with the throttle throughput and
     * idle timeout.
     * @param thingsSearchPersistence the search persistence.
     * @param timestampPersistence persistence used to bookmark the sync progress.
     * @param actorRef2 actor handed to {@code BackgroundSyncStream.of} together with the policy ask timeout,
     * tolerance window and throttle settings.
     * @param actorRef3 actor that is told to update inconsistent things.
     * @return the Props.
     */
    public static Props props(BackgroundSyncConfig backgroundSyncConfig, ActorRef actorRef, ThingsSearchPersistence thingsSearchPersistence, TimestampPersistence timestampPersistence, ActorRef actorRef2, ActorRef actorRef3) {
        final ThingsMetadataSource metadataSource = ThingsMetadataSource.of(
                actorRef,
                backgroundSyncConfig.getThrottleThroughput(),
                backgroundSyncConfig.getIdleTimeout());

        final BackgroundSyncStream syncStream = BackgroundSyncStream.of(
                actorRef2,
                backgroundSyncConfig.getPolicyAskTimeout(),
                backgroundSyncConfig.getToleranceWindow(),
                backgroundSyncConfig.getThrottleThroughput(),
                backgroundSyncConfig.getThrottlePeriod());

        // argument order must match the private constructor
        return Props.create(BackgroundSyncActor.class,
                backgroundSyncConfig,
                metadataSource,
                thingsSearchPersistence,
                timestampPersistence,
                syncStream,
                actorRef3);
    }

    /**
     * Adds handlers to the sleeping behavior that log and otherwise ignore bookmark ticks and {@code ThingId}
     * messages.
     *
     * @param receiveBuilder builder of the sleeping behavior.
     */
    protected void preEnhanceSleepingBehavior(ReceiveBuilder receiveBuilder) {
        receiveBuilder
                .matchEquals(Control.BOOKMARK_THING_ID, tick -> this.log.debug("Ignoring: <{}>", tick))
                .match(ThingId.class, ignoredId -> this.log.debug("Ignoring: <{}>", ignoredId));
    }

    /**
     * Adds handlers to the streaming behavior: progress reports update the progress markers, bookmark ticks
     * persist the current progress.
     *
     * @param receiveBuilder builder of the streaming behavior.
     */
    protected void preEnhanceStreamingBehavior(ReceiveBuilder receiveBuilder) {
        receiveBuilder
                .match(ProgressReport.class, report -> setProgress(report))
                .matchEquals(Control.BOOKMARK_THING_ID, tick -> bookmarkThingId(tick));
    }

    /**
     * Adds the string forms of both progress markers to the status report.
     *
     * @param jsonObjectBuilder builder of the status report.
     */
    protected void postEnhanceStatusReport(JsonObjectBuilder jsonObjectBuilder) {
        final String persistedProgress = this.progressPersisted.toString();
        jsonObjectBuilder.set("progressPersisted", persistedProgress);

        final String indexedProgress = this.progressIndexed.toString();
        jsonObjectBuilder.set("progressIndexed", indexedProgress);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseConfig, reason: merged with bridge method [inline-methods] */
    /**
     * Parses the background sync configuration from the given config.
     * <p>
     * NOTE(review): the {@code m0} prefix looks like a decompiler rename (the adjacent decompiler note says
     * "renamed from: parseConfig"); confirm the intended method name against the superclass.
     *
     * @param config the config to parse.
     * @return the parsed background sync configuration.
     */
    public BackgroundSyncConfig m0parseConfig(Config config) {
        final BackgroundSyncConfig parsed = DefaultBackgroundSyncConfig.parse(config);
        return parsed;
    }

    /**
     * After delegating to the superclass, resets both progress markers to the dummy thing ID and overwrites
     * the persisted bookmark with the empty string.
     *
     * @param event the termination event forwarded to the superclass.
     */
    protected void streamTerminated(AbstractBackgroundStreamingActorWithConfigWithStatusReport.Event event) {
        super.streamTerminated(event);

        // forget the progress of the terminated stream
        this.progressPersisted = ThingId.dummy();
        this.progressIndexed = ThingId.dummy();

        // getLowerBoundSource() maps an empty tag to the dummy thing ID
        final String emptyBookmark = "";
        doBookmarkThingId(emptyBookmark);
    }

    /**
     * Builds the stream of inconsistent metadata: reads the bookmarked lower bound, streams inconsistencies
     * starting from it, and passes every element to {@code handleInconsistency} via a wire tap.
     *
     * @return the source of inconsistent metadata.
     */
    protected Source<?, ?> getSource() {
        final Source<ThingId, NotUsed> lowerBound = getLowerBoundSource();
        final Source<Metadata, NotUsed> inconsistencies =
                lowerBound.flatMapConcat(this::streamMetadataFromLowerBound);
        return inconsistencies.wireTap(this::handleInconsistency);
    }

    /**
     * Streams the inconsistencies between persisted and indexed metadata, both starting at the given thing ID.
     *
     * @param thingId lower bound handed to both metadata sources.
     * @return source of metadata the background sync stream considers inconsistent.
     */
    private Source<Metadata, NotUsed> streamMetadataFromLowerBound(ThingId thingId) {
        final Source<Metadata, NotUsed> persisted = getPersistedMetadataSourceWithProgressReporting(thingId);
        final Source<Metadata, NotUsed> indexed = getIndexedMetadataSource(thingId);
        return this.backgroundSyncStream.filterForInconsistencies(persisted, indexed);
    }

    /**
     * Stores the reported thing ID in the progress marker that belongs to the reporting source.
     *
     * @param progressReport the progress report.
     */
    private void setProgress(ProgressReport progressReport) {
        final ThingId reached = progressReport.thingId;
        if (!progressReport.persisted) {
            this.progressIndexed = reached;
        } else {
            this.progressPersisted = reached;
        }
    }

    /**
     * Persists the lesser of the two progress markers as bookmark, unless that marker is still the dummy ID.
     * On a tie the indexed progress is chosen.
     *
     * @param ignoredTick the tick that triggered bookmarking; not inspected.
     */
    private void bookmarkThingId(Control ignoredTick) {
        final ThingId lesser;
        if (BackgroundSyncStream.compareThingIds(this.progressIndexed, this.progressPersisted) > 0) {
            lesser = this.progressPersisted;
        } else {
            lesser = this.progressIndexed;
        }

        if (!lesser.isDummy()) {
            doBookmarkThingId(lesser.toString());
        }
    }

    /**
     * Writes the given tag together with the current instant to the background sync persistence. The write
     * is run with an ignoring sink, so its result is not inspected here.
     *
     * @param tag the bookmark to persist; the empty string is written when the stream terminates.
     */
    private void doBookmarkThingId(String tag) {
        final Instant now = Instant.now();
        this.backgroundSyncPersistence
                .setTaggedTimestamp(now, tag)
                .runWith(Sink.ignore(), this.materializer);
    }

    /**
     * Asks the things updater to update the inconsistent thing, then sends this actor a sync event: the
     * "again" variant if an event for the same thing and revision is already recorded, the plain variant
     * otherwise.
     *
     * @param metadata metadata of the inconsistent thing.
     */
    private void handleInconsistency(Metadata metadata) {
        final UpdateThing updateCommand = UpdateThing.of(metadata.getThingId(), DittoHeaders.empty());
        this.thingsUpdater.tell(updateCommand, ActorRef.noSender());

        final AbstractBackgroundStreamingActorWithConfigWithStatusReport.Event syncEvent =
                isInconsistentAgain(metadata)
                        ? SyncEvent.inconsistencyAgain(metadata)
                        : SyncEvent.inconsistency(metadata);
        getSelf().tell(syncEvent, ActorRef.noSender());
    }

    /**
     * Checks whether the recorded events already contain a {@code SyncEvent} for the same thing ID and thing
     * revision as the given metadata.
     *
     * @param metadata metadata of the inconsistent thing.
     * @return whether a matching sync event is already recorded.
     */
    private boolean isInconsistentAgain(Metadata metadata) {
        return getEventStream()
                .map(entry -> entry.second())
                .anyMatch(event -> {
                    // only sync events carry a thing ID and revision to compare against
                    if (!SyncEvent.class.isAssignableFrom(event.getClass())) {
                        return false;
                    }
                    final SyncEvent earlier = (SyncEvent) event;
                    return metadata.getThingId().equals(earlier.thingId)
                            && metadata.getThingRevision() == earlier.thingRevision;
                });
    }

    /**
     * Reads the bookmarked tag from the background sync persistence and converts it into the thing ID to
     * start streaming from. A missing entry, a {@code null} tag or an empty tag yields the dummy thing ID.
     *
     * @return source of the lower-bound thing ID.
     */
    private Source<ThingId, NotUsed> getLowerBoundSource() {
        return this.backgroundSyncPersistence.getTaggedTimestamp().map(optional -> {
            if (!optional.isPresent()) {
                return ThingId.dummy();
            }
            final String tag = (String) ((Pair) optional.get()).second();
            if (tag == null || tag.isEmpty()) {
                return ThingId.dummy();
            }
            return ThingId.of(tag);
        });
    }

    /**
     * Creates the resumable source of persisted metadata starting at the given thing ID. Every emitted
     * element is also reported to this actor as persisted progress.
     *
     * @param thingId the lower bound to start from.
     * @return the source of persisted metadata.
     */
    private Source<Metadata, NotUsed> getPersistedMetadataSourceWithProgressReporting(ThingId thingId) {
        // fail fast on a missing metadata source, before the stream is built
        final ThingsMetadataSource metadataSource = Objects.requireNonNull(this.thingsMetadataSource);
        final Source<Metadata, NotUsed> resumable = wrapAsResumeSource(thingId, metadataSource::createSource);
        return resumable.wireTap(metadata ->
                getSelf().tell(new ProgressReport(metadata.getThingId(), true), ActorRef.noSender()));
    }

    /**
     * Creates the resumable source of indexed metadata starting at the given thing ID. Every emitted element
     * is also reported to this actor as indexed (non-persisted) progress.
     *
     * @param thingId the lower bound to start from.
     * @return the source of indexed metadata.
     * @throws NullPointerException if the search persistence is {@code null}.
     */
    private Source<Metadata, NotUsed> getIndexedMetadataSource(ThingId thingId) {
        // fail fast on a missing persistence, before the stream is built
        final ThingsSearchPersistence searchPersistence = Objects.requireNonNull(this.thingsSearchPersistence);
        // the lambda is bound to the null-checked local; the previous body referenced an undeclared receiver
        return wrapAsResumeSource(thingId, lowerBound -> searchPersistence.sudoStreamMetadata(lowerBound))
                .wireTap(metadata ->
                        getSelf().tell(new ProgressReport(metadata.getThingId(), false), ActorRef.noSender()));
    }

    /**
     * Wraps a metadata source factory into a resume source that restarts with backoff on failure, using the
     * backoff, restart and recovery settings of the configuration. On resumption the next lower bound is
     * computed by {@code nextLowerBound} from the initial thing ID and the elements handed back by the
     * resume source.
     *
     * @param thingId the initial lower bound.
     * @param sourceFactory creates the metadata source for a given lower bound.
     * @return the resumable metadata source.
     */
    private Source<Metadata, NotUsed> wrapAsResumeSource(ThingId thingId, Function<ThingId, Source<Metadata, ?>> sourceFactory) {
        return ResumeSource.onFailureWithBackoff(
                this.config.getMinBackoff(),
                this.config.getMaxBackoff(),
                this.config.getMaxRestarts(),
                this.config.getRecovery(),
                thingId,
                sourceFactory,
                1, // NOTE(review): presumably the number of final elements kept for resumption - confirm
                lastElements -> nextLowerBound(thingId, lastElements));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Determines the thing ID to resume from.
     *
     * @param thingId the fallback used when the list is empty.
     * @param list the metadata to take the last element from.
     * @return the thing ID of the last list element, or {@code thingId} if the list is empty.
     */
    public static ThingId nextLowerBound(ThingId thingId, List<Metadata> list) {
        if (list.isEmpty()) {
            return thingId;
        }
        final Metadata lastSeen = list.get(list.size() - 1);
        return lastSeen.getThingId();
    }

    /*
     * Lambda deserialization hook (marked synthetic by the decompiler).
     *
     * Dispatches on the implementation method name recorded in the given SerializedLambda, verifies the
     * recorded method kind, functional interface, implementing class and signatures, and returns the matching
     * lambda or method reference of this class. Throws IllegalArgumentException when nothing matches.
     *
     * NOTE(review): this looks like compiler-generated code reproduced by the decompiler rather than
     * hand-written source; it is presumably meant to be regenerated by the compiler and not maintained - confirm
     * before editing. As written it has artifacts that prevent compilation: the selector `z` is declared boolean
     * but assigned integer values, several `case true` labels repeat, and the returned lambdas call `getSelf()`
     * from this static method instead of using the captured actor instance.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // selector for the second switch; stays at its initial value when no known method name matches
        boolean z = -1;
        // first switch: find the candidate by hash code, then confirm with equals
        switch (implMethodName.hashCode()) {
            case -2108712309:
                if (implMethodName.equals("handleInconsistency")) {
                    z = 2;
                    break;
                }
                break;
            case -1812552970:
                if (implMethodName.equals("streamMetadataFromLowerBound")) {
                    z = false;
                    break;
                }
                break;
            case 607984087:
                if (implMethodName.equals("lambda$getPersistedMetadataSourceWithProgressReporting$9a2955f4$1")) {
                    z = 3;
                    break;
                }
                break;
            case 741236832:
                if (implMethodName.equals("lambda$getLowerBoundSource$129259bd$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1418469996:
                if (implMethodName.equals("lambda$getIndexedMetadataSource$9a2955f4$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // second switch: validate the serialized form and rebuild the lambda
        switch (z) {
            case false:
                // streamMetadataFromLowerBound as akka.japi.function.Function
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/model/things/ThingId;)Lakka/stream/javadsl/Source;")) {
                    BackgroundSyncActor backgroundSyncActor = (BackgroundSyncActor) serializedLambda.getCapturedArg(0);
                    return backgroundSyncActor::streamMetadataFromLowerBound;
                }
                break;
            case true:
                // progress report of the indexed metadata source (persisted = false)
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;)V")) {
                    BackgroundSyncActor backgroundSyncActor2 = (BackgroundSyncActor) serializedLambda.getCapturedArg(0);
                    return metadata -> {
                        getSelf().tell(new ProgressReport(metadata.getThingId(), false), ActorRef.noSender());
                    };
                }
                break;
            case true:
                // handleInconsistency as akka.japi.function.Procedure
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;)V")) {
                    BackgroundSyncActor backgroundSyncActor3 = (BackgroundSyncActor) serializedLambda.getCapturedArg(0);
                    return backgroundSyncActor3::handleInconsistency;
                }
                break;
            case true:
                // progress report of the persisted metadata source (persisted = true)
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/Metadata;)V")) {
                    BackgroundSyncActor backgroundSyncActor4 = (BackgroundSyncActor) serializedLambda.getCapturedArg(0);
                    return metadata2 -> {
                        getSelf().tell(new ProgressReport(metadata2.getThingId(), true), ActorRef.noSender());
                    };
                }
                break;
            case true:
                // conversion of the bookmarked tag into the lower-bound thing ID; captures no arguments
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/BackgroundSyncActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Optional;)Lorg/eclipse/ditto/model/things/ThingId;")) {
                    return optional -> {
                        String str;
                        return (!optional.isPresent() || (str = (String) ((Pair) optional.get()).second()) == null || str.isEmpty()) ? ThingId.dummy() : ThingId.of(str);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
