package org.eclipse.ditto.internal.utils.pubsub;

import akka.actor.ActorRef;
import akka.cluster.ddata.Replicator;
import akka.pattern.Patterns;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.internal.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriber;
import org.eclipse.ditto.internal.utils.pubsub.api.Request;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.internal.utils.pubsub.api.Unsubscribe;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/DistributedSubImpl.class */
/**
 * {@link DistributedSub} implementation that turns every call into a request message for a
 * single "sub supervisor" actor.
 *
 * <p>The acknowledged operations ask the actor and expose its reply as a
 * {@link CompletionStage}; the remaining operations only {@code tell} the actor and return
 * immediately.
 *
 * <p>All fields are final and assigned once in the constructor.
 */
public final class DistributedSubImpl implements DistributedSub {

    /** Actor that receives every request built by this class. Deliberately not private. */
    final ActorRef subSupervisor;

    /** Source of the ask timeout ({@code getWriteTimeout()}) used for acknowledged requests. */
    private final DistributedDataConfig config;

    /** Write consistency handed to acknowledged subscribe/unsubscribe requests. */
    private final Replicator.WriteConsistency writeConsistency;

    /** Extra delay, in milliseconds, applied after a subscription has been acknowledged. */
    private final long ddataDelayInMillis;

    /**
     * Creates the facade and caches the subscription settings read from the configuration.
     *
     * <p>Decompiler metadata on this constructor states that its access modifier was changed
     * from package-private.
     *
     * @param distributedDataConfig configuration supplying the subscription write consistency,
     *     the subscription delay and the write timeout.
     * @param subSupervisor actor that all requests are sent to.
     */
    public DistributedSubImpl(DistributedDataConfig distributedDataConfig, ActorRef subSupervisor) {
        this.config = distributedDataConfig;
        this.subSupervisor = subSupervisor;
        this.writeConsistency = distributedDataConfig.getSubscriptionWriteConsistency();
        this.ddataDelayInMillis = distributedDataConfig.getSubscriptionDelay().toMillis();
    }

    /**
     * Sends a {@code Subscribe} request with the configured write consistency and waits for the
     * reply of the sub supervisor.
     *
     * <p>If the configured subscription delay is positive, the returned stage completes that
     * many milliseconds after the acknowledgement arrived; otherwise it completes together with
     * the acknowledgement. Failures of the ask are propagated without the extra delay.
     *
     * @param topics strings to subscribe for; passed unchanged to {@code Subscribe.of}.
     * @param subscriber actor passed to {@code Subscribe.of} as the subscribing party.
     * @param filter optional predicate passed unchanged to {@code Subscribe.of}; may be null.
     * @param group optional group name; when non-null it is validated with
     *     {@code ConditionChecker.checkNotEmpty} before anything is sent.
     * @return stage holding the {@link SubAck}, or failing as described in
     *     {@link #processAskResponse(Object)}.
     */
    @Override
    public CompletionStage<SubAck> subscribeWithFilterAndGroup(Collection<String> topics, ActorRef subscriber,
            @Nullable Predicate<Collection<String>> filter, @Nullable String group) {
        if (group != null) {
            // A null group is allowed, an empty one is rejected by the checker.
            ConditionChecker.checkNotEmpty(group, "group");
        }
        // NOTE(review): the boolean is true in the *WithAck paths and false in the *WithoutAck
        // paths, so it presumably requests an acknowledgement - confirm against Subscribe.of.
        final CompletionStage<SubAck> acknowledged =
                askSubSupervisor(Subscribe.of(topics, subscriber, this.writeConsistency, true, filter, group));
        if (this.ddataDelayInMillis <= 0) {
            return acknowledged;
        }
        return acknowledged.thenCompose(this::completeAfterDdataDelay);
    }

    /**
     * Sends an {@code Unsubscribe} request with the configured write consistency and waits for
     * the reply of the sub supervisor. No additional delay is applied.
     *
     * @param topics strings to unsubscribe from; passed unchanged to {@code Unsubscribe.of}.
     * @param subscriber actor passed to {@code Unsubscribe.of}.
     * @return stage holding the {@link SubAck}, or failing as described in
     *     {@link #processAskResponse(Object)}.
     */
    @Override
    public CompletionStage<SubAck> unsubscribeWithAck(Collection<String> topics, ActorRef subscriber) {
        return askSubSupervisor(Unsubscribe.of(topics, subscriber, this.writeConsistency, true));
    }

    /**
     * Sends a {@code Subscribe} request built with {@code Replicator.writeLocal()} and no filter,
     * without waiting for any reply. The subscriber is used as the sender of the message.
     *
     * @param topics strings to subscribe for.
     * @param subscriber actor passed to {@code Subscribe.of} and used as message sender.
     */
    @Override
    public void subscribeWithoutAck(Collection<String> topics, ActorRef subscriber) {
        this.subSupervisor.tell(Subscribe.of(topics, subscriber, Replicator.writeLocal(), false, null), subscriber);
    }

    /**
     * Sends an {@code Unsubscribe} request built with {@code Replicator.writeLocal()}, without
     * waiting for any reply. The subscriber is used as the sender of the message.
     *
     * @param topics strings to unsubscribe from.
     * @param subscriber actor passed to {@code Unsubscribe.of} and used as message sender.
     */
    @Override
    public void unsubscribeWithoutAck(Collection<String> topics, ActorRef subscriber) {
        this.subSupervisor.tell(Unsubscribe.of(topics, subscriber, Replicator.writeLocal(), false), subscriber);
    }

    /**
     * Sends a {@code RemoveSubscriber} request built with {@code Replicator.writeLocal()},
     * without waiting for any reply. The subscriber is used as the sender of the message.
     *
     * @param subscriber actor passed to {@code RemoveSubscriber.of} and used as message sender.
     */
    @Override
    public void removeSubscriber(ActorRef subscriber) {
        this.subSupervisor.tell(RemoveSubscriber.of(subscriber, Replicator.writeLocal(), false), subscriber);
    }

    /**
     * Asks the sub supervisor, bounded by the configured write timeout, and converts the untyped
     * reply into a typed stage.
     */
    private CompletionStage<SubAck> askSubSupervisor(Request request) {
        return Patterns.ask(this.subSupervisor, request, this.config.getWriteTimeout())
                .thenCompose(DistributedSubImpl::processAskResponse);
    }

    /**
     * Returns a future that yields {@code ack} once the configured delay has elapsed.
     *
     * <p>Nothing else ever completes the future, so the "timeout" value is always the result;
     * {@code completeOnTimeout} is used purely as a non-blocking timer.
     */
    private CompletableFuture<SubAck> completeAfterDdataDelay(SubAck ack) {
        return new CompletableFuture<SubAck>()
                .completeOnTimeout(ack, this.ddataDelayInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Maps a reply of the sub supervisor onto a stage.
     *
     * @param response the reply object received from the ask.
     * @return a completed stage for a {@link SubAck}; a stage failed with the reply itself when
     *     the reply is a {@link Throwable}; otherwise a stage failed with a
     *     {@link ClassCastException} describing the unexpected reply.
     */
    private static CompletionStage<SubAck> processAskResponse(Object response) {
        if (response instanceof SubAck) {
            return CompletableFuture.completedStage((SubAck) response);
        }
        if (response instanceof Throwable) {
            return CompletableFuture.failedStage((Throwable) response);
        }
        return CompletableFuture.failedStage(new ClassCastException("Expect SubAck, got: " + response));
    }
}
