package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.dispatch.DequeBasedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.Logging;
import akka.japi.Creator;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import akka.pattern.CircuitBreaker;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import java.io.UnsupportedEncodingException;
import java.lang.invoke.SerializedLambda;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import kamon.Kamon;
import kamon.trace.TraceContext;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.model.enforcers.PolicyEnforcers;
import org.eclipse.ditto.model.policies.Policy;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicy;
import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicyResponse;
import org.eclipse.ditto.services.models.streaming.IdentifiableStreamingMessage;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingResponse;
import org.eclipse.ditto.services.thingsearch.persistence.write.EventToPersistenceStrategyFactory;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingMetadata;
import org.eclipse.ditto.services.thingsearch.persistence.write.ThingsSearchUpdaterPersistence;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.streaming.StreamAck;
import org.eclipse.ditto.signals.base.ShardedMessageEnvelope;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.policies.PolicyErrorResponse;
import org.eclipse.ditto.signals.commands.policies.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.events.policies.PolicyEvent;
import org.eclipse.ditto.signals.events.things.PolicyIdCreated;
import org.eclipse.ditto.signals.events.things.PolicyIdModified;
import org.eclipse.ditto.signals.events.things.ThingCreated;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.things.ThingModified;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater.class */
public final class ThingUpdater extends AbstractActorWithDiscardOldStash implements RequiresMessageQueue<DequeBasedMessageQueueSemantics> {
    private static final long UNKNOWN_REVISION = 0;
    static final Duration DEFAULT_THINGS_TIMEOUT = Duration.of(20, ChronoUnit.SECONDS);
    static final int UNLIMITED_MAX_BULK_SIZE = Integer.MAX_VALUE;
    private static final int MAX_SYNC_ATTEMPTS = 3;
    private static final String NO_SYNC_SESSION_ID = "<no-sync-session-id>";
    private static final String TRACE_THING_MODIFIED = "thing.modified";
    private static final String TRACE_THING_BULK_UPDATE = "thing.bulkUpdate";
    private static final String COUNT_THING_BULK_UPDATE = "thing.bulkUpdate.count";
    private static final String TRACE_THING_DELETE = "thing.delete";
    private static final String TRACE_POLICY_UPDATE = "policy.update";
    private final DiagnosticLoggingAdapter log;
    private final int maxBulkSize;
    private final String thingId;
    private final FiniteDuration thingsTimeout;
    private final ActorRef thingsShardRegion;
    private final ActorRef policiesShardRegion;
    private final Duration activityCheckInterval;
    private final ThingsSearchUpdaterPersistence searchUpdaterPersistence;
    private final CircuitBreaker circuitBreaker;
    private final Materializer materializer;
    private final AbstractActor.Receive eventProcessingBehavior;
    private final AbstractActor.Receive awaitSyncResultBehavior;
    private boolean transactionActive;
    private int syncAttempts;
    private Cancellable activityChecker;
    private List<ThingEvent> gatheredEvents;
    private long sequenceNumber;
    private JsonSchemaVersion schemaVersion;
    private String policyId;
    private long policyRevision;
    private Enforcer policyEnforcer;
    private SyncMetadata activeSyncMetadata;
    private String syncSessionId;
    private Cancellable syncTimeout;

    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater$ActorInitializationComplete.class */
    private static class ActorInitializationComplete {
        static final ActorInitializationComplete INSTANCE = new ActorInitializationComplete();

        private ActorInitializationComplete() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater$SyncFailure.class */
    public static final class SyncFailure {
        static final SyncFailure INSTANCE = new SyncFailure();

        private SyncFailure() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater$SyncMetadata.class */
    public static final class SyncMetadata {
        private final ActorRef ackRecipient;
        private final String thingIdentifier;

