package org.eclipse.ditto.internal.utils.persistence.mongo;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.internal.utils.akka.streaming.AbstractStreamingActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

@AllValuesAreNonnullByDefault
/* loaded from: input_file:org/eclipse/ditto/internal/utils/persistence/mongo/AbstractPersistenceStreamingActor.class */
public abstract class AbstractPersistenceStreamingActor<T extends EntityIdWithRevision<?>> extends AbstractStreamingActor<SudoStreamPids, T> {
    private final Function<PidWithSeqNr, T> entityMapper;
    private final Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper;
    private final DittoMongoClient mongoClient;
    private final MongoReadJournal readJournal;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractPersistenceStreamingActor(Function<PidWithSeqNr, T> function, Function<EntityIdWithRevision<?>, PidWithSeqNr> function2) {
        this.entityMapper = (Function) Objects.requireNonNull(function);
        this.entityUnmapper = function2;
        Config config = getContext().getSystem().settings().config();
        this.mongoClient = MongoClientWrapper.newInstance(DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config)));
        this.readJournal = MongoReadJournal.newInstance(config, this.mongoClient, getContext().getSystem());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractPersistenceStreamingActor(Function<PidWithSeqNr, T> function, Function<EntityIdWithRevision<?>, PidWithSeqNr> function2, MongoReadJournal mongoReadJournal) {
        this.entityMapper = (Function) Objects.requireNonNull(function);
        this.entityUnmapper = function2;
        this.mongoClient = MongoClientWrapper.newInstance(DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())));
        this.readJournal = mongoReadJournal;
    }

    public void postStop() throws Exception {
        this.mongoClient.close();
        super.postStop();
    }

    protected abstract Class<T> getElementClass();

    protected final Class<SudoStreamPids> getCommandClass() {
        return SudoStreamPids.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getBurst(SudoStreamPids sudoStreamPids) {
        return sudoStreamPids.getBurst();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Duration getInitialTimeout(SudoStreamPids sudoStreamPids) {
        return Duration.ofMillis(sudoStreamPids.getTimeoutMillis());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Duration getIdleTimeout(SudoStreamPids sudoStreamPids) {
        return Duration.ofMillis(sudoStreamPids.getTimeoutMillis());
    }

    protected Object batchMessages(List<T> list) {
        return BatchedEntityIdWithRevisions.of(getElementClass(), list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Source<T, NotUsed> createSource(SudoStreamPids sudoStreamPids) {
        Source<String, NotUsed> journalPids;
        this.log.info("Starting stream for <{}>", sudoStreamPids);
        Duration ofMillis = Duration.ofMillis(sudoStreamPids.getTimeoutMillis());
        int burst = sudoStreamPids.getBurst() * 5;
        if (sudoStreamPids.hasNonEmptyLowerBound()) {
            journalPids = this.readJournal.getJournalPidsAbove(this.entityUnmapper.apply(sudoStreamPids.getLowerBound()).getPersistenceId(), burst, this.materializer);
        } else {
            journalPids = this.readJournal.getJournalPids(burst, ofMillis, this.materializer);
        }
        return journalPids.map(str -> {
            return mapEntity(new PidWithSeqNr(str, 0L));
        }).log("pid-streaming", this.log);
    }

    private T mapEntity(PidWithSeqNr pidWithSeqNr) {
        return this.entityMapper.apply(pidWithSeqNr);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 532024861:
                if (implMethodName.equals("lambda$createSource$29a9fb16$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/persistence/mongo/AbstractPersistenceStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lorg/eclipse/ditto/internal/models/streaming/EntityIdWithRevision;")) {
                    AbstractPersistenceStreamingActor abstractPersistenceStreamingActor = (AbstractPersistenceStreamingActor) serializedLambda.getCapturedArg(0);
                    return str -> {
                        return mapEntity(new PidWithSeqNr(str, 0L));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
