package org.eclipse.ditto.services.utils.persistence.mongo.namespace;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.namespaces.PurgeNamespace;
import org.eclipse.ditto.signals.commands.namespaces.PurgeNamespaceResponse;

/* loaded from: input_file:org/eclipse/ditto/services/utils/persistence/mongo/namespace/AbstractNamespaceOpsActor.class */
public abstract class AbstractNamespaceOpsActor<S> extends AbstractActor {
    protected final DiagnosticLoggingAdapter log;
    private final ActorRef pubSubMediator;
    private final NamespaceOps<S> namespaceOps;
    private final ActorMaterializer materializer;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractNamespaceOpsActor(ActorRef actorRef, NamespaceOps<S> namespaceOps) {
        this.log = LogUtil.obtain(this);
        this.pubSubMediator = actorRef;
        this.namespaceOps = namespaceOps;
        this.materializer = ActorMaterializer.create(getContext());
    }

    protected AbstractNamespaceOpsActor(NamespaceOps<S> namespaceOps) {
        this.log = LogUtil.obtain(this);
        this.pubSubMediator = DistributedPubSub.get(getContext().system()).mediator();
        this.namespaceOps = namespaceOps;
        this.materializer = ActorMaterializer.create(getContext());
    }

    protected abstract Collection<S> selectNamespace(String str);

    public void preStart() {
        subscribeForNamespaceCommands();
    }

    public void postStop() throws Exception {
        if (this.namespaceOps instanceof Closeable) {
            ((Closeable) this.namespaceOps).close();
        }
        super.postStop();
    }

    private void subscribeForNamespaceCommands() {
        ActorRef self = getSelf();
        this.pubSubMediator.tell(new DistributedPubSubMediator.Subscribe("namespaces.commands:purgeNamespace", self), self);
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PurgeNamespace.class, this::purgeNamespace).match(DistributedPubSubMediator.SubscribeAck.class, this::ignoreSubscribeAck).matchAny(obj -> {
            this.log.warning("unhandled: <{}>", obj);
        }).build();
    }

    private void purgeNamespace(PurgeNamespace purgeNamespace) {
        LogUtil.enhanceLogWithCorrelationId(this.log, purgeNamespace);
        Collection<S> selectNamespace = selectNamespace(purgeNamespace.getNamespace());
        this.log.info("Running <{}>. Affected collections: <{}>.", purgeNamespace, selectNamespace);
        ActorRef sender = getSender();
        ((CompletionStage) this.namespaceOps.purgeAll(selectNamespace).runWith(Sink.head(), this.materializer)).thenAccept(list -> {
            PurgeNamespaceResponse failed;
            if (list.isEmpty()) {
                failed = PurgeNamespaceResponse.successful(purgeNamespace.getNamespace(), getResourceType(), purgeNamespace.getDittoHeaders());
            } else {
                LogUtil.enhanceLogWithCorrelationId(this.log, purgeNamespace);
                String namespace = purgeNamespace.getNamespace();
                list.forEach(th -> {
                    this.log.error(th, "Error purging namespace <{}>", namespace);
                });
                failed = PurgeNamespaceResponse.failed(namespace, getResourceType(), purgeNamespace.getDittoHeaders());
            }
            sender.tell(failed, getSelf());
        }).exceptionally(th -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, purgeNamespace);
            this.log.error(th, "Failed to purge namespace <{}>!", purgeNamespace.getNamespace());
            return null;
        });
    }

    protected abstract String getResourceType();

    private void ignoreSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
    }
}
