package org.eclipse.ditto.services.utils.persistentactors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryTimedOut;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.services.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.services.utils.persistentactors.AbstractPersistenceSupervisor;
import org.eclipse.ditto.services.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.services.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.services.utils.persistentactors.results.Result;
import org.eclipse.ditto.services.utils.persistentactors.results.ResultVisitor;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.events.base.Event;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/services/utils/persistentactors/AbstractShardedPersistenceActor.class */
/**
 * Base class of persistent actors that manage the state of a single entity.
 * <p>
 * Commands are delegated to a {@link CommandStrategy}; the {@link Result} returned by the strategy is visited by
 * this actor itself (it implements {@link ResultVisitor}). Mutations are persisted as events, applied to the
 * in-memory entity via the {@link EventStrategy} and published afterwards. The actor switches between a
 * "created" and a "deleted" behavior, takes snapshots by interval and by threshold, and asks its parent to
 * passivate it once an activity check detects that it has not been accessed.
 *
 * @param <C> the base type of commands handled by this actor.
 * @param <S> the type of the entity managed by this actor.
 * @param <I> the type of the entity ID.
 * @param <K> the type of the context passed to command strategies.
 * @param <E> the base type of events persisted by this actor.
 */
public abstract class AbstractShardedPersistenceActor<C extends Command, S, I extends EntityId, K, E extends Event> extends AbstractPersistentActorWithTimersAndCleanup implements ResultVisitor<E> {

    /** Timer key of the single-shot activity check. */
    private static final String ACTIVITY_CHECK_TIMER_KEY = "activityCheck";

    /** Timer key of the periodic snapshot trigger. */
    private static final String SNAPSHOT_TIMER_KEY = "takeSnapshot";

    private final SnapshotAdapter<S> snapshotAdapter;

    /** ID of the entity managed by this actor. */
    protected final I entityId;

    /** Incremented on every handled command and every reply; compared by the activity check. */
    private long accessCounter = 0;

    /** Current in-memory state of the entity; {@code null} until an event or a snapshot provided one. */
    @Nullable
    protected S entity = null;

    /** Revision for which a snapshot was last requested (not necessarily confirmed). */
    private long lastSnapshotRevision = 0;

    /** Revision of the last snapshot that was offered during recovery or confirmed as saved. */
    private long confirmedSnapshotRevision = 0;

    /*
     * NOTE: this initializer runs during construction and calls the overridable getEventClass(); subclasses
     * must therefore not rely on their own instance fields inside getEventClass().
     * getEventStrategy() is only called later, whenever an event is actually handled.
     */
    private final AbstractActor.Receive handleEvents = ReceiveBuilder.create().match(getEventClass(), event -> {
        this.entity = getEventStrategy().handle(event, this.entity, getRevisionNumber());
    }).build();

    /** Behavior inherited from the superclass; it is consulted first in both the created and deleted behavior. */
    private final AbstractActor.Receive handleCleanups = super.createReceive();

    /**
     * Self-message that triggers an activity check. It carries the access counter at the time the check was
     * scheduled, so that the handler can tell whether the actor was accessed in the meantime.
     * <p>
     * NOTE(review): the decompiler reports that the original access modifier of this class was {@code protected}.
     */
    public static final class CheckForActivity {
        private final long accessCounter;

        private CheckForActivity(long j) {
            this.accessCounter = j;
        }

        /**
         * @return the access counter captured when this message was created.
         */
        public long getAccessCounter() {
            return this.accessCounter;
        }
    }

    /**
     * Internal control messages this actor sends to itself.
     * <p>
     * NOTE(review): the decompiler reports that the original access modifier of this enum was {@code private}.
     */
    public enum Control {
        /** Sent by the periodic snapshot timer. */
        TAKE_SNAPSHOT
    }

    /**
     * Creates the actor for one entity.
     *
     * @param i the ID of the entity managed by this actor.
     * @param snapshotAdapter converts the entity to and from its snapshot store representation.
     */
    protected AbstractShardedPersistenceActor(I i, SnapshotAdapter<S> snapshotAdapter) {
        this.entityId = i;
        this.snapshotAdapter = snapshotAdapter;
    }

    /**
     * @return the persistence ID of this actor.
     */
    public abstract String persistenceId();

    /**
     * @return the ID of the journal plugin.
     */
    public abstract String journalPluginId();

    /**
     * @return the ID of the snapshot plugin.
     */
    public abstract String snapshotPluginId();

    /**
     * @return the class of events matched during recovery and applied after persisting.
     */
    protected abstract Class<E> getEventClass();

