package org.eclipse.ditto.services.utils.health;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldMarker;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.services.utils.akka.actors.ModifyConfigBehavior;
import org.eclipse.ditto.services.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.config.DittoConfigError;
import org.eclipse.ditto.services.utils.health.StatusDetailMessage;
import org.eclipse.ditto.services.utils.health.StatusInfo;
import org.eclipse.ditto.services.utils.health.config.BackgroundStreamingConfig;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownResponse;

/* loaded from: input_file:org/eclipse/ditto/services/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.class */
public abstract class AbstractBackgroundStreamingActorWithConfigWithStatusReport<C extends BackgroundStreamingConfig> extends AbstractActorWithTimers implements RetrieveConfigBehavior, ModifyConfigBehavior {
    /** Current configuration; reassigned when {@code setConfig} successfully parses a new one. */
    protected C config;

    /** Diagnostic logger obtained for this actor. */
    protected final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    /** Materializer created from this actor's context; the background stream is run with it. */
    protected final ActorMaterializer materializer = ActorMaterializer.create(getContext());

    /** Newest-first log of timestamped events; assigned exactly once in the constructor, hence never null. */
    private final Deque<Pair<Instant, Event>> events;

    /**
     * Kill switch of the stream started most recently, or {@code null} if no stream was started yet or the
     * switch has already been shut down.
     */
    @Nullable
    private KillSwitch killSwitch;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/eclipse/ditto/services/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport$Event.class */
    /**
     * An entry of this actor's event log. Actor messages implementing this interface are recorded in the log
     * together with the time they were handled.
     */
    public interface Event {
        /**
         * @return the text under which this event is rendered in the status report.
         */
        String name();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport$JsonFields.class */
    /**
     * JSON field definitions used when rendering the status report. Holds constants only and cannot be
     * instantiated.
     */
    public static final class JsonFields {
        /** Boolean field {@code "enabled"}. */
        private static final JsonFieldDefinition<Boolean> ENABLED = JsonFactory.newBooleanFieldDefinition("enabled", new JsonFieldMarker[0]);
        /** Array field {@code "events"}. */
        private static final JsonFieldDefinition<JsonArray> EVENTS = JsonFactory.newJsonArrayFieldDefinition("events", new JsonFieldMarker[0]);

