package org.eclipse.ditto.services.concierge.actors.cleanup;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.FanInShape2;
import akka.stream.Graph;
import akka.stream.SourceShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldMarker;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.concierge.actors.ShardRegions;
import org.eclipse.ditto.services.concierge.actors.cleanup.credits.CreditDecisionSource;
import org.eclipse.ditto.services.concierge.actors.cleanup.messages.CreditDecision;
import org.eclipse.ditto.services.concierge.actors.cleanup.persistenceids.PersistenceIdSource;
import org.eclipse.ditto.services.concierge.common.PersistenceCleanupConfig;
import org.eclipse.ditto.services.models.connectivity.ConnectionTag;
import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.services.models.things.ThingSnapshotTaken;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.utils.akka.controlflow.Transistor;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.health.AbstractBackgroundStreamingActorWithConfigWithStatusReport;
import org.eclipse.ditto.signals.commands.cleanup.CleanupPersistence;
import org.eclipse.ditto.signals.commands.cleanup.CleanupPersistenceResponse;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.policies.PolicyCommand;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import scala.PartialFunction;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator.class */
public final class EventSnapshotCleanupCoordinator extends AbstractBackgroundStreamingActorWithConfigWithStatusReport<PersistenceCleanupConfig> {
    public static final String ACTOR_NAME = "eventSnapshotCleanupCoordinator";
    private static final String ERROR_MESSAGE_HEADER = "error";
    private static final String REQUESTED_MESSAGE_HEADER = "requested";
    private static final JsonFieldDefinition<JsonArray> JSON_CREDIT_DECISIONS = JsonFactory.newJsonArrayFieldDefinition("credit-decisions", new JsonFieldMarker[0]);
    private static final JsonFieldDefinition<JsonArray> JSON_ACTIONS = JsonFactory.newJsonArrayFieldDefinition("actions", new JsonFieldMarker[0]);
    private static final String START = "start";
    private final ActorRef pubSubMediator;
    private final ShardRegions shardRegions;
    private final Deque<Pair<Instant, CreditDecision>> creditDecisions;
    private final Deque<Pair<Instant, CleanupPersistenceResponse>> actions;
    private final LinkedHashSet<ThingId> pendingRequests;
    int creditForRequests;

    private EventSnapshotCleanupCoordinator(PersistenceCleanupConfig persistenceCleanupConfig, ActorRef actorRef, ShardRegions shardRegions) {
        super(persistenceCleanupConfig);
        this.pubSubMediator = actorRef;
        this.shardRegions = shardRegions;
        this.creditDecisions = new ArrayDeque(persistenceCleanupConfig.getKeptCreditDecisions() + 1);
        this.actions = new ArrayDeque(persistenceCleanupConfig.getKeptActions() + 1);
        this.pendingRequests = new LinkedHashSet<>();
        this.creditForRequests = persistenceCleanupConfig.getCreditDecisionConfig().getCreditForRequests();
    }

    public void preStart() throws Exception {
        super.preStart();
        this.pubSubMediator.tell(DistPubSubAccess.subscribeViaGroup("thing:snapshottaken", ACTOR_NAME, getSelf()), getSelf());
    }

    public static Props props(PersistenceCleanupConfig persistenceCleanupConfig, ActorRef actorRef, ShardRegions shardRegions) {
        return Props.create(EventSnapshotCleanupCoordinator.class, new Object[]{persistenceCleanupConfig, actorRef, shardRegions});
    }