    /**
     * @return the context handed to command strategies together with each command.
     */
    protected abstract CommandStrategy.Context<K> getStrategyContext();

    /**
     * @return the strategy that handles commands while this actor is in the created behavior.
     */
    protected abstract CommandStrategy<C, S, K, Result<E>> getCreatedStrategy();

    /**
     * @return the strategy that handles commands while this actor is in the deleted behavior.
     */
    protected abstract CommandStrategy<? extends C, S, K, Result<E>> getDeletedStrategy();

    /**
     * @return the strategy that computes the new entity state from an event, the current entity and the revision.
     */
    protected abstract EventStrategy<E, S> getEventStrategy();

    /**
     * @return the configuration providing the intervals of the activity check.
     */
    protected abstract ActivityCheckConfig getActivityCheckConfig();

    /**
     * @return the configuration providing the snapshot interval and threshold.
     */
    protected abstract SnapshotConfig getSnapshotConfig();

    /**
     * @return whether the entity exists but is marked as deleted.
     */
    protected abstract boolean entityExistsAsDeleted();

    /**
     * @return a builder for the exception sent as reply to any message the deleted behavior does not handle.
     */
    protected abstract DittoRuntimeExceptionBuilder newNotAccessibleExceptionBuilder();

    /**
     * Publishes an event; called after the event was persisted and applied to the entity.
     *
     * @param e the event to publish.
     */
    protected abstract void publishEvent(E e);

    /**
     * @param s the entity.
     * @return the schema version that is set into the headers of events persisted for the entity.
     */
    protected abstract JsonSchemaVersion getEntitySchemaVersion(S s);

    /**
     * Invoked when recovery completed; switches to the created or the deleted behavior.
     *
     * @param recoveryCompleted the recovery-completed message.
     */
    protected void recoveryCompleted(RecoveryCompleted recoveryCompleted) {
        becomeCreatedOrDeletedHandler();
    }

    /**
     * Switches to the created behavior if the entity is present and not deleted, otherwise to the deleted
     * behavior.
     */
    protected final void becomeCreatedOrDeletedHandler() {
        if (isEntityActive()) {
            becomeCreatedHandler();
        } else {
            becomeDeletedHandler();
        }
    }

    /**
     * @return the current revision of the entity, which is the last sequence number of this persistent actor.
     */
    protected long getRevisionNumber() {
        return lastSequenceNr();
    }

    /**
     * Logs that the actor is stopping and delegates to the superclass.
     *
     * @throws Exception if the superclass implementation throws.
     */
    public void postStop() throws Exception {
        log.debug("Stopping PersistenceActor for entity with ID <{}>.", entityId);
        super.postStop();
    }

    /**
     * @return the initial behavior, which is the deleted behavior.
     */
    @Override // org.eclipse.ditto.services.utils.persistentactors.AbstractPersistentActorWithTimersAndCleanup
    public AbstractActor.Receive createReceive() {
        return createDeletedBehavior();
    }

    /**
     * Logs a failed recovery together with its cause.
     *
     * @param th the cause of the failure.
     * @param option the message that was being processed, if any; not used here.
     */
    public void onRecoveryFailure(Throwable th, Option<Object> option) {
        log.error(th, "Recovery Failure for entity with ID <{}>", entityId);
    }

    /**
     * @return the recovery behavior: events are applied via the event strategy, a snapshot offer replaces the
     * entity, recovery completion selects the follow-up behavior, and anything else is logged as a warning.
     */
    public AbstractActor.Receive createReceiveRecover() {
        final AbstractActor.Receive recoveryControl = ReceiveBuilder.create()
                .match(SnapshotOffer.class, snapshotOffer -> {
                    log.debug("Got SnapshotOffer: {}", snapshotOffer);
                    recoverFromSnapshotOffer(snapshotOffer);
                })
                .match(RecoveryTimedOut.class, recoveryTimedOut ->
                        log.warning("RecoveryTimeout occurred during recovery for entity with ID {}", entityId))
                .match(RecoveryCompleted.class, this::recoveryCompleted)
                .matchAny(unknown -> log.warning("Unknown recover message: {}", unknown))
                .build();
        // Events are tried first; only non-event messages fall through to the control handlers.
        return handleEvents.orElse(recoveryControl);
    }

