package io.mantisrx.server.worker.client;

import com.mantisrx.common.utils.MantisMetricStringConstants;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.netty.buffer.ByteBuf;
import io.reactivx.mantis.operators.DropOperator;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/client/SseWorkerConnection.class */
public class SseWorkerConnection {
    private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnection.class);
    private static final String metricNamePrefix = MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP;
    protected final PublishSubject<Boolean> shutdownSubject;
    final AtomicLong lastDataReceived;
    private final String connectionType;
    private final String hostname;
    private final int port;
    private final MetricGroupId metricGroupId;
    private final Counter pingCounter;
    private final boolean reconnectUponConnectionReset;
    private final Action1<Boolean> updateConxStatus;
    private final Action1<Boolean> updateDataRecvngStatus;
    private final Action1<Throwable> connectionResetHandler;
    private final long dataRecvTimeoutSecs;
    private final CopyOnWriteArraySet<MetricGroupId> metricsSet;
    private final int bufferSize;
    private final SinkParameters sinkParameters;
    private final boolean disablePingFiltering;
    private final AtomicBoolean isConnected;
    private final AtomicBoolean isReceivingData;
    HttpClient<ByteBuf, ServerSentEvent> client;
    private boolean compressedBinaryInputEnabled;
    private volatile boolean isShutdown;
    private final Func1<Observable<? extends Throwable>, Observable<?>> retryLogic;
    private long lastDataDropValue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mantisrx/server/worker/client/SseWorkerConnection$ErrorType.class */
    public enum ErrorType {
        Retryable,
        Unknown
    }

    /* loaded from: input_file:io/mantisrx/server/worker/client/SseWorkerConnection$SseException.class */
    private static class SseException extends RuntimeException {
        private final ErrorType type;

        private SseException(ErrorType errorType, String str) {
            super(errorType + ": " + str);
            this.type = errorType;
        }

        private SseException(ErrorType errorType, String str, Throwable th) {
            super(errorType + ": " + str, th);
            this.type = errorType;
        }
    }

    public SseWorkerConnection(String str, String str2, Integer num, Action1<Boolean> action1, Action1<Boolean> action12, Action1<Throwable> action13, long j, boolean z, CopyOnWriteArraySet<MetricGroupId> copyOnWriteArraySet, int i, SinkParameters sinkParameters, MetricGroupId metricGroupId) {
        this(str, str2, num, action1, action12, action13, j, z, copyOnWriteArraySet, i, sinkParameters, false, metricGroupId);
    }

    public SseWorkerConnection(String str, String str2, Integer num, Action1<Boolean> action1, Action1<Boolean> action12, Action1<Throwable> action13, long j, boolean z, CopyOnWriteArraySet<MetricGroupId> copyOnWriteArraySet, int i, SinkParameters sinkParameters, boolean z2, MetricGroupId metricGroupId) {
        this.shutdownSubject = PublishSubject.create();
        this.lastDataReceived = new AtomicLong(System.currentTimeMillis());
        this.isConnected = new AtomicBoolean(false);
        this.isReceivingData = new AtomicBoolean(false);
        this.compressedBinaryInputEnabled = false;
        this.isShutdown = false;
        this.retryLogic = new Func1<Observable<? extends Throwable>, Observable<?>>() { // from class: io.mantisrx.server.worker.client.SseWorkerConnection.1
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return !SseWorkerConnection.this.reconnectUponConnectionReset ? Observable.empty() : observable.zipWith(Observable.range(1, Integer.MAX_VALUE), new Func2<Throwable, Integer, Integer>() { // from class: io.mantisrx.server.worker.client.SseWorkerConnection.1.2
                    public Integer call(Throwable th, Integer num2) {
                        return num2;
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() { // from class: io.mantisrx.server.worker.client.SseWorkerConnection.1.1
                    public Observable<?> call(Integer num2) {
                        if (SseWorkerConnection.this.isShutdown) {
                            SseWorkerConnection.logger.info(SseWorkerConnection.this.getName() + ": Is shutdown, stopping retries");
                            return Observable.empty();
                        }
                        long intValue = 2 * (num2.intValue() > 10 ? 10 : num2.intValue());
                        SseWorkerConnection.logger.info(SseWorkerConnection.this.getName() + ": retrying conx after sleeping for " + intValue + " secs");
                        return Observable.timer(intValue, TimeUnit.SECONDS);
                    }
                });
            }
        };
        this.lastDataDropValue = 0L;
        this.connectionType = str;
        this.hostname = str2;
        this.port = num.intValue();
        this.metricGroupId = metricGroupId;
        this.pingCounter = new Metrics.Builder().id(new MetricGroupId("ConnectionHealth")).addCounter("pingCount").build().getCounter("pingCount");
        this.updateConxStatus = action1;
        this.updateDataRecvngStatus = action12;
        this.connectionResetHandler = action13;
        this.dataRecvTimeoutSecs = j;
        this.reconnectUponConnectionReset = z;
        this.metricsSet = copyOnWriteArraySet;
        this.bufferSize = i;
        this.sinkParameters = sinkParameters;
        if (this.sinkParameters != null) {
            this.compressedBinaryInputEnabled = isCompressedBinaryInputEnabled(this.sinkParameters.getSinkParams());
        }
        this.disablePingFiltering = z2;
    }

    private boolean isCompressedBinaryInputEnabled(List<SinkParameter> list) {
        for (SinkParameter sinkParameter : list) {
            if ("mantis.EnableCompressedBinary".equals(sinkParameter.getName()) && "true".equalsIgnoreCase(sinkParameter.getValue())) {
                return true;
            }
        }
        return false;
    }

    public String getName() {
        return "Sse" + this.connectionType + "Connection: " + this.hostname + ":" + this.port;
    }

    public synchronized void close() throws Exception {
        if (this.isShutdown) {
            return;
        }
        logger.info("Closing sse connection to " + this.hostname + ":" + this.port);
        this.shutdownSubject.onNext(true);
        this.shutdownSubject.onCompleted();
        this.isShutdown = true;
        resetConnected();
    }

    public synchronized Observable<MantisServerSentEvent> call() {
        if (this.isShutdown) {
            return Observable.empty();
        }
        this.client = RxNetty.newHttpClientBuilder(this.hostname, this.port).pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()).withNoConnectionPooling().build();
        StringBuilder sb = new StringBuilder();
        if (this.sinkParameters != null) {
            sb.append(this.sinkParameters.toString());
        }
        sb.append(sb.length() == 0 ? getDefaultSinkParams("?") : getDefaultSinkParams("&"));
        String str = "/" + sb.toString();
        logger.info(getName() + ": Using uri: " + str);
        return this.client.submit(HttpClientRequest.createGet(str)).takeUntil(this.shutdownSubject).takeWhile(httpClientResponse -> {
            return Boolean.valueOf(!this.isShutdown);
        }).filter(httpClientResponse2 -> {
            if (!httpClientResponse2.getStatus().reasonPhrase().equals("OK")) {
                logger.warn(getName() + ":Trying to continue after unexpected response from sink: " + httpClientResponse2.getStatus().reasonPhrase());
            }
            return Boolean.valueOf(httpClientResponse2.getStatus().reasonPhrase().equals("OK"));
        }).flatMap(httpClientResponse3 -> {
            if (!this.isConnected.getAndSet(true) && this.updateConxStatus != null) {
                this.updateConxStatus.call(true);
            }
            return streamContent(httpClientResponse3, this.updateDataRecvngStatus, this.dataRecvTimeoutSecs);
        }).doOnError(th -> {
            resetConnected();
            logger.warn(getName() + "Error on getting response from SSE server: " + th.getMessage());
            this.connectionResetHandler.call(th);
        }).retryWhen(this.retryLogic).doOnCompleted(this::resetConnected);
    }

    private void resetConnected() {
        if (this.isConnected.getAndSet(false) && this.updateConxStatus != null) {
            this.updateConxStatus.call(false);
        }
        if (!this.isReceivingData.compareAndSet(true, false) || this.updateDataRecvngStatus == null) {
            return;
        }
        synchronized (this.updateDataRecvngStatus) {
            this.updateDataRecvngStatus.call(false);
        }
    }

    private Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> httpClientResponse, Action1<Boolean> action1, long j) {
        long max = Math.max(1L, j / 2);
        if (action1 != null) {
            Observable.interval(max, max, TimeUnit.SECONDS).doOnNext(l -> {
                if (this.isShutdown) {
                    return;
                }
                if (hasDataDrop() || System.currentTimeMillis() > this.lastDataReceived.get() + (j * 1000)) {
                    if (this.isReceivingData.compareAndSet(true, false)) {
                        synchronized (action1) {
                            action1.call(false);
                        }
                        return;
                    }
                    return;
                }
                if (this.isConnected.get() && this.isReceivingData.compareAndSet(false, true)) {
                    synchronized (action1) {
                        action1.call(true);
                    }
                }
            }).takeUntil(this.shutdownSubject).takeWhile(l2 -> {
                return Boolean.valueOf(!this.isShutdown);
            }).doOnCompleted(() -> {
                if (this.isReceivingData.compareAndSet(true, false)) {
                    synchronized (action1) {
                        action1.call(false);
                    }
                }
            }).subscribe();
        }
        return httpClientResponse.getContent().lift(new DropOperator(this.metricGroupId)).flatMap(serverSentEvent -> {
            this.lastDataReceived.set(System.currentTimeMillis());
            if (this.isConnected.get() && this.isReceivingData.compareAndSet(false, true) && action1 != null) {
                synchronized (action1) {
                    action1.call(true);
                }
            }
            return (serverSentEvent.hasEventType() && serverSentEvent.getEventTypeAsString().startsWith("error:")) ? Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + serverSentEvent.contentAsString())) : Observable.just(serverSentEvent.contentAsString());
        }).filter(str -> {
            if (!str.startsWith("ping")) {
                return true;
            }
            this.pingCounter.increment();
            return Boolean.valueOf(this.disablePingFiltering);
        }).flatMapIterable(str2 -> {
            return CompressionUtils.decompressAndBase64Decode(str2, this.compressedBinaryInputEnabled, true);
        }).takeUntil(this.shutdownSubject).takeWhile(mantisServerSentEvent -> {
            return Boolean.valueOf(!this.isShutdown);
        });
    }

    private boolean hasDataDrop() {
        Collection<Metrics> metrics = MetricsRegistry.getInstance().getMetrics(metricNamePrefix);
        long j = 0;
        if (metrics != null && !metrics.isEmpty()) {
            for (Metrics metrics2 : metrics) {
                Counter counter = metrics2.getCounter("" + DropOperator.Counters.dropped);
                metrics2.getCounter("" + DropOperator.Counters.onNext);
                if (counter != null) {
                    j += counter.value();
                }
            }
        }
        if (j <= this.lastDataDropValue) {
            return false;
        }
        this.lastDataDropValue = j;
        return true;
    }

    private String getDefaultSinkParams(String str) {
        String str2 = System.getenv("JOB_ID");
        String str3 = System.getenv("WORKER_INDEX");
        String str4 = System.getenv("WORKER_NUMBER");
        return (str2 == null || str2.isEmpty() || str3 == null || str3.isEmpty() || str4 == null || str4.isEmpty()) ? "" : str + "groupId=" + str2 + "&slotId=" + str3 + "&id=" + str4;
    }
}
