package org.eclipse.ditto.internal.utils.persistentactors.cleanup;

import akka.Done;
import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.actor.FSM;
import akka.actor.FSM$StateTimeout$;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.japi.Pair;
import akka.japi.pf.FSMStateFunctionBuilder;
import akka.stream.Attributes;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.api.common.ModifyConfig;
import org.eclipse.ditto.base.api.common.RetrieveConfig;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.actors.ModifyConfigBehavior;
import org.eclipse.ditto.internal.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse;
import org.eclipse.ditto.internal.utils.health.StatusDetailMessage;
import org.eclipse.ditto.internal.utils.health.StatusInfo;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.json.JsonObject;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.class */
public final class PersistenceCleanupActor extends AbstractFSM<State, String> implements RetrieveConfigBehavior, ModifyConfigBehavior {
    public static final String NAME = "persistenceCleanup";
    private static final String SET_LAST_PID = "last-pid";
    private static final Throwable KILL_SWITCH_EXCEPTION = new IllegalStateException();
    private final ThreadSafeDittoLoggingAdapter logger;
    private final Materializer materializer;
    private final Counter deleteEventsCounter;
    private final Counter deleteSnapsCounter;
    private final MongoReadJournal mongoReadJournal;
    private final Supplier<Pair<Integer, Integer>> responsibilitySupplier;
    private CleanupConfig config;
    private Cleanup cleanup;
    private Credits credits;

    @Nullable
    private UniqueKillSwitch killSwitch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor$Control.class */
    public enum Control {
        STREAM_COMPLETE,
        STREAM_FAILED,
        SHUTDOWN
    }

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor$State.class */
    public enum State {
        IN_QUIET_PERIOD,
        RUNNING
    }

    PersistenceCleanupActor(Cleanup cleanup, Credits credits, MongoReadJournal mongoReadJournal, Supplier<Pair<Integer, Integer>> supplier) {
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
        this.materializer = Materializer.createMaterializer(getContext());
        this.deleteEventsCounter = DittoMetrics.counter("cleanup_delete_events");
        this.deleteSnapsCounter = DittoMetrics.counter("cleanup_delete_snapshots");
        this.killSwitch = null;
        this.config = CleanupConfig.of(ConfigFactory.empty());
        this.cleanup = cleanup;
        this.credits = credits;
        this.mongoReadJournal = mongoReadJournal;
        this.responsibilitySupplier = supplier;
    }

    private PersistenceCleanupActor(CleanupConfig cleanupConfig, MongoReadJournal mongoReadJournal, String str) {
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
        this.materializer = Materializer.createMaterializer(getContext());
        this.deleteEventsCounter = DittoMetrics.counter("cleanup_delete_events");
        this.deleteSnapsCounter = DittoMetrics.counter("cleanup_delete_snapshots");
        this.killSwitch = null;
        Cluster cluster = Cluster.get(getContext().getSystem());
        this.mongoReadJournal = mongoReadJournal;
        this.responsibilitySupplier = ClusterResponsibilitySupplier.of(cluster, str);
        this.config = cleanupConfig;
        this.cleanup = Cleanup.of(cleanupConfig, mongoReadJournal, this.materializer, this.responsibilitySupplier);
        this.credits = Credits.of(cleanupConfig);
    }

    public static Props props(CleanupConfig cleanupConfig, MongoReadJournal mongoReadJournal, String str) {
        return Props.create(PersistenceCleanupActor.class, new Object[]{cleanupConfig, mongoReadJournal, str});
    }

    public void preStart() throws Exception {
        super.preStart();
        if (this.config.isEnabled()) {
            startWith(State.IN_QUIET_PERIOD, "", randomizeQuietPeriod());
        } else {
            startWith(State.IN_QUIET_PERIOD, "");
        }
        when(State.IN_QUIET_PERIOD, inQuietPeriod());
        when(State.RUNNING, running());
        whenUnhandled(inAnyState());
        initialize();
    }

    private FSMStateFunctionBuilder<State, String> inQuietPeriod() {
        return matchEventEquals(StateTimeout(), this::startStream).eventEquals(Control.SHUTDOWN, this::shutdownInQuietPeriod);
    }

    private FSMStateFunctionBuilder<State, String> running() {
        return matchEvent(CleanupResult.class, this::logCleanupResult).eventEquals(Control.STREAM_COMPLETE, this::streamComplete).eventEquals(Control.STREAM_FAILED, this::streamFailed).eventEquals(Control.SHUTDOWN, this::shutdownRunningStream);
    }

    private FSMStateFunctionBuilder<State, String> inAnyState() {
        return matchEvent(RetrieveHealth.class, this::retrieveHealth).event(RetrieveConfig.class, (retrieveConfig, str) -> {
            retrieveConfigBehavior().onMessage().apply(retrieveConfig);
            return stay();
        }).event(ModifyConfig.class, (modifyConfig, str2) -> {
            modifyConfigBehavior().onMessage().apply(modifyConfig);
            Optional map = modifyConfig.getConfig().getValue(SET_LAST_PID).filter((v0) -> {
                return v0.isString();
            }).map((v0) -> {
                return v0.asString();
            });
            FSM.State stay = stay();
            Objects.requireNonNull(stay);
            return (FSM.State) map.map((v1) -> {
                return r1.using(v1);
            }).orElse(stay);
        }).anyEvent((obj, str3) -> {
            this.logger.warning("Got unhandled message <{}> when state=<{}> lastPid=<{}>", obj, ((State) stateName()).name(), str3);
            return stay();
        });
    }