    /**
     * Switches to the created behavior, schedules the activity check with the inactive interval and starts the
     * periodic snapshot timer.
     */
    protected void becomeCreatedHandler() {
        // Fail fast before the strategy is dereferenced for the first time.
        final CommandStrategy<C, S, K, Result<E>> createdStrategy = Objects.requireNonNull(getCreatedStrategy());
        final Class<C> matchingClass = createdStrategy.getMatchingClass();
        final AbstractActor.Receive createdBehavior = ReceiveBuilder.create()
                .match(matchingClass, createdStrategy::isDefined, this::handleByCommandStrategy)
                .match(CheckForActivity.class, this::checkForActivity)
                .matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
                .match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
                .match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
                .matchAny(this::matchAnyAfterInitialization)
                .build();
        getContext().become(handleCleanups.orElse(createdBehavior));
        scheduleCheckForActivity(getActivityCheckConfig().getInactiveInterval());
        scheduleSnapshot();
    }

    /**
     * Switches to the deleted behavior, schedules the activity check with the deleted interval and cancels the
     * periodic snapshot timer.
     */
    protected void becomeDeletedHandler() {
        getContext().become(createDeletedBehavior());
        scheduleCheckForActivity(getActivityCheckConfig().getDeletedInterval());
        cancelSnapshot();
    }

    /**
     * Persists an event, applies it to the entity and invokes the handler afterwards.
     * <p>
     * If an entity is present, the schema version of the entity is set into the headers of the event first.
     * If the headers of the event mark a dry run, the event is neither persisted nor applied; the handler is
     * invoked directly with the unchanged entity.
     *
     * @param e the event to persist and apply.
     * @param biConsumer invoked with the event and the entity after the event was handled.
     */
    protected void persistAndApplyEvent(E e, BiConsumer<E, S> biConsumer) {
        final E eventToPersist;
        if (null != entity) {
            // setDittoHeaders is declared on the raw Event type, hence the unchecked cast back to E.
            @SuppressWarnings("unchecked")
            final E eventWithSchemaVersion = (E) e.setDittoHeaders(
                    e.getDittoHeaders().toBuilder().schemaVersion(getEntitySchemaVersion(entity)).build());
            eventToPersist = eventWithSchemaVersion;
        } else {
            eventToPersist = e;
        }

        if (eventToPersist.getDittoHeaders().isDryRun()) {
            biConsumer.accept(eventToPersist, entity);
        } else {
            persistEvent(eventToPersist, persistedEvent -> {
                applyEvent(persistedEvent);
                biConsumer.accept(persistedEvent, entity);
            });
        }
    }

    /**
     * Handles an activity check.
     * <ul>
     * <li>Entity is deleted and the last requested snapshot is older than the current revision: take a snapshot
     * and check again after the deleted interval.</li>
     * <li>The actor was accessed since the check was scheduled: check again after the inactive interval.</li>
     * <li>Otherwise: ask the parent to passivate this actor.</li>
     * </ul>
     *
     * @param checkForActivity the message carrying the access counter at scheduling time.
     */
    protected void checkForActivity(CheckForActivity checkForActivity) {
        if (entityExistsAsDeleted() && lastSnapshotRevision < getRevisionNumber()) {
            takeSnapshot("the entity is deleted and has no up-to-date snapshot");
            scheduleCheckForActivity(getActivityCheckConfig().getDeletedInterval());
        } else if (accessCounter > checkForActivity.accessCounter) {
            scheduleCheckForActivity(getActivityCheckConfig().getInactiveInterval());
        } else if (isEntityActive()) {
            shutdown("Entity <{}> was not accessed in a while. Shutting Actor down ...", entityId);
        } else {
            shutdown("Entity <{}> was deleted recently. Shutting Actor down ...", entityId);
        }
    }

    /**
     * Asks the parent of this actor to passivate it.
     */
    protected void passivate() {
        getContext().getParent().tell(AbstractPersistenceSupervisor.Control.PASSIVATE, getSelf());
    }

    /**
     * @return the deleted behavior: commands matching the deleted strategy are handled by it; any message not
     * handled otherwise is answered with a "not accessible" exception.
     */
    private AbstractActor.Receive createDeletedBehavior() {
        final AbstractActor.Receive deletedBehavior = handleByStrategyReceiveBuilder(getDeletedStrategy())
                .match(CheckForActivity.class, this::checkForActivity)
                .matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
                .match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
                .match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
                .matchAny(this::notAccessible)
                .build();
        return handleCleanups.orElse(deletedBehavior);
    }

    /**
     * Schedules a single activity check carrying the current access counter.
     * A zero or negative interval disables the activity check.
     *
     * @param duration the delay after which the activity check is sent to this actor.
     */
    protected void scheduleCheckForActivity(Duration duration) {
        if (duration.isNegative() || duration.isZero()) {
            log.debug("Activity check is disabled: <{}>", duration);
        } else {
            log.debug("Scheduling for Activity Check in <{}> seconds.", duration);
            timers().startSingleTimer(ACTIVITY_CHECK_TIMER_KEY, new CheckForActivity(accessCounter), duration);
        }
    }