    protected void preEnhanceSleepingBehavior(ReceiveBuilder receiveBuilder) {
        receiveBuilder.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
            this.log.info("Got <{}>", subscribeAck);
        }).match(CreditDecision.class, this::onCreditDecision).match(ThingSnapshotTaken.class, this::onThingSnapshotTaken).match(CleanupPersistenceResponse.class, this::onCleanupResponse);
    }

    protected void preEnhanceStreamingBehavior(ReceiveBuilder receiveBuilder) {
        receiveBuilder.match(CreditDecision.class, this::onCreditDecision).match(ThingSnapshotTaken.class, this::onThingSnapshotTaken).match(CleanupPersistenceResponse.class, this::onCleanupResponse);
    }

    private void onCleanupResponse(CleanupPersistenceResponse cleanupPersistenceResponse) {
        enqueue(this.actions, cleanupPersistenceResponse, this.config.getKeptActions());
    }

    private void onCreditDecision(CreditDecision creditDecision) {
        enqueue(this.creditDecisions, creditDecision, this.config.getKeptCreditDecisions());
        if (creditDecision.getCredit() > 0) {
            this.creditForRequests = this.config.getCreditDecisionConfig().getCreditForRequests();
            flushPendingRequests();
        }
    }

    private void flushPendingRequests() {
        Iterator<ThingId> it = this.pendingRequests.iterator();
        while (this.creditForRequests > 0 && it.hasNext()) {
            this.creditForRequests--;
            cleanUpThingByRequest(it.next());
        }
    }

    private void onThingSnapshotTaken(ThingSnapshotTaken thingSnapshotTaken) {
        if (this.creditForRequests > 0) {
            this.creditForRequests--;
            cleanUpThingByRequest(thingSnapshotTaken.getEntityId());
        } else if (this.pendingRequests.size() < this.config.getCreditDecisionConfig().getMaxPendingRequests()) {
            this.pendingRequests.add(thingSnapshotTaken.getEntityId());
        } else {
            this.log.info("Dropping <{}> because cache is full.", thingSnapshotTaken);
        }
    }

    private void cleanUpThingByRequest(ThingId thingId) {
        askShardRegionForCleanup(this.shardRegions.things(), ThingCommand.RESOURCE_TYPE, ThingTag.of(thingId, 0L)).thenAccept(cleanupPersistenceResponse -> {
            getSelf().tell(cleanupPersistenceResponse.setDittoHeaders(cleanupPersistenceResponse.getDittoHeaders().toBuilder().putHeader(REQUESTED_MESSAGE_HEADER, "true").build()), ActorRef.noSender());
        });
    }

    private <T> Flow<T, T, NotUsed> reportToSelf() {
        return Flow.fromFunction(obj -> {
            getSelf().tell(obj, getSelf());
            return obj;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseConfig, reason: merged with bridge method [inline-methods] */
    public PersistenceCleanupConfig m1parseConfig(Config config) {
        return PersistenceCleanupConfig.fromConfig(config);
    }

    private Source<EntityIdWithRevision<?>, NotUsed> getEntityIdWithRevisionSource() {
        return Source.fromGraph(GraphDSL.create(builder -> {
            SourceShape add = builder.add(persistenceIdSource());
            SourceShape add2 = builder.add(creditSource());
            FanInShape2 add3 = builder.add(Transistor.of());
            builder.from(add.out()).toInlet(add3.in0());
            builder.from(add2.out()).toInlet(add3.in1());
            return SourceShape.of(add3.out());
        })).log("pid-source", this.log);
    }

    private Source<Integer, NotUsed> creditSource() {
        return Source.fromGraph(CreditDecisionSource.create(this.config.getCreditDecisionConfig(), getContext(), this.pubSubMediator, this.log)).via(reportToSelf()).map((v0) -> {
            return v0.getCredit();
        });
    }

    private Graph<SourceShape<EntityIdWithRevision<?>>, NotUsed> persistenceIdSource() {
        return PersistenceIdSource.create(this.config.getPersistenceIdsConfig(), this.pubSubMediator);
    }

    protected Source<CleanupPersistenceResponse, NotUsed> getSource() {
        PartialFunction build = new PFBuilder().match(ThingTag.class, thingTag -> {
            return askShardRegionForCleanup(this.shardRegions.things(), ThingCommand.RESOURCE_TYPE, thingTag);
        }).match(PolicyTag.class, policyTag -> {
            return askShardRegionForCleanup(this.shardRegions.policies(), PolicyCommand.RESOURCE_TYPE, policyTag);
        }).match(ConnectionTag.class, connectionTag -> {
            return askShardRegionForCleanup(this.shardRegions.connections(), ConnectivityCommand.RESOURCE_TYPE, connectionTag);
        }).matchAny(entityIdWithRevision -> {
            String str = "Unexpected entity ID type: " + entityIdWithRevision;
            this.log.error(str);
            return CompletableFuture.completedFuture(CleanupPersistenceResponse.failure(entityIdWithRevision.getEntityId(), DittoHeaders.newBuilder().putHeader(ERROR_MESSAGE_HEADER, str).build()));
        }).build();
        Source<EntityIdWithRevision<?>, NotUsed> entityIdWithRevisionSource = getEntityIdWithRevisionSource();
        int parallelism = this.config.getParallelism();
        Objects.requireNonNull(build);
        return entityIdWithRevisionSource.mapAsync(parallelism, (v1) -> {
            return r2.apply(v1);
        }).via(reportToSelf()).log(EventSnapshotCleanupCoordinator.class.getSimpleName(), this.log);
    }

    private CompletionStage<CleanupPersistenceResponse> askShardRegionForCleanup(ActorRef actorRef, String str, EntityIdWithRevision<?> entityIdWithRevision) {
        EntityId entityId = entityIdWithRevision.getEntityId();
        CleanupPersistence cleanupCommand = getCleanupCommand(entityId);
        return Patterns.ask(actorRef, cleanupCommand, this.config.getCleanupTimeout()).handle((obj, th) -> {
            if (!(obj instanceof CleanupPersistenceResponse)) {
                return CleanupPersistenceResponse.failure(entityId, cleanupCommand.getDittoHeaders().toBuilder().putHeader(ERROR_MESSAGE_HEADER, String.format("Unexpected response from shard <%s>: result=<%s> error=<%s>", str, obj, th)).build());
            }
            CleanupPersistenceResponse cleanupPersistenceResponse = (CleanupPersistenceResponse) obj;
            return cleanupPersistenceResponse.setDittoHeaders(cleanupCommand.getDittoHeaders().toBuilder().putHeaders(cleanupPersistenceResponse.getDittoHeaders()).build());
        });
    }

    protected void postEnhanceStatusReport(JsonObjectBuilder jsonObjectBuilder) {
        jsonObjectBuilder.set(JSON_CREDIT_DECISIONS, (JsonArray) this.creditDecisions.stream().map(EventSnapshotCleanupCoordinator::renderCreditDecision).collect(JsonCollectors.valuesToArray())).set(JSON_ACTIONS, (JsonArray) this.actions.stream().map(EventSnapshotCleanupCoordinator::renderAction).collect(JsonCollectors.valuesToArray())).build();
    }

    private static JsonObject renderCreditDecision(Pair<Instant, CreditDecision> pair) {
        return JsonObject.newBuilder().set(((Instant) pair.first()).toString(), ((CreditDecision) pair.second()).toString()).build();
    }

    private static JsonObject renderAction(Pair<Instant, CleanupPersistenceResponse> pair) {
        CleanupPersistenceResponse cleanupPersistenceResponse = (CleanupPersistenceResponse) pair.second();
        DittoHeaders dittoHeaders = cleanupPersistenceResponse.getDittoHeaders();
        return JsonObject.newBuilder().set(((Instant) pair.first()).toString(), String.format("%d start=%s <%s>", Integer.valueOf(cleanupPersistenceResponse.getHttpStatus().getCode()), (String) dittoHeaders.getOrDefault(START, "unknown"), getResponseMessage(cleanupPersistenceResponse))).build();
    }

    private static String getResponseMessage(CleanupPersistenceResponse cleanupPersistenceResponse) {
        StringBuilder sb = new StringBuilder();
        DittoHeaders dittoHeaders = cleanupPersistenceResponse.getDittoHeaders();
        if (dittoHeaders.containsKey(REQUESTED_MESSAGE_HEADER)) {
            sb.append("requested by ");
        }
        if (cleanupPersistenceResponse.getEntityId().isDummy()) {
            sb.append((String) dittoHeaders.get(ERROR_MESSAGE_HEADER));
        } else {
            sb.append(cleanupPersistenceResponse.getEntityId().toString());
            if (dittoHeaders.containsKey(ERROR_MESSAGE_HEADER)) {
                sb.append(": ").append((String) dittoHeaders.get(ERROR_MESSAGE_HEADER));
            }
        }
        return sb.toString();
    }

    private static CleanupPersistence getCleanupCommand(EntityId entityId) {
        return CleanupPersistence.of(entityId, DittoHeaders.newBuilder().putHeader(START, Instant.now().toString()).build());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1955961106:
                if (implMethodName.equals("lambda$reportToSelf$aec1ad22$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1334301638:
                if (implMethodName.equals("lambda$getEntityIdWithRevisionSource$a4d3e0d3$1")) {
                    z = false;
                    break;
                }
                break;
            case 93029230:
                if (implMethodName.equals("apply")) {
                    z = 3;
                    break;
                }
                break;
            case 343723503:
                if (implMethodName.equals("getCredit")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/SourceShape;")) {
                    EventSnapshotCleanupCoordinator eventSnapshotCleanupCoordinator = (EventSnapshotCleanupCoordinator) serializedLambda.getCapturedArg(0);
                    return builder -> {
                        SourceShape add = builder.add(persistenceIdSource());
                        SourceShape add2 = builder.add(creditSource());
                        FanInShape2 add3 = builder.add(Transistor.of());
                        builder.from(add.out()).toInlet(add3.in0());
                        builder.from(add2.out()).toInlet(add3.in1());
                        return SourceShape.of(add3.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/messages/CreditDecision") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.getCredit();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    EventSnapshotCleanupCoordinator eventSnapshotCleanupCoordinator2 = (EventSnapshotCleanupCoordinator) serializedLambda.getCapturedArg(0);
                    return obj -> {
                        getSelf().tell(obj, getSelf());
                        return obj;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Function1") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    PartialFunction partialFunction = (PartialFunction) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        return r0.apply(v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
