package org.eclipse.ditto.services.thingsearch.updater.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.sharding.ShardRegion;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Objects;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.base.actors.ShutdownBehaviour;
import org.eclipse.ditto.services.models.policies.PolicyReferenceTag;
import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.models.streaming.IdentifiableStreamingMessage;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThing;
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.services.thingsearch.common.config.DittoSearchConfig;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.Metadata;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.streaming.StreamAck;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.signals.events.things.ThingEvent;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/updater/actors/ThingUpdater.class */
final class ThingUpdater extends AbstractActor {
    private final ThingId thingId;
    private final ShutdownBehaviour shutdownBehaviour;
    private final ActorRef changeQueueActor;
    private long thingRevision = -1;

    @Nullable
    private PolicyId policyId = null;
    private long policyRevision = -1;
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    private ThingUpdater(ActorRef actorRef, ActorRef actorRef2) {
        DittoSearchConfig of = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
        this.thingId = tryToGetThingId();
        this.shutdownBehaviour = ShutdownBehaviour.fromId(this.thingId, actorRef, getSelf());
        this.changeQueueActor = actorRef2;
        getContext().setReceiveTimeout(of.getUpdaterConfig().getMaxIdleTime());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props(ActorRef actorRef, ActorRef actorRef2) {
        return Props.create(ThingUpdater.class, new Object[]{actorRef, actorRef2});
    }

    public AbstractActor.Receive createReceive() {
        return this.shutdownBehaviour.createReceive().match(ThingEvent.class, this::processThingEvent).match(ThingTag.class, this::processThingTag).match(PolicyReferenceTag.class, this::processPolicyReferenceTag).match(UpdateThing.class, this::updateThing).match(UpdateThingResponse.class, this::processUpdateThingResponse).match(ReceiveTimeout.class, this::stopThisActor).matchAny(obj -> {
            this.log.warning("Unknown message in 'eventProcessing' behavior: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void stopThisActor(ReceiveTimeout receiveTimeout) {
        this.log.debug("stopping ThingUpdater <{}> due to <{}>", this.thingId, receiveTimeout);
        getContext().getParent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
    }

    private Metadata exportMetadata() {
        return Metadata.of(this.thingId, this.thingRevision, this.policyId, Long.valueOf(this.policyRevision));
    }

    private void enqueueMetadata() {
        enqueueMetadata(exportMetadata());
    }

    private void enqueueMetadata(Metadata metadata) {
        this.changeQueueActor.tell(metadata, getSelf());
    }

    private void processThingTag(ThingTag thingTag) {
        this.log.debug("Received new Thing Tag for thing <{}> with revision <{}>: <{}>.", this.thingId, Long.valueOf(this.thingRevision), thingTag.asIdentifierString());
        if (thingTag.getRevision() > this.thingRevision) {
            this.log.debug("The Thing Tag for the thing <{}> has the revision {} which is greater than the current actor's sequence number <{}>.", this.thingId, Long.valueOf(thingTag.getRevision()), Long.valueOf(this.thingRevision));
            this.thingRevision = thingTag.getRevision();
            enqueueMetadata();
        } else {
            this.log.debug("Dropping <{}> because my thingRevision=<{}>", thingTag, Long.valueOf(this.thingRevision));
        }
        acknowledge(thingTag);
    }

    private void updateThing(UpdateThing updateThing) {
        this.log.withCorrelationId(updateThing).info("Requested to update search index <{}> by <{}>", updateThing, getSender());
        enqueueMetadata();
    }

    private void processUpdateThingResponse(UpdateThingResponse updateThingResponse) {
        if (updateThingResponse.isSuccess()) {
            return;
        }
        Metadata exportMetadata = exportMetadata();
        this.log.warning("Got negative acknowledgement for <{}>; updating to <{}>.", Metadata.fromResponse(updateThingResponse), exportMetadata);
        enqueueMetadata(exportMetadata);
    }

    private void processPolicyReferenceTag(PolicyReferenceTag policyReferenceTag) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received new Policy-Reference-Tag for thing <{}> with revision <{}>,  policy-id <{}> and policy-revision <{}>: <{}>.", this.thingId, Long.valueOf(this.thingRevision), this.policyId, Long.valueOf(this.policyRevision), new Object[]{policyReferenceTag.asIdentifierString()});
        }
        PolicyTag policyTag = policyReferenceTag.getPolicyTag();
        PolicyId entityId = policyTag.getEntityId();
        if (!Objects.equals(this.policyId, entityId) || this.policyRevision < policyTag.getRevision()) {
            this.policyId = entityId;
            this.policyRevision = policyTag.getRevision();
            enqueueMetadata();
        } else {
            this.log.debug("Dropping <{}> because my policyId=<{}> and policyRevision=<{}>", policyReferenceTag, this.policyId, Long.valueOf(this.policyRevision));
        }
        acknowledge(policyReferenceTag);
    }

    private void processThingEvent(ThingEvent thingEvent) {
        this.log.withCorrelationId(thingEvent);
        this.log.debug("Received new thing event for thing id <{}> with revision <{}>.", this.thingId, Long.valueOf(thingEvent.getRevision()));
        if (thingEvent.getRevision() <= this.thingRevision) {
            this.log.debug("Dropped thing event for thing id <{}> with revision <{}> because it was older than or equal to the current sequence number <{}> of the update actor.", this.thingId, Long.valueOf(thingEvent.getRevision()), Long.valueOf(this.thingRevision));
            return;
        }
        this.log.debug("Applying thing event <{}>.", thingEvent);
        this.thingRevision = thingEvent.getRevision();
        enqueueMetadata();
    }

    private ThingId tryToGetThingId() {
        Charset charset = StandardCharsets.UTF_8;
        try {
            return getThingId(charset);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(MessageFormat.format("Charset <{0}> is unsupported!", charset.name()), e);
        }
    }

    private ThingId getThingId(Charset charset) throws UnsupportedEncodingException {
        return ThingId.of(URLDecoder.decode(self().path().name(), charset.name()));
    }

    private void acknowledge(IdentifiableStreamingMessage identifiableStreamingMessage) {
        if (getContext().system().deadLetters().equals(getSender())) {
            return;
        }
        getSender().tell(StreamAck.success(identifiableStreamingMessage.asIdentifierString()), getSelf());
    }
}
