package org.eclipse.ditto.services.thingsearch.persistence.write.streaming;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.ControlMessage;
import akka.dispatch.RequiresMessageQueue;
import akka.dispatch.UnboundedControlAwareMessageQueueSemantics;
import akka.japi.function.Function;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.Metadata;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor.class */
public final class ChangeQueueActor extends AbstractActor implements RequiresMessageQueue<UnboundedControlAwareMessageQueueSemantics> {
    public static final String ACTOR_NAME = "changeQueueActor";
    private static final Duration ASK_SELF_TIMEOUT = Duration.ofSeconds(5);
    private Map<ThingId, Metadata> cache = new HashMap();
    private Map<ThingId, Metadata> cacheShouldAcknowledge = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor$Control.class */
    public enum Control implements ControlMessage {
        DUMP,
        DUMP_SHOULD_ACKNOWLEDGE
    }

    private ChangeQueueActor() {
    }

    public static Props props() {
        return Props.create(ChangeQueueActor.class, new Object[0]).withMailbox("akka.actor.mailbox.unbounded-control-aware-queue-based");
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Metadata.class, this::enqueue).match(Control.class, this::dump).build();
    }

    private void enqueue(Metadata metadata) {
        if (metadata.getSenders().isEmpty()) {
            ConsistencyLag.startS1InChangeQueue(metadata);
            this.cache.merge(metadata.getThingId(), metadata, (v0, v1) -> {
                return v0.prependTimersAndSenders(v1);
            });
        } else {
            ConsistencyLag.startS1InChangeQueue(metadata);
            this.cacheShouldAcknowledge.merge(metadata.getThingId(), metadata, (v0, v1) -> {
                return v0.prependTimersAndSenders(v1);
            });
        }
    }

    public static Source<Map<ThingId, Metadata>, NotUsed> createSource(ActorRef actorRef, boolean z, Duration duration) {
        Source repeat;
        if (duration.isNegative() || duration.isZero()) {
            repeat = Source.repeat(z ? Control.DUMP_SHOULD_ACKNOWLEDGE : Control.DUMP);
        } else {
            repeat = Source.repeat(z ? Control.DUMP_SHOULD_ACKNOWLEDGE : Control.DUMP).throttle(1, duration);
        }
        return repeat.flatMapConcat(askSelf(actorRef)).filter(map -> {
            return !map.isEmpty();
        });
    }

    private void dump(Control control) {
        if (control == Control.DUMP) {
            this.cache.values().forEach(ConsistencyLag::startS2WaitForDemand);
            getSender().tell(this.cache, getSelf());
            this.cache = new HashMap();
        } else {
            if (control != Control.DUMP_SHOULD_ACKNOWLEDGE) {
                throw new IllegalArgumentException("Unsupported control dump message: " + control);
            }
            this.cacheShouldAcknowledge.values().forEach(ConsistencyLag::startS2WaitForDemand);
            getSender().tell(this.cacheShouldAcknowledge, getSelf());
            this.cacheShouldAcknowledge = new HashMap();
        }
    }

    private static Function<Control, Source<Map<ThingId, Metadata>, NotUsed>> askSelf(ActorRef actorRef) {
        return control -> {
            return Source.completionStageSource(Patterns.ask(actorRef, control, ASK_SELF_TIMEOUT).handle((obj, th) -> {
                return obj instanceof Map ? Source.single((Map) obj) : Source.empty();
            })).withAttributes(Attributes.inputBuffer(1, 1)).mapMaterializedValue(completionStage -> {
                return NotUsed.getInstance();
            });
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -801438428:
                if (implMethodName.equals("lambda$askSelf$7a08e11$1")) {
                    z = 2;
                    break;
                }
                break;
            case -582223733:
                if (implMethodName.equals("lambda$createSource$cf7f5661$1")) {
                    z = false;
                    break;
                }
                break;
            case 1490819318:
                if (implMethodName.equals("lambda$askSelf$cc82b0af$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Z")) {
                    return map -> {
                        return !map.isEmpty();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;Lorg/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor$Control;)Lakka/stream/javadsl/Source;")) {
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(0);
                    return control -> {
                        return Source.completionStageSource(Patterns.ask(actorRef, control, ASK_SELF_TIMEOUT).handle((obj, th) -> {
                            return obj instanceof Map ? Source.single((Map) obj) : Source.empty();
                        })).withAttributes(Attributes.inputBuffer(1, 1)).mapMaterializedValue(completionStage -> {
                            return NotUsed.getInstance();
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/ChangeQueueActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CompletionStage;)Lakka/NotUsed;")) {
                    return completionStage -> {
                        return NotUsed.getInstance();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
