package org.eclipse.ditto.internal.utils.pubsubthings;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.DistributedSub;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsubpolicies.PolicyAnnouncementPubSubFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsubthings/DittoProtocolSubImpl.class */
/**
 * Implementation of {@link DittoProtocolSub} that delegates to three {@link DistributedSub} instances
 * (live signals, twin events and policy announcements) and to a {@link DistributedAcks} instance for
 * acknowledgement label declarations.
 */
public final class DittoProtocolSubImpl implements DittoProtocolSub {

    private final DistributedSub liveSignalSub;
    private final DistributedSub twinEventSub;
    private final DistributedSub policyAnnouncementSub;
    private final DistributedAcks distributedAcks;

    /**
     * Akka extension ID which creates the {@link DittoProtocolSub} of an actor system.
     */
    static final class ExtensionId extends AbstractExtensionId<DittoProtocolSub> {

        static final ExtensionId INSTANCE = new ExtensionId();

        private ExtensionId() {
        }

        /**
         * Creates the extension, backed by a {@link DistributedAcks} created for the same actor system.
         *
         * @param extendedActorSystem the actor system the extension is created for.
         * @return the created {@link DittoProtocolSub}.
         */
        @Override
        public DittoProtocolSub createExtension(final ExtendedActorSystem extendedActorSystem) {
            return DittoProtocolSubImpl.of(extendedActorSystem, DistributedAcks.create(extendedActorSystem));
        }
    }

    private DittoProtocolSubImpl(final DistributedSub liveSignalSub,
            final DistributedSub twinEventSub,
            final DistributedSub policyAnnouncementSub,
            final DistributedAcks distributedAcks) {
        this.liveSignalSub = liveSignalSub;
        this.twinEventSub = twinEventSub;
        this.policyAnnouncementSub = policyAnnouncementSub;
        this.distributedAcks = distributedAcks;
    }