    /** Starts the periodic snapshot timer with the configured interval. */
    private void scheduleSnapshot() {
        timers().startPeriodicTimer(SNAPSHOT_TIMER_KEY, Control.TAKE_SNAPSHOT, getSnapshotConfig().getInterval());
    }

    /** Cancels the periodic snapshot timer. */
    private void cancelSnapshot() {
        timers().cancel(SNAPSHOT_TIMER_KEY);
    }

    /** Handles a command in the created behavior using the created strategy. */
    private void handleByCommandStrategy(C c) {
        handleByStrategy(c, getCreatedStrategy());
    }

    /** Creates a receive builder that routes commands of the strategy's matching class to the strategy. */
    private <T extends Command> ReceiveBuilder handleByStrategyReceiveBuilder(CommandStrategy<T, S, K, Result<E>> commandStrategy) {
        return ReceiveBuilder.create()
                .match(commandStrategy.getMatchingClass(), command -> handleByStrategy(command, commandStrategy));
    }

    /**
     * Applies the strategy to the command and lets this actor visit the result. A {@link DittoRuntimeException}
     * thrown while doing so is sent to the sender of the command.
     */
    private <T extends Command> void handleByStrategy(T t, CommandStrategy<T, S, K, Result<E>> commandStrategy) {
        log.debug("Handling by strategy: <{}>", t);
        accessCounter++;
        try {
            final Result<E> result = commandStrategy.apply(getStrategyContext(), entity, getNextRevisionNumber(), t);
            result.accept(this);
        } catch (DittoRuntimeException e) {
            getSender().tell(e, getSelf());
        }
    }

    /**
     * Persists and applies the event of a mutation, then replies to the sender and switches the behavior if
     * requested. If both flags are set, the created behavior is installed last and therefore wins.
     *
     * @param command the command that caused the mutation; not used here.
     * @param e the event to persist.
     * @param withDittoHeaders the response sent to the sender once the event was handled.
     * @param z whether to switch to the created behavior afterwards.
     * @param z2 whether to switch to the deleted behavior afterwards.
     */
    @Override // org.eclipse.ditto.services.utils.persistentactors.results.ResultVisitor
    public void onMutation(Command command, E e, WithDittoHeaders withDittoHeaders, boolean z, boolean z2) {
        persistAndApplyEvent(e, (persistedEvent, resultingEntity) -> {
            notifySender(withDittoHeaders);
            if (z2) {
                becomeDeletedHandler();
            }
            if (z) {
                becomeCreatedHandler();
            }
        });
    }

    /**
     * Replies to the sender with the response of a query.
     *
     * @param command the query command; not used here.
     * @param withDittoHeaders the response to send.
     */
    @Override // org.eclipse.ditto.services.utils.persistentactors.results.ResultVisitor
    public void onQuery(Command command, WithDittoHeaders withDittoHeaders) {
        notifySender(withDittoHeaders);
    }

    /**
     * Replies to the sender with an error.
     *
     * @param dittoRuntimeException the error to send.
     */
    @Override // org.eclipse.ditto.services.utils.persistentactors.results.ResultVisitor
    public void onError(DittoRuntimeException dittoRuntimeException) {
        notifySender(dittoRuntimeException);
    }

    /** Returns the revision the next persisted event will have. */
    private long getNextRevisionNumber() {
        return getRevisionNumber() + 1;
    }

    /**
     * Persists the event and, once persisted, invokes the handler; afterwards a snapshot is taken if the
     * snapshot threshold is reached.
     */
    private void persistEvent(E e, Consumer<E> consumer) {
        LogUtil.enhanceLogWithCorrelationId(log, e, new LogUtil.MdcField[0]);
        log.debug("Persisting Event <{}>.", e.getType());
        persist(e, persistedEvent -> {
            LogUtil.enhanceLogWithCorrelationId(log, e.getDittoHeaders().getCorrelationId(), new LogUtil.MdcField[0]);
            log.info("Successfully persisted Event <{}>.", e.getType());
            // The handler applies the event, so it must run before the threshold check and the snapshot.
            consumer.accept(persistedEvent);
            if (snapshotThresholdPassed()) {
                takeSnapshot("snapshot threshold is reached");
            }
        });
    }

