package io.mantisrx.server.master.client;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.server.core.Configurations;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.NamedJobInfo;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/master/client/MasterClientWrapper.class */
public class MasterClientWrapper {
    public static final String InvalidNamedJob = "No_such_named_job";
    private static final Logger logger = LoggerFactory.getLogger(MasterClientWrapper.class);
    private final MantisMasterGateway masterClientApi;
    private final PublishSubject<JobSinkNumWorkers> numSinkWorkersSubject = PublishSubject.create();
    private final PublishSubject<JobNumWorkers> numWorkersSubject = PublishSubject.create();
    private final Counter masterConnectRetryCounter = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().name(MasterClientWrapper.class.getCanonicalName()).addCounter("MasterConnectRetryCount").build()).getCounter("MasterConnectRetryCount");

    /* loaded from: input_file:io/mantisrx/server/master/client/MasterClientWrapper$JobNumWorkers.class */
    public static class JobNumWorkers {
        protected final int numWorkers;
        private final String jobId;

        public JobNumWorkers(String str, int i) {
            this.jobId = str;
            this.numWorkers = i;
        }

        public String getJobId() {
            return this.jobId;
        }

        public int getNumWorkers() {
            return this.numWorkers;
        }
    }

    /* loaded from: input_file:io/mantisrx/server/master/client/MasterClientWrapper$JobSinkNumWorkers.class */
    public static class JobSinkNumWorkers {
        protected final int numSinkWorkers;
        private final String jobId;

        public JobSinkNumWorkers(String str, int i) {
            this.jobId = str;
            this.numSinkWorkers = i;
        }

        public String getJobId() {
            return this.jobId;
        }

        public int getNumSinkWorkers() {
            return this.numSinkWorkers;
        }
    }

    public MasterClientWrapper(MantisMasterGateway mantisMasterGateway) {
        this.masterClientApi = mantisMasterGateway;
    }

    public static String getWrappedHost(String str, int i) {
        return str + "-" + i;
    }

    public static String getUnwrappedHost(String str) {
        int lastIndexOf = str.lastIndexOf(45);
        return lastIndexOf < 0 ? str : str.substring(0, lastIndexOf);
    }

    public static void main(String[] strArr) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("mantis.zookeeper.connectString", "ec2-50-19-255-1.compute-1.amazonaws.com:2181,ec2-54-235-159-245.compute-1.amazonaws.com:2181,ec2-50-19-255-97.compute-1.amazonaws.com:2181,ec2-184-73-152-248.compute-1.amazonaws.com:2181,ec2-50-17-247-179.compute-1.amazonaws.com:2181");
        properties.put("mantis.zookeeper.leader.announcement.path", "/leader");
        properties.put("mantis.zookeeper.root", "/mantis/master");
        final String str = "GroupByIPNJ-12";
        final MasterClientWrapper masterClientWrapper = new MasterClientWrapper(HighAvailabilityServicesUtil.createHAServices((CoreConfiguration) Configurations.frmProperties(properties, CoreConfiguration.class)).getMasterClientApi());
        masterClientWrapper.getMasterClientApi().flatMap(new Func1<MantisMasterGateway, Observable<EndpointChange>>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.1
            public Observable<EndpointChange> call(MantisMasterGateway mantisMasterGateway) {
                return mantisMasterGateway.getSinkStageNum(str).take(1).flatMap(new Func1<Integer, Observable<EndpointChange>>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.1.1
                    public Observable<EndpointChange> call(Integer num) {
                        MasterClientWrapper.logger.info("Getting sink locations for " + str);
                        return masterClientWrapper.getSinkLocations(str, num.intValue(), 0, 0);
                    }
                });
            }
        }).toBlocking().subscribe(endpointChange -> {
            System.out.println("Endpoint Change -> " + endpointChange);
        });
        Thread.sleep(50000L);
    }

    public void addNumSinkWorkersObserver(Observer<JobSinkNumWorkers> observer) {
        this.numSinkWorkersSubject.subscribe(observer);
    }

    public void addNumWorkersObserver(Observer<JobNumWorkers> observer) {
        this.numWorkersSubject.subscribe(observer);
    }

    public Observable<MantisMasterGateway> getMasterClientApi() {
        return Observable.just(this.masterClientApi);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Endpoint> getAllNonJobMasterEndpoints(final String str, Map<Integer, WorkerAssignments> map) {
        ArrayList arrayList = new ArrayList();
        int i = 0;
        for (Map.Entry<Integer, WorkerAssignments> entry : map.entrySet()) {
            Integer key = entry.getKey();
            if (key.intValue() != 0) {
                WorkerAssignments value = entry.getValue();
                logger.info("job {} Creating endpoints conx from {} worker assignments for stage {}", new Object[]{str, Integer.valueOf(value.getHosts().size()), key});
                if (logger.isDebugEnabled()) {
                    logger.debug("stage {} hosts: {}", key, value.getHosts());
                }
                i += value.getNumWorkers();
                for (WorkerHost workerHost : value.getHosts().values()) {
                    final int workerIndex = workerHost.getWorkerIndex();
                    if (workerHost.getState() == MantisJobState.Started) {
                        logger.info("job " + str + ": creating new endpoint for worker number=" + workerHost.getWorkerNumber() + ", index=" + workerHost.getWorkerIndex() + ", host:port=" + workerHost.getHost() + ":" + workerHost.getPort().get(0));
                        arrayList.add(new WorkerEndpoint(getWrappedHost(workerHost.getHost(), workerHost.getWorkerNumber()), ((Integer) workerHost.getPort().get(0)).intValue(), key.intValue(), workerHost.getMetricsPort(), workerHost.getWorkerIndex(), workerHost.getWorkerNumber(), new Action0() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.2
                            public void call() {
                                MasterClientWrapper.logger.info("job " + str + " WorkerIndex " + workerIndex + " completed");
                            }
                        }, new Action1<Throwable>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.3
                            public void call(Throwable th) {
                                MasterClientWrapper.logger.info("job " + str + " WorkerIndex " + workerIndex + " failed");
                            }
                        }));
                    }
                }
            }
        }
        this.numWorkersSubject.onNext(new JobNumWorkers(str, i));
        return arrayList;
    }

    public Observable<EndpointChange> getAllWorkerMetricLocations(final String str) {
        return new ToDeltaEndpointInjector(this.masterClientApi.schedulingChanges(str).doOnError(new Action1<Throwable>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.8
            public void call(Throwable th) {
                MasterClientWrapper.logger.warn("Error on scheduling changes observable: " + th);
            }
        }).retryWhen(new ConditionalRetry(this.masterConnectRetryCounter, "AllSchedInfoRetry", 10).getRetryLogic()).map(new Func1<JobSchedulingInfo, Map<Integer, WorkerAssignments>>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.7
            public Map<Integer, WorkerAssignments> call(JobSchedulingInfo jobSchedulingInfo) {
                MasterClientWrapper.logger.info("Got scheduling info for " + str);
                return jobSchedulingInfo.getWorkerAssignments();
            }
        }).filter(new Func1<Map<Integer, WorkerAssignments>, Boolean>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.6
            public Boolean call(Map<Integer, WorkerAssignments> map) {
                return Boolean.valueOf(map != null);
            }
        }).map(new Func1<Map<Integer, WorkerAssignments>, List<Endpoint>>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.5
            public List<Endpoint> call(Map<Integer, WorkerAssignments> map) {
                return MasterClientWrapper.this.getAllNonJobMasterEndpoints(str, map);
            }
        }).doOnError(new Action1<Throwable>() { // from class: io.mantisrx.server.master.client.MasterClientWrapper.4
            public void call(Throwable th) {
                MasterClientWrapper.logger.error(th.getMessage(), th);
            }
        })).deltas();
    }

    public Observable<EndpointChange> getSinkLocations(String str, int i, int i2, int i3) {
        return new ToDeltaEndpointInjector(this.masterClientApi.schedulingChanges(str).doOnError(th -> {
            logger.warn(th.getMessage());
        }).retryWhen(new ConditionalRetry(this.masterConnectRetryCounter, "SchedInfoRetry", 10).getRetryLogic()).map(jobSchedulingInfo -> {
            logger.info("Got scheduling info for {}", str);
            if (logger.isDebugEnabled()) {
                logger.debug("Worker Assignments {}", jobSchedulingInfo.getWorkerAssignments().get(Integer.valueOf(i)));
            }
            return (WorkerAssignments) jobSchedulingInfo.getWorkerAssignments().get(Integer.valueOf(i));
        }).map(workerAssignments -> {
            ArrayList arrayList = new ArrayList();
            if (workerAssignments != null) {
                logger.info("job " + str + " Creating endpoints conx from " + workerAssignments.getHosts().size() + " worker assignments");
                for (WorkerHost workerHost : workerAssignments.getHosts().values()) {
                    int workerIndex = workerHost.getWorkerIndex();
                    int numWorkers = workerAssignments.getNumWorkers();
                    this.numSinkWorkersSubject.onNext(new JobSinkNumWorkers(str, numWorkers));
                    if (usePartition(workerIndex, numWorkers, i2, i3) && workerHost.getState() == MantisJobState.Started) {
                        arrayList.add(new Endpoint(getWrappedHost(workerHost.getHost(), workerHost.getWorkerNumber()), ((Integer) workerHost.getPort().get(0)).intValue(), () -> {
                            logger.info("job " + str + " WorkerIndex " + workerIndex + " completed");
                        }, th2 -> {
                            logger.info("job " + str + " WorkerIndex " + workerIndex + " failed");
                        }));
                    }
                }
            } else {
                logger.info("job " + str + " Has no active workers!");
            }
            return arrayList;
        }).doOnError(th2 -> {
            logger.error(th2.getMessage(), th2);
        })).deltas();
    }

    private boolean usePartition(int i, int i2, int i3, int i4) {
        if (i3 < 0 || i4 == 0) {
            return true;
        }
        long round = Math.round(i2 / i4);
        long j = i3 * round;
        return j < ((long) i2) && ((long) i) >= j && ((long) i) < (i3 == i4 - 1 ? (long) i2 : ((long) (i3 + 1)) * round);
    }

    public Observable<Boolean> namedJobExists(String str) {
        ConditionalRetry conditionalRetry = new ConditionalRetry(this.masterConnectRetryCounter, "NamedJobExists", Integer.MAX_VALUE);
        logger.info("verifying if job name exists: " + str);
        return this.masterClientApi.namedJobExists(str).retryWhen(conditionalRetry.getRetryLogic());
    }

    public Observable<String> getNamedJobsIds(String str) {
        ConditionalRetry conditionalRetry = new ConditionalRetry(this.masterConnectRetryCounter, "NamedJobsIds", Integer.MAX_VALUE);
        logger.info("verifying if job name exists: " + str);
        return this.masterClientApi.namedJobExists(str).onErrorResumeNext(th -> {
            logger.error(th.getMessage());
            return Observable.empty();
        }).take(1).map(bool -> {
            if (bool.booleanValue()) {
                logger.info("Getting Job cluster info for " + str);
                return this.masterClientApi.namedJobInfo(str);
            }
            conditionalRetry.setErrorRef(new Exception("No such Job Cluster " + str));
            return Observable.just(new NamedJobInfo(str, InvalidNamedJob));
        }).doOnError(th2 -> {
            logger.error(th2.getMessage(), th2);
        }).retryWhen(conditionalRetry.getRetryLogic()).flatMap(observable -> {
            return observable.map(namedJobInfo -> {
                return namedJobInfo.getJobId();
            });
        });
    }
}