        // Not instantiable: constants holder.
        private JsonFields() {
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport$StreamTerminated.class */
    /**
     * Event recording that the background stream ended; its name is the free-text description supplied at
     * construction.
     */
    private static final class StreamTerminated implements Event {

        /** Human-readable account of why or how the stream ended. */
        private final String description;

        private StreamTerminated(String description) {
            this.description = description;
        }

        /**
         * @return the description passed to the constructor, unchanged.
         */
        @Override
        public String name() {
            return description;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport$WokeUp.class */
    /**
     * Timer message and log event marking the end of a quiet period. The flag records whether the actor was
     * enabled at the moment the wake-up was processed.
     */
    public static final class WokeUp implements Event {

        /** Shared instance with the flag set; this is the message delivered by the wake-up timer. */
        private static final WokeUp ENABLED = new WokeUp(true);

        private final boolean enabled;

        private WokeUp(boolean enabled) {
            this.enabled = enabled;
        }

        /**
         * Creates a fresh instance carrying the given flag; the receiver is left untouched.
         *
         * @param enabled the flag of the new instance.
         * @return a new {@code WokeUp}.
         */
        private WokeUp enable(boolean enabled) {
            return new WokeUp(enabled);
        }

        /**
         * @return {@code "WOKE_UP"} if the flag is set, otherwise a sentence stating that the actor is disabled.
         */
        @Override
        public String name() {
            if (this.enabled) {
                return "WOKE_UP";
            }
            return "Not waking up: I am disabled.";
        }
    }

    /**
     * Stores the initial configuration, allocates the event log and, if the configuration is enabled,
     * schedules the first wake-up after the configured quiet period.
     *
     * @param c the initial configuration.
     */
    protected AbstractBackgroundStreamingActorWithConfigWithStatusReport(C c) {
        this.config = c;
        // Diamond instead of a raw ArrayDeque: keeps the assignment type-checked and free of unchecked warnings.
        // One extra slot because enqueue() inserts the new entry before it drops the oldest one.
        this.events = new ArrayDeque<>(c.getKeptEvents() + 1);
        if (c.isEnabled()) {
            scheduleWakeUp();
        }
    }

    /**
     * Prepends an element, stamped with the current time, to a bounded newest-first log and then drops the
     * oldest entries from the tail until at most {@code i} entries remain.
     * <p>
     * Trimming is done in a loop rather than by removing a single entry: the limit is read from configuration
     * that can be replaced at runtime, and a deque that grew under a larger limit would otherwise never shrink
     * to a smaller one.
     *
     * @param deque the log to prepend to; modified in place.
     * @param t the element to record.
     * @param i the maximum number of entries to retain; values below zero are treated as zero.
     * @param <T> the type of the logged elements.
     */
    protected static <T> void enqueue(Deque<Pair<Instant, T>> deque, T t, int i) {
        deque.addFirst(Pair.create(Instant.now(), t));
        // Clamp so that a negative limit empties the deque instead of calling removeLast() on an empty one.
        final int limit = Math.max(i, 0);
        while (deque.size() > limit) {
            deque.removeLast();
        }
    }

    /**
     * Converts a raw config into the typed configuration of this actor. Called by {@code setConfig} with the
     * requested config layered over the currently active one as fallback.
     *
     * @param config the raw config to parse.
     * @return the parsed configuration.
     */
    protected abstract C parseConfig(Config config);

    /**
     * Supplies the stream to run in the background. It is requested anew for every (re)start; its elements
     * are discarded, and a kill switch is attached so the stream can be stopped on demand.
     *
     * @return the source to run.
     */
    protected abstract Source<?, ?> getSource();

    /**
     * Extension hook called while the sleeping behavior is assembled, before this class registers its own
     * message handlers on the builder. Does nothing by default.
     *
     * @param receiveBuilder the builder of the sleeping behavior.
     */
    protected void preEnhanceSleepingBehavior(ReceiveBuilder receiveBuilder) {
    }

    /**
     * Extension hook called while the streaming behavior is assembled, before this class registers its own
     * message handlers on the builder. Does nothing by default.
     *
     * @param receiveBuilder the builder of the streaming behavior.
     */
    protected void preEnhanceStreamingBehavior(ReceiveBuilder receiveBuilder) {
    }

    /**
     * Extension hook called when the status report is rendered, after the enabled flag and the event list
     * have been set on the builder and before the report is built. Does nothing by default.
     *
     * @param jsonObjectBuilder the builder of the status report.
     */
    protected void postEnhanceStatusReport(JsonObjectBuilder jsonObjectBuilder) {
    }

    /**
     * @return the initial behavior of this actor, which is the sleeping behavior.
     */
    public AbstractActor.Receive createReceive() {
        return sleeping();
    }

    /**
     * @return the raw config underlying the currently active typed configuration.
     */
    public Config getConfig() {
        return this.config.getConfig();
    }

    /**
     * Replaces the active configuration with the given config layered over the current one.
     * <p>
     * The current config is rendered to a concise string and parsed back to serve as fallback. If rendering,
     * parsing or {@code parseConfig} fails with a {@code DittoConfigError} or {@code ConfigException}, the
     * error is logged and the previous configuration stays in effect. A wake-up is scheduled when the actor
     * switches from disabled to enabled.
     *
     * @param config the config entries to apply on top of the current config.
     * @return the raw config in effect after the call.
     */
    public Config setConfig(Config config) {
        final C previousConfig = this.config;
        try {
            final String renderedCurrent = getConfig().root().render(ConfigRenderOptions.concise());
            final Config fallback = ConfigFactory.parseString(renderedCurrent);
            this.config = parseConfig(config.withFallback(fallback));
        } catch (DittoConfigError | ConfigException e) {
            this.log.error(e, "Failed to set config");
        }

        final boolean wasDisabled = !previousConfig.isEnabled();
        if (wasDisabled && this.config.isEnabled()) {
            // Disabled -> enabled: no wake-up is pending, so start the cycle again.
            scheduleWakeUp();
        }
        return this.config.getConfig();
    }

    /**
     * Builds the behavior used while no stream is running: reacts to wake-ups, logs other events, answers
     * health queries and shutdown commands, and falls back to the retrieve-/modify-config behaviors.
     *
     * @return the sleeping behavior.
     */
    private AbstractActor.Receive sleeping() {
        final ReceiveBuilder builder = ReceiveBuilder.create();
        // Subclass handlers are registered first, ahead of the built-in ones below.
        preEnhanceSleepingBehavior(builder);
        final AbstractActor.Receive ownBehavior = builder
                .match(WokeUp.class, this::wokeUp)
                .match(Event.class, this::addCustomEventToLog)
                .match(RetrieveHealth.class, this::retrieveHealth)
                .match(Shutdown.class, this::shutdownStream)
                .build();
        return ownBehavior
                .orElse(retrieveConfigBehavior())
                .orElse(modifyConfigBehavior());
    }

    /**
     * Builds the behavior used while a stream is running: reacts to stream termination, logs other events,
     * answers health queries and shutdown commands, and falls back to the retrieve-/modify-config behaviors.
     *
     * @return the streaming behavior.
     */
    private AbstractActor.Receive streaming() {
        final ReceiveBuilder builder = ReceiveBuilder.create();
        // Subclass handlers are registered first, ahead of the built-in ones below.
        preEnhanceStreamingBehavior(builder);
        final AbstractActor.Receive ownBehavior = builder
                .match(StreamTerminated.class, this::streamTerminated)
                .match(Event.class, this::addCustomEventToLog)
                .match(RetrieveHealth.class, this::retrieveHealth)
                .match(Shutdown.class, this::shutdownStream)
                .build();
        return ownBehavior
                .orElse(retrieveConfigBehavior())
                .orElse(modifyConfigBehavior());
    }

    /**
     * Handles the wake-up timer message: records the wake-up (tagged with the current enabled state) in the
     * event log and, if enabled, starts the stream and switches to the streaming behavior.
     *
     * @param wokeUp the received wake-up message.
     */
    private void wokeUp(WokeUp wokeUp) {
        this.log.info("Woke up.");
        final WokeUp logEntry = wokeUp.enable(this.config.isEnabled());
        enqueue(this.events, logEntry, this.config.getKeptEvents());

        if (!this.config.isEnabled()) {
            // Stay asleep; a later switch to enabled via setConfig schedules the next wake-up.
            this.log.warning("Not waking up because disabled.");
            return;
        }

        restartStream();
        getContext().become(streaming());
    }

    /**
     * Handles the end of the running stream: logs the event, schedules the next wake-up if the actor is
     * enabled, and returns to the sleeping behavior in either case.
     *
     * @param event the event describing the termination; it is added to the event log.
     */
    protected void streamTerminated(Event event) {
        enqueue(this.events, event, this.config.getKeptEvents());

        if (!this.config.isEnabled()) {
            this.log.warning("Stream terminated while disabled.");
        } else {
            this.log.info("Stream terminated. Will restart after quiet period.");
            scheduleWakeUp();
        }

        getContext().become(sleeping());
    }

    /**
     * Schedules a wake-up after the quiet period of the current configuration.
     */
    private void scheduleWakeUp() {
        scheduleWakeUp(this.config.getQuietPeriod());
    }

    /**
     * Starts a single-shot timer, keyed by {@code WokeUp.class}, that delivers {@code WokeUp.ENABLED} to this
     * actor after the given delay.
     *
     * @param duration the delay before the wake-up message is delivered.
     */
    private void scheduleWakeUp(Duration duration) {
        // NOTE(review): the key is always the same, so a pending wake-up is presumably replaced rather than
        // duplicated - confirm against the Akka timer documentation.
        getTimers().startSingleTimer(WokeUp.class, WokeUp.ENABLED, duration);
    }

    /**
     * Handles a shutdown command: stops the running stream (if any), logs the termination, goes to sleep and
     * replies to the sender. If the actor is enabled a wake-up is scheduled after the quiet period and the
     * reply announces the restart; otherwise the reply states that no restart will happen.
     *
     * @param shutdown the received shutdown command; its headers are copied into the response.
     */
    private void shutdownStream(Shutdown shutdown) {
        this.log.info("Terminating stream on demand: <{}>", shutdown);
        shutdownKillSwitch();

        final Event termination = new StreamTerminated("Got " + shutdown);
        enqueue(this.events, termination, this.config.getKeptEvents());
        getContext().become(sleeping());

        final String responseMessage;
        if (this.config.isEnabled()) {
            final Duration wakeUpDelay = this.config.getQuietPeriod();
            responseMessage = String.format("Restarting in <%s>.", wakeUpDelay);
            scheduleWakeUp(wakeUpDelay);
        } else {
            responseMessage = "Not restarting stream because I am disabled.";
        }
        getSender().tell(ShutdownResponse.of(responseMessage, shutdown.getDittoHeaders()), getSelf());
    }

    /**
     * Records an event in the event log, bounded by the configured number of kept events. Used for every
     * {@code Event} message that no earlier handler of the active behavior consumed.
     *
     * @param event the event to record.
     */
    private void addCustomEventToLog(Event event) {
        enqueue(this.events, event, this.config.getKeptEvents());
    }

    /**
     * Stops the previous stream (if any) and starts a fresh one from {@code getSource()}.
     * <p>
     * The stream is run with a kill switch attached and all elements ignored. The kill switch is remembered
     * so the stream can be stopped later; when the stream completes, normally or with an error, a
     * {@code StreamTerminated} message describing the outcome is sent to this actor.
     */
    private void restartStream() {
        shutdownKillSwitch();

        // first = kill switch (Keep.right of viaMat), second = completion stage of Sink.ignore (Keep.both).
        final Pair materialized = (Pair) getSource().viaMat(KillSwitches.single(), Keep.right()).toMat(Sink.ignore(), Keep.both()).run(this.materializer);
        this.killSwitch = (KillSwitch) materialized.first();

        // NOTE(review): this callback runs when the stage completes, presumably not as part of this actor's
        // message processing; it only logs and messages self - confirm the logger tolerates that.
        ((CompletionStage) materialized.second()).handle((result, error) -> {
            final String description = String.format("Stream terminated. Result=<%s> Error=<%s>", result, error);
            this.log.info(description);
            getSelf().tell(new StreamTerminated(description), getSelf());
            return null;
        });
    }

    /**
     * Shuts down the remembered kill switch, if there is one, and forgets it so that it is not triggered a
     * second time.
     */
    private void shutdownKillSwitch() {
        final KillSwitch activeSwitch = this.killSwitch;
        if (activeSwitch == null) {
            // Nothing is running, or the switch was already shut down.
            return;
        }
        activeSwitch.shutdown();
        this.killSwitch = null;
    }

    /**
     * Answers a health query by sending the rendered status report to the sender.
     *
     * @param retrieveHealth the received query; its headers are copied into the response.
     */
    private void retrieveHealth(RetrieveHealth retrieveHealth) {
        final StatusInfo statusInfo = renderStatusInfo();
        getSender().tell(RetrieveHealthResponse.of(statusInfo, retrieveHealth.getDittoHeaders()), getSelf());
    }

    /**
     * Wraps the JSON status report in a status info with status {@code UP} and a single detail message of
     * level {@code INFO}.
     *
     * @return the status info.
     */
    private StatusInfo renderStatusInfo() {
        final StatusDetailMessage detail =
                StatusDetailMessage.of(StatusDetailMessage.Level.INFO, (JsonValue) render());
        return StatusInfo.fromStatus(StatusInfo.Status.UP, Collections.singletonList(detail));
    }

    /**
     * Renders the status report: the enabled flag, the event log as a JSON array in deque order, plus
     * whatever {@code postEnhanceStatusReport} adds.
     *
     * @return the status report as JSON object.
     */
    private JsonObject render() {
        final JsonArray renderedEvents = (JsonArray) this.events.stream()
                .map(AbstractBackgroundStreamingActorWithConfigWithStatusReport::renderEvent)
                .collect(JsonCollectors.valuesToArray());

        final JsonObjectBuilder report = JsonObject.newBuilder()
                .set(JsonFields.ENABLED, Boolean.valueOf(this.config.isEnabled()))
                .set(JsonFields.EVENTS, renderedEvents);

        // Let subclasses append their own fields before the report is finalized.
        postEnhanceStatusReport(report);
        return report.build();
    }

    /**
     * Renders one log entry as a JSON object with a single field: the key is the timestamp in
     * {@code Instant.toString()} form, the value is the event's name.
     *
     * @param pair the timestamp and the event of the entry.
     * @return the rendered entry.
     */
    private static JsonObject renderEvent(Pair<Instant, Event> pair) {
        final String timestamp = pair.first().toString();
        final String eventName = pair.second().name();
        return JsonObject.newBuilder().set(timestamp, eventName).build();
    }
}
