package org.eclipse.ditto.internal.utils.pubsub;

import akka.actor.ActorContext;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import java.util.Collection;
import java.util.Collections;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.internal.utils.pubsub.AbstractPubSubFactory;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.things.model.ThingId;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/LiveSignalPubSubFactory.class */
public final class LiveSignalPubSubFactory extends AbstractPubSubFactory<SignalWithEntityId<?>> {
    private static final AckExtractor<SignalWithEntityId<?>> ACK_EXTRACTOR = AckExtractor.of(LiveSignalPubSubFactory::getThingId, (v0) -> {
        return v0.getDittoHeaders();
    });
    private static final AbstractPubSubFactory.DDataProvider PROVIDER = AbstractPubSubFactory.DDataProvider.of("live-signal-aware");

    private LiveSignalPubSubFactory(ActorRefFactory actorRefFactory, ActorSystem actorSystem, PubSubTopicExtractor<SignalWithEntityId<?>> pubSubTopicExtractor, DistributedAcks distributedAcks) {
        super(actorRefFactory, actorSystem, Signal.class, pubSubTopicExtractor, PROVIDER, ACK_EXTRACTOR, distributedAcks);
    }

    public static LiveSignalPubSubFactory of(ActorContext actorContext, DistributedAcks distributedAcks) {
        return new LiveSignalPubSubFactory(actorContext, actorContext.system(), topicExtractor(), distributedAcks);
    }

    public static LiveSignalPubSubFactory of(ActorSystem actorSystem, DistributedAcks distributedAcks) {
        return new LiveSignalPubSubFactory(actorSystem, actorSystem, topicExtractor(), distributedAcks);
    }

    private static Collection<String> getStreamingTypeTopic(Signal<?> signal) {
        return (Collection) StreamingType.fromSignal(signal).map((v0) -> {
            return v0.getDistributedPubSubTopic();
        }).map((v0) -> {
            return Collections.singleton(v0);
        }).orElse(Collections.emptySet());
    }

    private static PubSubTopicExtractor<SignalWithEntityId<?>> topicExtractor() {
        return ReadSubjectExtractor.of().with((v0) -> {
            return getStreamingTypeTopic(v0);
        });
    }

    private static ThingId getThingId(SignalWithEntityId<?> signalWithEntityId) {
        return ThingId.of(signalWithEntityId.getEntityId());
    }
}
