package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorPublisherWithStash;
import akka.stream.actor.ActorPublisherMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/streaming/actors/EventAndResponsePublisher.class */
public final class EventAndResponsePublisher extends AbstractActorPublisherWithStash<Jsonifiable.WithPredicate<JsonObject, JsonField>> {
    private static final int MESSAGE_CONSUMPTION_CHECK_SECONDS = 2;
    private final DiagnosticLoggingAdapter logger;
    private final int backpressureBufferSize;
    private final List<Jsonifiable.WithPredicate<JsonObject, JsonField>> buffer;
    private final AtomicBoolean currentlyInMessageConsumedCheck;
    private String connectionCorrelationId;

    private EventAndResponsePublisher(int i) {
        this.logger = LogUtil.obtain(this);
        this.buffer = new ArrayList();
        this.currentlyInMessageConsumedCheck = new AtomicBoolean(false);
        this.backpressureBufferSize = i;
    }

    public static Props props(final int i) {
        return Props.create(EventAndResponsePublisher.class, new Creator<EventAndResponsePublisher>() { // from class: org.eclipse.ditto.services.gateway.streaming.actors.EventAndResponsePublisher.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public EventAndResponsePublisher m4create() {
                return new EventAndResponsePublisher(i);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Connect.class, connect -> {
            String connectionCorrelationId = connect.getConnectionCorrelationId();
            LogUtil.enhanceLogWithCorrelationId(this.logger, connectionCorrelationId);
            this.logger.debug("Established new connection: {}", connectionCorrelationId);
            getContext().become(connected(connectionCorrelationId));
        }).matchAny(obj -> {
            this.logger.info("Got unknown message during init phase '{}' - stashing..", obj);
            stash();
        }).build();
    }

    private AbstractActor.Receive connected(String str) {
        this.connectionCorrelationId = str;
        unstashAll();
        return ReceiveBuilder.create().match(Signal.class, signal -> {
            return this.buffer.size() >= this.backpressureBufferSize;
        }, signal2 -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, signal2);
            handleBackpressureFor(signal2);
        }).match(Signal.class, signal3 -> {
            if (this.buffer.isEmpty() && totalDemand() > 0) {
                onNext(signal3);
            } else {
                this.buffer.add(signal3);
                deliverBuf();
            }
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            return this.buffer.size() >= this.backpressureBufferSize;
        }, dittoRuntimeException2 -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, dittoRuntimeException2.getDittoHeaders().getCorrelationId());
            handleBackpressureFor(dittoRuntimeException2);
        }).match(DittoRuntimeException.class, dittoRuntimeException3 -> {
            if (this.buffer.isEmpty() && totalDemand() > 0) {
                onNext(dittoRuntimeException3);
            } else {
                this.buffer.add(dittoRuntimeException3);
                deliverBuf();
            }
        }).match(Jsonifiable.WithPredicate.class, withPredicate -> {
            return this.buffer.size() >= this.backpressureBufferSize;
        }, this::handleBackpressureFor).match(Jsonifiable.WithPredicate.class, withPredicate2 -> {
            if (this.buffer.isEmpty() && totalDemand() > 0) {
                onNext(withPredicate2);
            } else {
                this.buffer.add(withPredicate2);
                deliverBuf();
            }
        }).match(ActorPublisherMessage.Request.class, request -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, str);
            this.logger.debug("Got new demand: {}", request);
            deliverBuf();
        }).match(ActorPublisherMessage.Cancel.class, cancel -> {
            getContext().stop(getSelf());
        }).matchAny(obj -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, str);
            this.logger.warning("Got unknown message during connected phase: '{}'", obj);
        }).build();
    }

    private void handleBackpressureFor(Jsonifiable.WithPredicate<JsonObject, JsonField> withPredicate) {
        if (this.currentlyInMessageConsumedCheck.compareAndSet(false, true)) {
            this.logger.warning("Backpressure - buffer of '{}' outstanding Events/CommandResponses is full, dropping '{}'", Integer.valueOf(this.backpressureBufferSize), withPredicate);
            long size = this.buffer.size();
            AbstractActor.ActorContext context = getContext();
            context.system().scheduler().scheduleOnce(FiniteDuration.apply(2L, TimeUnit.SECONDS), () -> {
                if (size == this.buffer.size()) {
                    this.logger.warning("Terminating Publisher - did not to consume anything in the last '{}' seconds, buffer is still at '{}' outstanding messages", Integer.valueOf(MESSAGE_CONSUMPTION_CHECK_SECONDS), Long.valueOf(size));
                    context.stop(getSelf());
                } else {
                    this.currentlyInMessageConsumedCheck.set(false);
                    this.logger.info("Outstanding messages were consumed, Publisher is not terminated");
                }
            }, context.system().dispatcher());
        }
    }

    private void deliverBuf() {
        LogUtil.enhanceLogWithCorrelationId(this.logger, this.connectionCorrelationId);
        while (totalDemand() > 0) {
            if (totalDemand() <= 2147483647L) {
                List<Jsonifiable.WithPredicate<JsonObject, JsonField>> subList = this.buffer.subList(0, Math.min(this.buffer.size(), (int) totalDemand()));
                subList.forEach((v1) -> {
                    onNext(v1);
                });
                this.buffer.removeAll(subList);
                return;
            } else {
                List<Jsonifiable.WithPredicate<JsonObject, JsonField>> subList2 = this.buffer.subList(0, Math.min(this.buffer.size(), Integer.MAX_VALUE));
                subList2.forEach((v1) -> {
                    onNext(v1);
                });
                this.buffer.removeAll(subList2);
            }
        }
    }
}