    private FSM.State<State, String> startStream(FSM$StateTimeout$ fSM$StateTimeout$, String str) {
        this.logger.info("Quiet period expired, starting stream from <{}>", str);
        Pair pair = (Pair) this.credits.regulate(this.cleanup.getCleanupStream(str), this.logger).flatMapConcat(source -> {
            return source;
        }).viaMat(KillSwitches.single(), Keep.right()).toMat(Sink.foreach(this::notifySelf), Keep.both()).withAttributes(Attributes.inputBuffer(1, 1)).run(this.materializer);
        this.killSwitch = (UniqueKillSwitch) pair.first();
        ((CompletionStage) pair.second()).handle(this::streamCompletedOrFailed);
        return goTo(State.RUNNING);
    }

    private FSM.State<State, String> logCleanupResult(CleanupResult cleanupResult, String str) {
        this.logger.debug("CleanupResult=<{}>", cleanupResult);
        String str2 = cleanupResult.snapshotRevision.pid;
        if (!str.equals(str2)) {
            this.logger.info("Progress=<{}>", str2);
        }
        switch (cleanupResult.type) {
            case SNAPSHOTS:
                this.deleteSnapsCounter.increment(cleanupResult.result.getDeletedCount());
                break;
            case EVENTS:
            default:
                this.deleteEventsCounter.increment(cleanupResult.result.getDeletedCount());
                break;
        }
        return stay().using(str2);
    }

    private FSM.State<State, String> streamComplete(Control control, String str) {
        FSM.State<State, String> using = goTo(State.IN_QUIET_PERIOD).using("");
        if (!this.config.isEnabled()) {
            this.logger.info("Stream complete and disabled.");
            return using;
        }
        Duration randomizeQuietPeriod = randomizeQuietPeriod();
        this.logger.info("Stream complete. Next stream in <{}> from start", randomizeQuietPeriod);
        return using.forMax(randomizeQuietPeriod);
    }

    private FSM.State<State, String> streamFailed(Control control, String str) {
        FSM.State<State, String> using = goTo(State.IN_QUIET_PERIOD).using(str);
        if (!this.config.isEnabled()) {
            this.logger.info("Stream failed or shutdown and disabled. Last PID=<{}>", str);
            return using;
        }
        Duration randomizeQuietPeriod = randomizeQuietPeriod();
        this.logger.info("Stream failed or shutdown. Next stream in <{}> starting from <{}>", randomizeQuietPeriod, str);
        return using.forMax(randomizeQuietPeriod);
    }

    private FSM.State<State, String> shutdownRunningStream(Control control, String str) {
        this.logger.info("Activating kill-switch by demand: <{}>", this.killSwitch);
        if (this.killSwitch != null) {
            this.killSwitch.abort(KILL_SWITCH_EXCEPTION);
        }
        return stay();
    }

    private FSM.State<State, String> shutdownInQuietPeriod(Control control, String str) {
        if (this.config.isEnabled()) {
            this.logger.info("Starting stream from <{}> in <{}> on request", str, this.config.getQuietPeriod());
            return goTo(State.IN_QUIET_PERIOD).forMax(this.config.getQuietPeriod());
        }
        this.logger.info("Stream disabled. lastPid=<{}>", str);
        return goTo(State.IN_QUIET_PERIOD);
    }

    private FSM.State<State, String> retrieveHealth(RetrieveHealth retrieveHealth, String str) {
        getSender().tell(RetrieveHealthResponse.of(StatusInfo.fromDetail(StatusDetailMessage.of(StatusDetailMessage.Level.INFO, JsonObject.newBuilder().set("state", ((State) stateName()).name()).set("pid", str).build())), DittoHeaders.empty()), getSelf());
        return stay();
    }

    private Duration randomizeQuietPeriod() {
        long random = (long) (Math.random() * 1024.0d);
        Duration quietPeriod = this.config.getQuietPeriod();
        return quietPeriod.plus(quietPeriod.multipliedBy(random).dividedBy(1024L));
    }

    private void notifySelf(CleanupResult cleanupResult) {
        getSelf().tell(cleanupResult, ActorRef.noSender());
    }

    private Done streamCompletedOrFailed(@Nullable Done done, @Nullable Throwable th) {
        if (th == null) {
            getSelf().tell(Control.STREAM_COMPLETE, ActorRef.noSender());
        } else {
            if (th != KILL_SWITCH_EXCEPTION) {
                this.logger.error(th, "Stream failed");
            }
            getSelf().tell(Control.STREAM_FAILED, ActorRef.noSender());
        }
        return Done.getInstance();
    }

    public Config getConfig() {
        return this.config.render();
    }

    public Config setConfig(Config config) {
        this.config = this.config.setAll(config);
        this.cleanup = Cleanup.of(this.config, this.mongoReadJournal, this.materializer, this.responsibilitySupplier);
        this.credits = Credits.of(this.config);
        getSelf().tell(Control.SHUTDOWN, ActorRef.noSender());
        return this.config.render();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1164964299:
                if (implMethodName.equals("notifySelf")) {
                    z = false;
                    break;
                }
                break;
            case 768412130:
                if (implMethodName.equals("lambda$startStream$3fe1270b$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupResult;)V")) {
                    PersistenceCleanupActor persistenceCleanupActor = (PersistenceCleanupActor) serializedLambda.getCapturedArg(0);
                    return persistenceCleanupActor::notifySelf;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/Graph;")) {
                    return source -> {
                        return source;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