    /**
     * Starts the three distributed subscriptions and wraps them in a new instance.
     *
     * @param actorSystem the actor system in which the distributed subscriptions are started.
     * @param distributedAcks the distributed acks passed to the live-signal and thing-event factories and
     * used by the created instance.
     * @return the new instance.
     */
    static DittoProtocolSubImpl of(final ActorSystem actorSystem, final DistributedAcks distributedAcks) {
        final DistributedSub liveSignalSub =
                LiveSignalPubSubFactory.of(actorSystem, distributedAcks).startDistributedSub();
        final DistributedSub twinEventSub =
                ThingEventPubSubFactory.readSubjectsOnly(actorSystem, distributedAcks).startDistributedSub();
        final DistributedSub policyAnnouncementSub =
                PolicyAnnouncementPubSubFactory.of(actorSystem, actorSystem).startDistributedSub();
        return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, policyAnnouncementSub, distributedAcks);
    }

    /**
     * Subscribes {@code actorRef} for the given streaming types.
     * {@link StreamingType#EVENTS} is routed to the twin event subscription,
     * {@link StreamingType#POLICY_ANNOUNCEMENTS} to the policy announcement subscription, and all remaining
     * types to the live signal subscription (filtered by their pub/sub topics).
     *
     * @param collection the streaming types to subscribe for.
     * @param collection2 the topics passed to each underlying subscription.
     * @param actorRef the subscriber.
     * @param str the group passed to {@code subscribeWithFilterAndGroup}; may be {@code null}.
     * @param z passed unchanged as the last argument of {@code subscribeWithFilterAndGroup}
     * (NOTE(review): presumably a "resubscribe" flag - confirm against {@link DistributedSub}).
     * @return a stage completing with {@code true} if every performed subscription was acknowledged as
     * consistent; streaming types that were not requested count as consistent.
     */
    @Override
    public CompletionStage<Boolean> subscribe(final Collection<StreamingType> collection,
            final Collection<String> collection2,
            final ActorRef actorRef,
            @Nullable final String str,
            final boolean z) {

        // Result for streaming types that were not requested; isConsistent treats a null SubAck as consistent.
        final CompletionStage<SubAck> nop = CompletableFuture.completedFuture(null);
        return partitionByStreamingTypes(collection,
                liveTypes -> liveTypes.isEmpty()
                        ? nop
                        : liveSignalSub.subscribeWithFilterAndGroup(collection2, actorRef, toFilter(liveTypes),
                                str, z),
                hasTwinEvents -> hasTwinEvents
                        ? twinEventSub.subscribeWithFilterAndGroup(collection2, actorRef,
                                (Predicate<Collection<String>>) null, str, z)
                        : nop,
                hasPolicyAnnouncements -> hasPolicyAnnouncements
                        ? policyAnnouncementSub.subscribeWithFilterAndGroup(collection2, actorRef,
                                (Predicate<Collection<String>>) null, str, z)
                        : nop);
    }

    /**
     * Removes {@code actorRef} from all three distributed subscriptions and from the distributed acks.
     *
     * @param actorRef the subscriber to remove.
     */
    @Override
    public void removeSubscriber(final ActorRef actorRef) {
        liveSignalSub.removeSubscriber(actorRef);
        twinEventSub.removeSubscriber(actorRef);
        policyAnnouncementSub.removeSubscriber(actorRef);
        distributedAcks.removeSubscriber(actorRef);
    }

    /**
     * Updates only the live signal subscription of {@code actorRef}: if any streaming types other than
     * {@link StreamingType#EVENTS} and {@link StreamingType#POLICY_ANNOUNCEMENTS} remain, the subscriber is
     * (re-)subscribed with a filter for those types; otherwise it is unsubscribed from the given topics.
     * Twin event and policy announcement subscriptions are left untouched.
     *
     * @param collection the streaming types currently wanted by the subscriber.
     * @param collection2 the topics to subscribe for or unsubscribe from.
     * @param actorRef the subscriber.
     * @return a stage completing with {@code null} once the live signal subscription has been acknowledged.
     */
    @Override
    public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingType> collection,
            final Collection<String> collection2,
            final ActorRef actorRef) {

        return partitionByStreamingTypes(collection,
                liveTypes -> liveTypes.isEmpty()
                        ? liveSignalSub.unsubscribeWithAck(collection2, actorRef)
                        : liveSignalSub.subscribeWithFilterAndGroup(collection2, actorRef, toFilter(liveTypes),
                                (String) null, false),
                hasTwinEvents -> CompletableFuture.completedStage(null),
                hasPolicyAnnouncements -> CompletableFuture.completedStage(null))
                .thenApply(consistent -> null);
    }

    /**
     * Unsubscribes {@code actorRef} from the given topics of the twin event subscription.
     *
     * @param actorRef the subscriber.
     * @param collection the topics to unsubscribe from.
     * @return a stage completing with {@code null} once the unsubscription has been acknowledged.
     */
    @Override
    public CompletionStage<Void> removeTwinSubscriber(final ActorRef actorRef, final Collection<String> collection) {
        return twinEventSub.unsubscribeWithAck(collection, actorRef).thenApply(subAck -> null);
    }

    /**
     * Unsubscribes {@code actorRef} from the given topics of the policy announcement subscription.
     *
     * @param actorRef the subscriber.
     * @param collection the topics to unsubscribe from.
     * @return a stage completing with {@code null} once the unsubscription has been acknowledged.
     */
    @Override
    public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef actorRef,
            final Collection<String> collection) {

        return policyAnnouncementSub.unsubscribeWithAck(collection, actorRef).thenApply(subAck -> null);
    }

    /**
     * Declares acknowledgement labels for {@code actorRef}. An empty collection is a no-op.
     *
     * @param collection the acknowledgement labels to declare; each must be fully resolved.
     * @param actorRef the subscriber declaring the labels.
     * @param str the group passed to {@link DistributedAcks}; may be {@code null}.
     * @return a stage completing with {@code null} after the declaration, or a failed stage carrying the
     * {@link DittoRuntimeException} thrown while validating or declaring the labels.
     */
    @Override
    public CompletionStage<Void> declareAcknowledgementLabels(final Collection<AcknowledgementLabel> collection,
            final ActorRef actorRef,
            @Nullable final String str) {

        if (collection.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            ensureAcknowledgementLabelsAreFullyResolved(collection);
            return distributedAcks.declareAcknowledgementLabels(collection, actorRef, str)
                    .thenApply(acksDeclared -> null);
        } catch (final DittoRuntimeException e) {
            // Report synchronous failures through the returned stage instead of throwing at the caller.
            return CompletableFuture.failedStage(e);
        }
    }

    /**
     * Throws an {@link AcknowledgementLabelInvalidException} for the first label that is not fully resolved.
     */
    private static void ensureAcknowledgementLabelsAreFullyResolved(final Collection<AcknowledgementLabel> ackLabels) {
        ackLabels.stream()
                .filter(Predicate.not(AcknowledgementLabel::isFullyResolved))
                .findFirst()
                .ifPresent(unresolvedLabel -> {
                    throw AcknowledgementLabelInvalidException.of(unresolvedLabel,
                            "AcknowledgementLabel was not fully resolved while trying to declare it",
                            (URI) null,
                            DittoHeaders.empty());
                });
    }

    /**
     * Removes the acknowledgement label declaration of {@code actorRef}.
     *
     * @param actorRef the subscriber whose declaration is removed.
     */
    @Override
    public void removeAcknowledgementLabelDeclaration(final ActorRef actorRef) {
        distributedAcks.removeAcknowledgementLabelDeclaration(actorRef);
    }

    /**
     * Splits the streaming types into three parts and hands each part to its handler: the types remaining
     * after removing {@link StreamingType#EVENTS} and {@link StreamingType#POLICY_ANNOUNCEMENTS}, whether
     * {@code EVENTS} was present, and whether {@code POLICY_ANNOUNCEMENTS} was present. All three handlers
     * are invoked immediately, in that order; their results are then combined.
     *
     * @param types the requested streaming types.
     * @param onLiveTypes handler for the remaining streaming types.
     * @param onTwinEvents handler receiving whether {@code EVENTS} was requested.
     * @param onPolicyAnnouncements handler receiving whether {@code POLICY_ANNOUNCEMENTS} was requested.
     * @return a stage completing with {@code true} if and only if all three results are consistent.
     */
    private CompletionStage<Boolean> partitionByStreamingTypes(final Collection<StreamingType> types,
            final Function<Set<StreamingType>, CompletionStage<SubAck>> onLiveTypes,
            final Function<Boolean, CompletionStage<SubAck>> onTwinEvents,
            final Function<Boolean, CompletionStage<SubAck>> onPolicyAnnouncements) {

        final Set<StreamingType> liveTypes;
        final boolean hasTwinEvents;
        final boolean hasPolicyAnnouncements;
        if (types.isEmpty()) {
            // EnumSet.copyOf rejects an empty collection that is not itself an EnumSet.
            liveTypes = Collections.emptySet();
            hasTwinEvents = false;
            hasPolicyAnnouncements = false;
        } else {
            liveTypes = EnumSet.copyOf(types);
            hasTwinEvents = liveTypes.remove(StreamingType.EVENTS);
            hasPolicyAnnouncements = liveTypes.remove(StreamingType.POLICY_ANNOUNCEMENTS);
        }

        final CompletionStage<Boolean> liveStage = onLiveTypes.apply(liveTypes).thenApply(this::isConsistent);
        final CompletionStage<Boolean> twinStage = onTwinEvents.apply(hasTwinEvents).thenApply(this::isConsistent);
        final CompletionStage<Boolean> policyStage =
                onPolicyAnnouncements.apply(hasPolicyAnnouncements).thenApply(this::isConsistent);

        return liveStage.thenCompose(liveConsistent ->
                twinStage.thenCompose(twinConsistent ->
                        policyStage.thenApply(policyConsistent ->
                                liveConsistent && twinConsistent && policyConsistent)));
    }

    /**
     * Returns whether the acknowledgement is consistent; a {@code null} acknowledgement counts as consistent.
     */
    private boolean isConsistent(@Nullable final SubAck subAck) {
        return subAck == null || subAck.isConsistent();
    }

    /**
     * Builds a predicate that accepts a collection of topics if it contains the distributed pub/sub topic
     * of at least one of the given streaming types.
     */
    private static Predicate<Collection<String>> toFilter(final Collection<StreamingType> streamingTypes) {
        final Set<String> streamingTypeTopics = streamingTypes.stream()
                .map(StreamingType::getDistributedPubSubTopic)
                .collect(Collectors.toSet());
        return topics -> topics.stream().anyMatch(streamingTypeTopics::contains);
    }
}
