package io.mantisrx.server.worker.client;

import com.mantisrx.common.utils.MantisMetricStringConstants;
import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivx.mantis.operators.DropOperator;
import java.util.HashSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;

/* loaded from: input_file:io/mantisrx/server/worker/client/SseWorkerConnectionFunction.class */
public class SseWorkerConnectionFunction implements WorkerConnectionFunc<MantisServerSentEvent> {
    private static final String DEFAULT_BUFFER_SIZE_STR = "0";
    private static final MetricGroupId metricGroupId;
    private final boolean reconnectUponConnectionRest;
    private final Action1<Throwable> connectionResetHandler;
    private final SinkParameters sinkParameters;
    private final int bufferSize;
    private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnectionFunction.class);
    private static final CopyOnWriteArraySet<MetricGroupId> metricsSet = new CopyOnWriteArraySet<>();
    private static final Action1<Throwable> defaultConxResetHandler = new Action1<Throwable>() { // from class: io.mantisrx.server.worker.client.SseWorkerConnectionFunction.1
        public void call(Throwable th) {
            SseWorkerConnectionFunction.logger.warn("Retrying reset connection");
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                SseWorkerConnectionFunction.logger.debug("Interrupted waiting for retrying connection");
            }
        }
    };

    public SseWorkerConnectionFunction(boolean z, Action1<Throwable> action1) {
        this(z, action1, null);
    }

    public SseWorkerConnectionFunction(boolean z, Action1<Throwable> action1, SinkParameters sinkParameters) {
        this.reconnectUponConnectionRest = z;
        this.connectionResetHandler = action1 == null ? defaultConxResetHandler : action1;
        this.sinkParameters = sinkParameters;
        this.bufferSize = Integer.parseInt(ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR));
    }

    public WorkerConnection<MantisServerSentEvent> call(String str, Integer num) {
        return call(str, num, null, null, 5L);
    }

    @Override // io.mantisrx.server.worker.client.WorkerConnectionFunc
    public WorkerConnection<MantisServerSentEvent> call(final String str, final Integer num, final Action1<Boolean> action1, final Action1<Boolean> action12, final long j) {
        return new WorkerConnection<MantisServerSentEvent>() { // from class: io.mantisrx.server.worker.client.SseWorkerConnectionFunction.3
            private final SseWorkerConnection workerConn;

            {
                this.workerConn = new SseWorkerConnection("WorkerMetrics", str, num, action1, action12, SseWorkerConnectionFunction.this.connectionResetHandler, j, SseWorkerConnectionFunction.this.reconnectUponConnectionRest, SseWorkerConnectionFunction.metricsSet, SseWorkerConnectionFunction.this.bufferSize, SseWorkerConnectionFunction.this.sinkParameters, SseWorkerConnectionFunction.metricGroupId);
            }

            @Override // io.mantisrx.server.worker.client.WorkerConnection
            public String getName() {
                return this.workerConn.getName();
            }

            @Override // java.lang.AutoCloseable
            public void close() throws Exception {
                this.workerConn.close();
            }

            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public Observable<MantisServerSentEvent> m4call() {
                return this.workerConn.call();
            }
        };
    }

    static {
        NettyUtils.setNettyThreads();
        metricGroupId = new MetricGroupId(MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP + "_SseWorkerMetricsConnectionFunction_withBuffer");
        metricsSet.add(metricGroupId);
        logger.info("SETTING UP METRICS PRINTER THREAD");
        new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Runnable() { // from class: io.mantisrx.server.worker.client.SseWorkerConnectionFunction.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    HashSet<MetricGroupId> hashSet = new HashSet(SseWorkerConnectionFunction.metricsSet);
                    if (!hashSet.isEmpty()) {
                        for (MetricGroupId metricGroupId2 : hashSet) {
                            Metrics metric = MetricsRegistry.getInstance().getMetric(metricGroupId2);
                            if (metric != null) {
                                SseWorkerConnectionFunction.logger.info(metricGroupId2.id() + ": onNext=" + metric.getCounter("" + DropOperator.Counters.onNext).value() + ", onError=" + metric.getCounter("" + DropOperator.Counters.onError).value() + ", onComplete=" + metric.getCounter("" + DropOperator.Counters.onComplete).value() + ", dropped=" + metric.getCounter("" + DropOperator.Counters.dropped).value());
                            }
                        }
                    }
                } catch (Exception e) {
                    SseWorkerConnectionFunction.logger.warn("Unexpected error in metrics printer thread: " + e.getMessage(), e);
                }
            }
        }, 60L, 60L, TimeUnit.SECONDS);
    }
}