        private SyncMetadata(ActorRef actorRef, IdentifiableStreamingMessage identifiableStreamingMessage) {
            this.ackRecipient = actorRef;
            this.thingIdentifier = identifiableStreamingMessage.asIdentifierString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ActorRef getAckRecipient() {
            return this.ackRecipient;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StreamAck getSuccess() {
            return StreamAck.success(this.thingIdentifier);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StreamAck getFailure() {
            return StreamAck.failure(this.thingIdentifier);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater$SyncSuccess.class */
    public static final class SyncSuccess {
        static final SyncSuccess INSTANCE = new SyncSuccess();

        private SyncSuccess() {
        }
    }

    private ThingUpdater(Duration duration, ActorRef actorRef, ActorRef actorRef2, ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence, CircuitBreaker circuitBreaker, Duration duration2, int i) {
        this.log = Logging.apply(this);
        this.eventProcessingBehavior = createEventProcessingBehavior();
        this.awaitSyncResultBehavior = createAwaitSyncResultBehavior();
        this.sequenceNumber = -1L;
        this.policyRevision = -1L;
        this.activeSyncMetadata = null;
        this.syncSessionId = NO_SYNC_SESSION_ID;
        this.syncTimeout = null;
        this.maxBulkSize = i;
        this.thingsTimeout = scala.concurrent.duration.Duration.create(duration.toNanos(), TimeUnit.NANOSECONDS);
        this.thingsShardRegion = actorRef;
        this.policiesShardRegion = actorRef2;
        this.activityCheckInterval = duration2;
        this.searchUpdaterPersistence = thingsSearchUpdaterPersistence;
        this.circuitBreaker = circuitBreaker;
        this.gatheredEvents = new ArrayList();
        this.thingId = tryToGetThingId(StandardCharsets.UTF_8);
        this.materializer = ActorMaterializer.create(getContext());
        this.transactionActive = false;
        this.syncAttempts = 0;
        scheduleCheckForThingActivity();
        ((CompletionStage) thingsSearchUpdaterPersistence.getThingMetadata(this.thingId).runWith(Sink.last(), this.materializer)).whenComplete((thingMetadata, th) -> {
            if (th == null) {
                getSelf().tell(thingMetadata, (ActorRef) null);
                return;
            }
            if (th instanceof NoSuchElementException) {
                this.log.debug("Thing was not yet present in search index: {}", this.thingId);
            } else {
                this.log.error(th, "Unexpected exception when retrieving latest revision from search index");
            }
            getSelf().tell(ActorInitializationComplete.INSTANCE, (ActorRef) null);
        });
    }

    private String tryToGetThingId(Charset charset) {
        try {
            return getThingId(charset);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(MessageFormat.format("Charset <{0}> is unsupported!", charset.name()), e);
        }
    }

    private String getThingId(Charset charset) throws UnsupportedEncodingException {
        return URLDecoder.decode(self().path().name(), charset.name());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(final ThingsSearchUpdaterPersistence thingsSearchUpdaterPersistence, final CircuitBreaker circuitBreaker, final ActorRef actorRef, final ActorRef actorRef2, final Duration duration, final Duration duration2, final int i) {
        return Props.create(ThingUpdater.class, new Creator<ThingUpdater>() { // from class: org.eclipse.ditto.services.thingsearch.updater.actors.ThingUpdater.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public ThingUpdater m2create() {
                return new ThingUpdater(duration2, actorRef, actorRef2, thingsSearchUpdaterPersistence, circuitBreaker, duration, i);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ThingMetadata.class, thingMetadata -> {
            this.sequenceNumber = thingMetadata.getThingRevision();
            this.policyId = thingMetadata.getPolicyId();
            this.policyRevision = thingMetadata.getPolicyRevision();
            becomeEventProcessing();
        }).match(ActorInitializationComplete.class, actorInitializationComplete -> {
            becomeEventProcessing();
        }).match(CheckForActivity.class, this::checkActivity).matchAny(obj -> {
            stashWithErrorsIgnored();
        }).build();
    }

    private void scheduleCheckForThingActivity() {
        this.log.debug("Scheduling for activity check in <{}> seconds.", Long.valueOf(this.activityCheckInterval.getSeconds()));
        this.activityChecker = context().system().scheduler().scheduleOnce(scala.concurrent.duration.Duration.create(this.activityCheckInterval.getSeconds(), TimeUnit.SECONDS), self(), new CheckForActivity(this.sequenceNumber), context().dispatcher(), (ActorRef) null);
    }

    private void checkActivity(CheckForActivity checkForActivity) {
        if (checkForActivity.getSequenceNr() != this.sequenceNumber) {
            scheduleCheckForThingActivity();
        } else {
            this.log.debug("Thing <{}> was not updated in a while. Shutting Actor down ...", this.thingId);
            stopThisActor();
        }
    }

    private void stopThisActor() {
        getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    @Override // org.eclipse.ditto.services.thingsearch.updater.actors.AbstractActorWithDiscardOldStash
    public void postStop() throws Exception {
        cancelActivityCheck();
        cancelSyncTimeoutAndResetSessionId();
        super.postStop();
    }

    private void cancelActivityCheck() {
        if (this.activityChecker != null) {
            this.activityChecker.cancel();
            this.activityChecker = null;
        }
    }

    private void cancelSyncTimeoutAndResetSessionId() {
        if (this.syncTimeout != null) {
            this.syncTimeout.cancel();
            this.syncTimeout = null;
        }
        this.syncSessionId = NO_SYNC_SESSION_ID;
    }

    private void becomeEventProcessing() {
        this.log.debug("Becoming 'eventProcessing' ...");
        cancelSyncTimeoutAndResetSessionId();
        getContext().become(this.eventProcessingBehavior);
        unstashAll();
    }

    private AbstractActor.Receive createEventProcessingBehavior() {
        return ReceiveBuilder.create().match(ThingEvent.class, this::processThingEvent).match(PolicyEvent.class, this::processPolicyEvent).match(ThingTag.class, this::processThingTag).match(PolicyReferenceTag.class, this::processPolicyReferenceTag).match(CheckForActivity.class, this::checkActivity).match(PersistenceWriteResult.class, this::handlePersistenceUpdateResult).matchAny(obj -> {
            this.log.warning("Unknown message in 'eventProcessing' behavior: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void processPolicyEvent(PolicyEvent<?> policyEvent) {
        LogUtil.enhanceLogWithCorrelationId(this.log, policyEvent);
        if (!Objects.equals(this.policyId, policyEvent.getPolicyId()) || policyEvent.getRevision() <= this.policyRevision) {
            return;
        }
        this.log.debug("Processing relevant PolicyChange with revision <{}>", Long.valueOf(policyEvent.getRevision()));
        triggerSynchronization();
    }

    private void processThingTag(ThingTag thingTag) {
        LogUtil.enhanceLogWithCorrelationId(this.log, "things-tags-sync-" + thingTag.asIdentifierString());
        this.log.debug("Received new Thing Tag for thing <{}> with revision <{}>: <{}>.", this.thingId, Long.valueOf(this.sequenceNumber), thingTag.asIdentifierString());
        this.activeSyncMetadata = new SyncMetadata(getSender(), thingTag);
        if (thingTag.getRevision() <= this.sequenceNumber) {
            ackSync(true);
        } else {
            this.log.info("The Thing Tag for the thing <{}> has the revision {} which is greater than the current actor's sequence number <{}>.", this.thingId, Long.valueOf(thingTag.getRevision()), Long.valueOf(this.sequenceNumber));
            triggerSynchronization();
        }
    }

    private void processPolicyReferenceTag(PolicyReferenceTag policyReferenceTag) {
        boolean z;
        LogUtil.enhanceLogWithCorrelationId(this.log, "policies-tags-sync-" + policyReferenceTag.asIdentifierString());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received new Policy-Reference-Tag for thing <{}> with revision <{}>,  policy-id <{}> and policy-revision <{}>: <{}>.", new Object[]{this.thingId, Long.valueOf(this.sequenceNumber), this.policyId, Long.valueOf(this.policyRevision), policyReferenceTag.asIdentifierString()});
        }
        this.activeSyncMetadata = new SyncMetadata(getSender(), policyReferenceTag);
        if (this.policyId == null) {
            this.log.debug("Currently no policy-id is available for the thing <{}>.", this.thingId);
            z = true;
        } else if (!policyReferenceTag.getPolicyTag().getId().equals(this.policyId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Policy-Reference-Tag has different policy-id than the current policy-id <{}> for the thing <{}>: <{}>.", this.policyId, this.thingId, policyReferenceTag.asIdentifierString());
            }
            z = false;
        } else if (policyReferenceTag.getPolicyTag().getRevision() > this.policyRevision) {
            this.log.info("The Policy-Reference-Tag has a revision which is greater than the current policy-revision <{}> for thing <{}>: <{}>.", Long.valueOf(this.policyRevision), this.thingId, policyReferenceTag.asIdentifierString());
            z = true;
        } else {
            if (this.log.isDebugEnabled()) {
                this.log.debug("The Policy-Reference-Tag has a revision which is less than or equal to the current policy-revision <{}> for thing <{}>: <{}>.", Long.valueOf(this.policyRevision), this.thingId, policyReferenceTag.asIdentifierString());
            }
            z = false;
        }
        if (z) {
            triggerSynchronization();
        } else {
            ackSync(true);
        }
    }

    private void processThingEvent(ThingEvent thingEvent) {
        LogUtil.enhanceLogWithCorrelationId(this.log, thingEvent);
        this.log.debug("Received new thing event for thing id <{}> with revision <{}>.", this.thingId, Long.valueOf(thingEvent.getRevision()));
        if (thingEvent.getRevision() <= this.sequenceNumber) {
            this.log.debug("Dropped thing event for thing id <{}> with revision <{}> because it was older than or equal to the current sequence number <{}> of the update actor.", this.thingId, Long.valueOf(thingEvent.getRevision()), Long.valueOf(this.sequenceNumber));
            return;
        }
        if (shortcutTakenForThingEvent(thingEvent)) {
            this.log.debug("Shortcut taken for thing event <{}>.", thingEvent);
            return;
        }
        if (thingEvent.getRevision() > this.sequenceNumber + 1) {
            this.log.debug("Triggering synchronization for thing <{}> because the received revision <{}> is higher than the expected sequence number <{}>.", this.thingId, Long.valueOf(thingEvent.getRevision()), Long.valueOf(this.sequenceNumber + 1));
            triggerSynchronization();
            return;
        }
        if (needToReloadPolicy(thingEvent)) {
            triggerSynchronization();
            return;
        }
        try {
            if (!EventToPersistenceStrategyFactory.isTypeAllowed(thingEvent.getType())) {
                throw new IllegalStateException(MessageFormat.format("Not processing Thing Event since its Type <{0}> is not allowed", thingEvent.getType()));
            }
            ThingEvent dittoHeaders = thingEvent.setDittoHeaders(thingEvent.getDittoHeaders().toBuilder().schemaVersion(schemaVersionToCheck()).build());
            if (this.transactionActive) {
                this.log.debug("The update actor for thing <{}> is currently busy. The current event will be processed after the actor is.", this.thingId);
                addEventToGatheredEvents(dittoHeaders);
                this.log.debug("Currently gathered <{}> events to be processed after actor has finished being busy", Integer.valueOf(this.gatheredEvents.size()));
            } else {
                persistThingEvents(Collections.singletonList(dittoHeaders), thingEvent.getRevision());
            }
            this.sequenceNumber = thingEvent.getRevision();
        } catch (RuntimeException e) {
            this.log.error(e, "Failed to process event <{}>. Triggering sync.", thingEvent);
            triggerSynchronization();
        }
    }

    private void addEventToGatheredEvents(ThingEvent thingEvent) {
        this.gatheredEvents.add(thingEvent);
    }

    private boolean needToReloadPolicy(ThingEvent thingEvent) {
        return (thingEvent instanceof PolicyIdCreated) || (thingEvent instanceof PolicyIdModified) || (schemaVersionHasPolicy(thingEvent.getImplementedSchemaVersion()) && Objects.isNull(this.policyEnforcer));
    }

    private boolean shortcutTakenForThingEvent(ThingEvent thingEvent) {
        if (thingEvent instanceof ThingCreated) {
            ThingCreated thingCreated = (ThingCreated) thingEvent;
            this.log.debug("Got ThingCreated: {}", thingCreated);
            updateThingSearchIndex(thingCreated.getThing().toBuilder().setRevision(thingCreated.getRevision()).build());
            return true;
        }
        if (!(thingEvent instanceof ThingModified)) {
            return false;
        }
        ThingModified thingModified = (ThingModified) thingEvent;
        this.log.debug("Got ThingModified: {}", thingModified);
        updateThingSearchIndex(thingModified.getThing().toBuilder().setRevision(thingModified.getRevision()).build());
        return true;
    }

    private JsonSchemaVersion schemaVersionToCheck() {
        return (JsonSchemaVersion) Optional.ofNullable(this.schemaVersion).orElse(this.policyId == null ? JsonSchemaVersion.V_1 : JsonSchemaVersion.LATEST);
    }

    private void persistThingEvents(List<ThingEvent> list) {
        persistThingEvents(list, this.sequenceNumber);
    }

    private void persistThingEvents(List<ThingEvent> list, long j) {
        this.log.debug("Executing bulk write operation with <{}> updates.", Integer.valueOf(list.size()));
        if (list.isEmpty()) {
            return;
        }
        this.transactionActive = true;
        Kamon.metrics().histogram(COUNT_THING_BULK_UPDATE).record(list.size());
        TraceContext newContext = Kamon.tracer().newContext(TRACE_THING_BULK_UPDATE);
        this.circuitBreaker.callWithCircuitBreakerCS(() -> {
            return ((CompletionStage) this.searchUpdaterPersistence.executeCombinedWrites(this.thingId, list, this.policyEnforcer, j).via(finishTrace(newContext)).runWith(Sink.last(), this.materializer)).whenComplete(this::processWriteResult);
        }).exceptionally(th -> {
            this.log.error(th, "There occurred an error while processing a write operation within the circuit breaker for thing <{}>.", this.thingId);
            processWriteResult(false, th);
            return null;
        });
    }

    private void processWriteResult(Boolean bool, Throwable th) {
        getSelf().tell(new PersistenceWriteResult(isTrue(bool), th), (ActorRef) null);
    }

    private void handlePersistenceUpdateResult(PersistenceWriteResult persistenceWriteResult) {
        this.transactionActive = false;
        if (persistenceWriteResult.getError() != null) {
            this.log.error(persistenceWriteResult.getError(), "The MongoDB operation for thing <{}> failed with an error!", this.thingId);
            triggerSynchronization();
            return;
        }
        if (!persistenceWriteResult.isSuccess()) {
            this.log.warning("The update operation for thing <{}> failed due to an unexpected sequence number!", this.thingId);
            triggerSynchronization();
            return;
        }
        if (this.gatheredEvents.isEmpty()) {
            return;
        }
        this.log.debug("<{}> gathered events will now be persisted.", Integer.valueOf(this.gatheredEvents.size()));
        List<ThingEvent> list = this.gatheredEvents;
        resetGatheredEvents();
        int size = list.size();
        if (size <= this.maxBulkSize) {
            persistThingEvents(list);
        } else {
            this.log.info("Triggering synchronization because <{}> events were accumulated since last bulk update, which exceeded the limit of <{}> events per bulk update.", Integer.valueOf(size), Integer.valueOf(this.maxBulkSize));
            triggerSynchronization();
        }
    }

    private void triggerSynchronization() {
        beginNewSyncSession();
        this.policyEnforcer = null;
        this.syncAttempts++;
        this.transactionActive = false;
        resetGatheredEvents();
        if (this.syncAttempts <= MAX_SYNC_ATTEMPTS) {
            this.log.debug("Synchronization of thing <{}> is now triggered (attempt={}).", this.thingId, Integer.valueOf(this.syncAttempts));
            syncThing();
        } else {
            this.log.error("Synchronization failed after <{}> attempts.", Integer.valueOf(this.syncAttempts - 1));
            ackSync(false);
            stopThisActor();
        }
    }

    private void syncThing() {
        this.log.debug("Trying to synchronize thing <{}>.", this.thingId);
        retrieveThing();
        becomeAwaitingSyncThingResponse();
    }

    private void becomeAwaitingSyncThingResponse() {
        this.log.debug("Becoming 'awaitingSyncThingResponse' for thing <{}> ...", this.thingId);
        getContext().become(createAwaitSyncThingBehavior(this.syncSessionId));
    }

    private AbstractActor.Receive createAwaitSyncThingBehavior(String str) {
        this.log.debug("Becoming 'awaitSyncThingBehavior' for thing <{}> ...", this.thingId);
        return ReceiveBuilder.create().match(AskTimeoutException.class, handleSyncTimeout(str, "Timeout after SudoRetrieveThing")).match(SudoRetrieveThingResponse.class, this::handleSyncThingResponse).match(ThingErrorResponse.class, (v1) -> {
            handleErrorResponse(v1);
        }).match(DittoRuntimeException.class, this::handleException).match(CheckForActivity.class, this::checkActivity).matchAny(obj -> {
            stashWithErrorsIgnored();
        }).build();
    }

    private FI.UnitApply<AskTimeoutException> handleSyncTimeout(String str, String str2) {
        return askTimeoutException -> {
            String message = askTimeoutException.getMessage();
            if (!(!Objects.equals(NO_SYNC_SESSION_ID, str)) || !Objects.equals(message, str)) {
                this.log.warning("Ignoring AskTimeoutException from session <{}>. Current session is <{}>.", message, str);
            } else {
                this.log.error(str2);
                triggerSynchronization();
            }
        };
    }

    private void syncPolicy(Thing thing) {
        this.log.debug("Trying to synchronize policy <{}>.", this.policyId);
        if (this.policyId == null) {
            syncThing();
        } else {
            retrievePolicy(this.policyId);
            becomeAwaitingSyncPolicyResponse(thing);
        }
    }

    private void becomeAwaitingSyncPolicyResponse(Thing thing) {
        this.log.debug("Becoming 'awaitingSyncPolicyResponse' for policy <{}> ...", this.policyId);
        getContext().become(createAwaitSyncPolicyBehavior(this.syncSessionId, thing));
    }

    private AbstractActor.Receive createAwaitSyncPolicyBehavior(String str, Thing thing) {
        this.log.debug("Becoming 'awaitSyncPolicyBehavior' for thing <{}> ...", this.thingId);
        return ReceiveBuilder.create().match(AskTimeoutException.class, handleSyncTimeout(str, "Timeout after SudoRetrievePolicy")).match(SudoRetrievePolicyResponse.class, sudoRetrievePolicyResponse -> {
            handleSyncPolicyResponse(thing, sudoRetrievePolicyResponse);
        }).match(PolicyErrorResponse.class, (v1) -> {
            handleErrorResponse(v1);
        }).match(DittoRuntimeException.class, this::handleException).match(CheckForActivity.class, this::checkActivity).matchAny(obj -> {
            stashWithErrorsIgnored();
        }).build();
    }

    private void becomeSyncResultAwaiting() {
        this.log.debug("Becoming 'syncResultAwaiting' ...");
        getContext().become(this.awaitSyncResultBehavior);
    }

    private AbstractActor.Receive createAwaitSyncResultBehavior() {
        return ReceiveBuilder.create().match(SyncSuccess.class, syncSuccess -> {
            this.syncAttempts = 0;
            ackSync(true);
            becomeEventProcessing();
        }).match(SyncFailure.class, syncFailure -> {
            triggerSynchronization();
        }).match(CheckForActivity.class, this::checkActivity).matchAny(obj -> {
            stashWithErrorsIgnored();
        }).build();
    }

    private void ackSync(boolean z) {
        if (this.activeSyncMetadata == null) {
            this.log.debug("Cannot ack sync, cause no recipient is available.");
            return;
        }
        ActorRef ackRecipient = this.activeSyncMetadata.getAckRecipient();
        if (Objects.equals(ackRecipient, getContext().getSystem().deadLetters())) {
            this.log.debug("Cannot ack sync, cause recipient is deadletters.");
        } else {
            StreamAck success = z ? this.activeSyncMetadata.getSuccess() : this.activeSyncMetadata.getFailure();
            this.log.debug("Acking sync with result <{}> to <{}>", success, ackRecipient);
            ackRecipient.tell(success, getSelf());
        }
        this.activeSyncMetadata = null;
    }

    private void handleSyncThingResponse(SudoRetrieveThingResponse sudoRetrieveThingResponse) {
        this.log.debug("Retrieved thing response='{}' for thing ID='{}' (attempt={}).", sudoRetrieveThingResponse, this.thingId, Integer.valueOf(this.syncAttempts));
        updateThingSearchIndex(sudoRetrieveThingResponse.getThing());
    }

    private void updateThingSearchIndex(Thing thing) {
        this.sequenceNumber = ((Long) thing.getRevision().map((v0) -> {
            return v0.toLong();
        }).orElse(Long.valueOf(UNKNOWN_REVISION))).longValue();
        this.schemaVersion = thing.getImplementedSchemaVersion();
        String str = (String) thing.getPolicyId().orElse(null);
        if (str != null) {
            if (!str.equals(this.policyId)) {
                this.policyRevision = -1L;
                this.policyEnforcer = null;
                this.policyId = str;
            }
        } else if (!hasNonEmptyAcl(thing)) {
            this.log.error("Thing to update in search index had neither a policyId nor an ACL: {}", thing);
            triggerSynchronization();
            return;
        } else {
            this.policyRevision = -1L;
            this.policyEnforcer = null;
            this.policyId = null;
        }
        if (str == null) {
            this.schemaVersion = JsonSchemaVersion.V_1;
        }
        if (!schemaVersionHasPolicy(this.schemaVersion)) {
            updateSearchIndexWithoutPolicy(thing);
        } else if (this.policyEnforcer != null) {
            updateSearchIndexWithPolicy(thing, null);
        } else {
            syncPolicy(thing);
        }
    }

    private static boolean hasNonEmptyAcl(Thing thing) {
        return thing.getAccessControlList().filter(accessControlList -> {
            return !accessControlList.isEmpty();
        }).isPresent();
    }

    private void handleSyncPolicyResponse(Thing thing, SudoRetrievePolicyResponse sudoRetrievePolicyResponse) {
        this.log.debug("Retrieved policy response='{}' for thing ID='{}' and policyId='{}' (attempt={}).", sudoRetrievePolicyResponse, this.thingId, this.policyId, Integer.valueOf(this.syncAttempts));
        this.log.debug("Policy from retrieved policy response is: {}", sudoRetrievePolicyResponse.getPolicy());
        Policy policy = sudoRetrievePolicyResponse.getPolicy();
        Optional id = policy.getId();
        String str = this.policyId;
        str.getClass();
        if (!id.filter((v1) -> {
            return r1.equals(v1);
        }).isPresent()) {
            this.log.warning("Received policy ID <{0}> is not expected ID <{1}>!", policy.getId(), this.policyId);
            return;
        }
        this.policyRevision = ((Long) policy.getRevision().map((v0) -> {
            return v0.toLong();
        }).orElse(Long.valueOf(UNKNOWN_REVISION))).longValue();
        Enforcer defaultEvaluator = PolicyEnforcers.defaultEvaluator(policy);
        this.policyEnforcer = defaultEvaluator;
        updateSearchIndexWithPolicy(thing, defaultEvaluator);
    }

    private void handleException(DittoRuntimeException dittoRuntimeException) {
        if (dittoRuntimeException instanceof ThingNotAccessibleException) {
            this.log.info("Thing no longer accessible - deleting the Thing from search index: {}", dittoRuntimeException.getMessage());
            deleteThingFromSearchIndex();
        } else if (dittoRuntimeException instanceof PolicyNotAccessibleException) {
            this.log.info("Policy no longer accessible - deleting the Thing from search index: {}", dittoRuntimeException.getMessage());
            deleteThingFromSearchIndex();
        } else {
            this.log.error("Received exception while trying to sync thing <{}>: {} {}", this.thingId, dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
            triggerSynchronization();
        }
    }

    private void deleteThingFromSearchIndex() {
        TraceContext newContext = Kamon.tracer().newContext(TRACE_THING_DELETE);
        this.circuitBreaker.callWithCircuitBreakerCS(() -> {
            return ((CompletionStage) this.searchUpdaterPersistence.delete(this.thingId).via(finishTrace(newContext)).runWith(Sink.last(), this.materializer)).whenComplete(this::handleDeletion);
        });
    }

    private void handleDeletion(Boolean bool, Throwable th) {
        if (isTrue(bool)) {
            this.log.info("Thing <{}> was deleted from search index due to synchronization. The actor will be stopped now.", this.thingId);
            ackSync(true);
            stopThisActor();
        } else if (th != null) {
            this.log.error(th, "Deletion due to synchronization of thing <{}> was not successful as the update operation failed!", this.thingId);
            triggerSynchronization();
        } else {
            ackSync(true);
            stopThisActor();
        }
    }

    private void handleErrorResponse(ErrorResponse<?> errorResponse) {
        handleException(errorResponse.getDittoRuntimeException());
    }

    private void updateSearchIndexWithoutPolicy(Thing thing) {
        becomeSyncResultAwaiting();
        updateThing(thing).whenComplete((bool, th) -> {
            handleInsertOrUpdateResult(bool, thing, th);
        });
    }

    private void updateSearchIndexWithPolicy(Thing thing, Enforcer enforcer) {
        becomeSyncResultAwaiting();
        updateThing(thing).whenComplete((bool, th) -> {
            updatePolicy(thing, enforcer != null ? enforcer : this.policyEnforcer).whenComplete((bool, th) -> {
                handleInsertOrUpdateResult(bool, thing, th == null ? th : th);
            });
        });
    }

    private CompletionStage<Boolean> updateThing(Thing thing) {
        long longValue = ((Long) thing.getRevision().map((v0) -> {
            return v0.toLong();
        }).orElseThrow(() -> {
            return new IllegalArgumentException(MessageFormat.format("The Thing <{0}> has no revision!", this.thingId));
        })).longValue();
        TraceContext newContext = Kamon.tracer().newContext(TRACE_THING_MODIFIED);
        return this.circuitBreaker.callWithCircuitBreakerCS(() -> {
            return (CompletionStage) this.searchUpdaterPersistence.insertOrUpdate(thing, longValue, this.policyRevision).via(finishTrace(newContext)).runWith(Sink.last(), this.materializer);
        });
    }

    private CompletionStage<Boolean> updatePolicy(Thing thing, Enforcer enforcer) {
        if (enforcer != null) {
            TraceContext newContext = Kamon.tracer().newContext(TRACE_POLICY_UPDATE);
            return this.circuitBreaker.callWithCircuitBreakerCS(() -> {
                return ((CompletionStage) this.searchUpdaterPersistence.updatePolicy(thing, enforcer).via(finishTrace(newContext)).runWith(Sink.last(), this.materializer)).whenComplete((bool, th) -> {
                    if (null != th) {
                        this.log.error(th, "Failed to update policy because of an exception!");
                    } else if (bool.booleanValue()) {
                        this.log.debug("Successfully updated policy.");
                    } else {
                        this.log.debug("The update operation for the policy of Thing <{}> did not have an effect, probably because it does not contain fine-grained policies!", this.thingId);
                    }
                });
            });
        }
        this.log.warning("Enforcer was null when trying to update Policy search index - resyncing Policy!");
        syncPolicy(thing);
        return CompletableFuture.completedFuture(Boolean.FALSE);
    }

    private void handleInsertOrUpdateResult(Boolean bool, Thing thing, Throwable th) {
        if (th != null) {
            this.log.error(th, "The thing <{}> was not successfully updated in the search index as the update operation failed: {}", this.thingId, th.getMessage());
            getSelf().tell(SyncFailure.INSTANCE, (ActorRef) null);
        } else if (isTrue(bool)) {
            this.log.debug("The thing <{}> was successfully updated in search index", this.thingId);
            getSelf().tell(SyncSuccess.INSTANCE, (ActorRef) null);
        } else {
            this.log.warning("The thing <{}> was not updated as the index was not changed by the upsert: {}", this.thingId, thing);
            getSelf().tell(SyncFailure.INSTANCE, (ActorRef) null);
        }
    }

    private static boolean schemaVersionHasPolicy(JsonSchemaVersion jsonSchemaVersion) {
        return jsonSchemaVersion.toInt() > JsonSchemaVersion.V_1.toInt();
    }

    private void beginNewSyncSession() {
        cancelSyncTimeoutAndResetSessionId();
        String uuid = UUID.randomUUID().toString();
        LogUtil.enhanceLogWithCorrelationId(this.log, uuid);
        AbstractActor.ActorContext context = getContext();
        Scheduler scheduler = context.system().scheduler();
        ActorRef self = getSelf();
        AskTimeoutException askTimeoutException = new AskTimeoutException(uuid);
        ExecutionContextExecutor dispatcher = context.dispatcher();
        this.syncSessionId = uuid;
        this.syncTimeout = scheduler.scheduleOnce(this.thingsTimeout, self, askTimeoutException, dispatcher, (ActorRef) null);
    }

    private void retrievePolicy(String str) {
        this.policiesShardRegion.tell(SudoRetrievePolicy.of(str, DittoHeaders.newBuilder().correlationId("thingUpdater-sudoRetrievePolicy-" + UUID.randomUUID()).build()), getSelf());
    }

    private void retrieveThing() {
        DittoHeaders build = DittoHeaders.newBuilder().correlationId("thingUpdater-sudoRetrieveThing-" + UUID.randomUUID()).build();
        SudoRetrieveThing withOriginalSchemaVersion = SudoRetrieveThing.withOriginalSchemaVersion(this.thingId, build);
        this.thingsShardRegion.tell(ShardedMessageEnvelope.of(this.thingId, withOriginalSchemaVersion.getType(), withOriginalSchemaVersion.toJson(withOriginalSchemaVersion.getImplementedSchemaVersion(), FieldType.regularOrSpecial()), build), getSelf());
    }

    private void stashWithErrorsIgnored() {
        try {
            stash();
        } catch (IllegalStateException e) {
        }
    }

    private static Flow<Boolean, Boolean, NotUsed> finishTrace(TraceContext traceContext) {
        return Flow.fromFunction(bool -> {
            traceContext.finish();
            return bool;
        });
    }

    private static boolean isTrue(Boolean bool) {
        return bool != null && bool.booleanValue();
    }

    private void resetGatheredEvents() {
        this.gatheredEvents = new ArrayList();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1073243001:
                if (implMethodName.equals("lambda$finishTrace$ddc6df08$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater") && serializedLambda.getImplMethodSignature().equals("(Lkamon/trace/TraceContext;Ljava/lang/Boolean;)Ljava/lang/Boolean;")) {
                    TraceContext traceContext = (TraceContext) serializedLambda.getCapturedArg(0);
                    return bool -> {
                        traceContext.finish();
                        return bool;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