    /**
     * Requests a snapshot of the entity unless there is no entity or a snapshot was already requested for the
     * current revision.
     *
     * @param str the reason for the snapshot, used for logging only.
     */
    private void takeSnapshot(String str) {
        final long revision = getRevisionNumber();
        if (entity != null && lastSnapshotRevision != revision) {
            log.debug("Taking snapshot for entity with ID <{}> and sequence number <{}> because {}.", entityId, revision, str);
            saveSnapshot(snapshotAdapter.toSnapshotStore(entity));
            lastSnapshotRevision = revision;
        } else if (lastSnapshotRevision == revision) {
            // Log the ID, as the other branches do, instead of dumping the whole entity state into the log.
            log.debug("Not taking duplicate snapshot for entity <{}> with revision <{}> even if {}.", entityId, revision, str);
        } else {
            log.debug("Not taking snapshot for nonexistent entity <{}> even if {}.", entityId, str);
        }
    }

    /** Returns whether the number of revisions since the last requested snapshot reached the threshold. */
    private boolean snapshotThresholdPassed() {
        return getRevisionNumber() - lastSnapshotRevision >= getSnapshotConfig().getThreshold();
    }

    /** Applies the event to the entity via the event handler and publishes it afterwards. */
    private void applyEvent(E e) {
        handleEvents.onMessage().apply(e);
        publishEvent(e);
    }

    /** Sends the message to the sender of the currently processed message. */
    private void notifySender(WithDittoHeaders withDittoHeaders) {
        notifySender(getSender(), withDittoHeaders);
    }

    /** Sends the message to the given recipient and counts this as an access. */
    private void notifySender(ActorRef actorRef, WithDittoHeaders withDittoHeaders) {
        accessCounter++;
        actorRef.tell(withDittoHeaders, getSelf());
    }

    /** Handles the periodic snapshot trigger. */
    private void takeSnapshotByInterval(Control control) {
        takeSnapshot("snapshot interval has passed");
    }

    /** Remembers the sequence number of the snapshot that was confirmed as saved. */
    private void saveSnapshotSuccess(SaveSnapshotSuccess saveSnapshotSuccess) {
        log.debug("Got {}", saveSnapshotSuccess);
        confirmedSnapshotRevision = saveSnapshotSuccess.metadata().sequenceNr();
    }

    /** Logs a failed snapshot together with its cause. */
    private void saveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
        log.error(saveSnapshotFailure.cause(), "Got {}", saveSnapshotFailure);
    }

    /** Restores the entity from the offered snapshot and treats its sequence number as requested and confirmed. */
    private void recoverFromSnapshotOffer(SnapshotOffer snapshotOffer) {
        entity = (S) snapshotAdapter.fromSnapshotStore(snapshotOffer);
        final long snapshotSequenceNr = snapshotOffer.metadata().sequenceNr();
        confirmedSnapshotRevision = snapshotSequenceNr;
        lastSnapshotRevision = snapshotSequenceNr;
    }

    /**
     * @return the sequence number of the latest snapshot that was offered during recovery or confirmed as saved.
     */
    @Override // org.eclipse.ditto.services.utils.persistentactors.AbstractPersistentActorWithTimersAndCleanup
    protected long getLatestSnapshotSequenceNumber() {
        return confirmedSnapshotRevision;
    }

    /**
     * Replies with a "not accessible" exception; the headers of the incoming message are copied into the
     * exception if the message has any.
     */
    private void notAccessible(Object obj) {
        DittoRuntimeExceptionBuilder notAccessibleExceptionBuilder = newNotAccessibleExceptionBuilder();
        if (obj instanceof WithDittoHeaders) {
            notAccessibleExceptionBuilder.dittoHeaders(((WithDittoHeaders) obj).getDittoHeaders());
        }
        notifySender(notAccessibleExceptionBuilder.build());
    }

    /** Logs the reason with the entity ID and asks the parent to passivate this actor. */
    private void shutdown(String str, I i) {
        log.debug(str, String.valueOf(i));
        passivate();
    }

    /** Returns whether an entity is present and not marked as deleted. */
    private boolean isEntityActive() {
        return entity != null && !entityExistsAsDeleted();
    }

    /**
     * Handles any message not matched by the created behavior; logs a warning by default.
     *
     * @param obj the unmatched message.
     */
    protected void matchAnyAfterInitialization(Object obj) {
        log.warning("Unknown message: {}", obj);
    }

    /**
     * Creates an activity-check message. This is the only way to create one outside of this class because the
     * constructor of {@link CheckForActivity} is private.
     *
     * @param j the access counter the message should carry.
     * @return the activity-check message.
     */
    public static Object checkForActivity(long j) {
        return new CheckForActivity(j);
    }
}
