package com.mesosphere.usi.core;

import akka.Done;
import akka.NotUsed;
import akka.NotUsed$;
import akka.stream.BidiShape;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Materializer;
import akka.stream.UniformFanOutShape;
import akka.stream.scaladsl.BidiFlow;
import akka.stream.scaladsl.BidiFlow$;
import akka.stream.scaladsl.Broadcast$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.GraphDSL$Implicits$PortOps;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Source;
import com.mesosphere.mesos.client.MesosCalls;
import com.mesosphere.mesos.client.MesosClient;
import com.mesosphere.usi.core.conf.SchedulerSettings;
import com.mesosphere.usi.core.models.StateEvent;
import com.mesosphere.usi.core.models.StateSnapshot;
import com.mesosphere.usi.core.models.commands.SchedulerCommand;
import com.mesosphere.usi.metrics.Metrics;
import com.mesosphere.usi.repository.PodRecordRepository;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.scheduler.Protos;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.runtime.BoxesRunTime;

/* compiled from: Scheduler.scala */
/* loaded from: input_file:WEB-INF/lib/core-0.1.18.jar:com/mesosphere/usi/core/Scheduler$.class */
public final class Scheduler$ {
    /** The single instance of this class; it is assigned by the private constructor. */
    public static Scheduler$ MODULE$;

    static {
        // Constructing the instance is sufficient: the constructor stores it in MODULE$.
        new Scheduler$();
    }

    /**
     * Creates the scheduler from a Mesos client and exposes it as a separate source and sink
     * instead of a single flow.
     *
     * <p>Delegates to {@link #fromClient} and then hands the resulting flow to
     * {@code FlowHelpers$.MODULE$.asSourceAndSink} together with {@code materializer}.
     *
     * <p>NOTE(review): {@code mo5772_1()} / {@code mo5771_2()} look like decompiler-renamed
     * tuple accessors for the first and second element — confirm against the Scala library.
     *
     * @param mesosClient client whose framework info, calls, sink and source are used by {@link #fromClient}
     * @param podRecordRepository repository passed through to {@link #fromClient}
     * @param metrics metrics passed through to {@link #fromClient}
     * @param schedulerSettings settings passed through to {@link #fromClient}
     * @param materializer materializer given to {@code FlowHelpers$.MODULE$.asSourceAndSink}
     * @return a future of (state snapshot, state-event source, scheduler-command sink)
     * @throws IllegalArgumentException propagated from {@link #fromClient} when the framework is not MULTI_ROLE
     */
    public Future<Tuple3<StateSnapshot, Source<StateEvent, NotUsed>, Sink<SchedulerCommand, Future<Done>>>> asSourceAndSink(MesosClient mesosClient, PodRecordRepository podRecordRepository, Metrics metrics, SchedulerSettings schedulerSettings, Materializer materializer) {
        return fromClient(mesosClient, podRecordRepository, metrics, schedulerSettings).map(snapshotAndFlow -> {
            // A null tuple mirrors a failed pattern match in the original Scala code.
            if (snapshotAndFlow == null) {
                throw new MatchError(snapshotAndFlow);
            }
            StateSnapshot stateSnapshot = (StateSnapshot) snapshotAndFlow.mo5772_1();
            Tuple2 sourceAndSink = FlowHelpers$.MODULE$.asSourceAndSink((Flow) snapshotAndFlow.mo5771_2(), materializer);
            if (sourceAndSink == null) {
                throw new MatchError(sourceAndSink);
            }
            // Unpack the pair directly; a distinct local name avoids redeclaring the lambda parameter.
            return new Tuple3(stateSnapshot, (Source) sourceAndSink.mo5772_1(), (Sink) sourceAndSink.mo5771_2());
        }, CallerThreadExecutionContext$.MODULE$.context());
    }

    /**
     * Creates the scheduler flow on top of an existing Mesos client.
     *
     * <p>The client's sink and source are coupled into one flow and passed, along with the
     * client's call factory and the master's domain info, to {@link #fromFlow}.
     *
     * @param mesosClient client providing framework info, calls, sink, source and master info
     * @param podRecordRepository repository passed through to {@link #fromFlow}
     * @param metrics metrics passed through to {@link #fromFlow}
     * @param schedulerSettings settings passed through to {@link #fromFlow}
     * @return a future of (state snapshot, scheduler flow)
     * @throws IllegalArgumentException if the client's framework info lacks the MULTI_ROLE capability
     */
    public Future<Tuple2<StateSnapshot, Flow<SchedulerCommand, StateEvent, NotUsed>>> fromClient(MesosClient mesosClient, PodRecordRepository podRecordRepository, Metrics metrics, SchedulerSettings schedulerSettings) {
        // Reject unsupported frameworks before touching any other part of the client.
        if (!isMultiRoleFramework(mesosClient.frameworkInfo())) {
            throw new IllegalArgumentException("USI scheduler provides support for MULTI_ROLE frameworks only. Please provide a MesosClient with FrameworkInfo that has capability MULTI_ROLE");
        }
        return fromFlow(
                mesosClient.calls(),
                podRecordRepository,
                metrics,
                Flow$.MODULE$.fromSinkAndSourceCoupled(mesosClient.mesosSink(), mesosClient.mesosSource()),
                schedulerSettings,
                mesosClient.masterInfo().getDomain());
    }

    /**
     * Creates the scheduler flow by closing the Mesos-facing side of the unconnected scheduler
     * graph with the supplied Mesos flow.
     *
     * <p>The output of {@code flow} feeds the bidi graph's second inlet and the bidi graph's
     * second outlet feeds {@code flow}; the bidi graph's first inlet and first outlet become the
     * inlet and outlet of the returned flow.
     *
     * @param mesosCalls call factory passed through to {@link #unconnectedGraph}
     * @param podRecordRepository repository passed through to {@link #unconnectedGraph}
     * @param metrics metrics passed through to {@link #unconnectedGraph}
     * @param flow Mesos flow that consumes calls and produces events
     * @param schedulerSettings settings passed through to {@link #unconnectedGraph}
     * @param domainInfo domain info passed through to {@link #unconnectedGraph}
     * @return a future of (state snapshot, scheduler flow); the materialized value is {@code NotUsed}
     */
    public Future<Tuple2<StateSnapshot, Flow<SchedulerCommand, StateEvent, NotUsed>>> fromFlow(MesosCalls mesosCalls, PodRecordRepository podRecordRepository, Metrics metrics, Flow<Protos.Call, Protos.Event, Object> flow, SchedulerSettings schedulerSettings, Protos.DomainInfo domainInfo) {
        return unconnectedGraph(mesosCalls, podRecordRepository, metrics, schedulerSettings, domainInfo).map(snapshotAndGraph -> {
            // A null tuple mirrors a failed pattern match in the original Scala code.
            if (snapshotAndGraph == null) {
                throw new MatchError(snapshotAndGraph);
            }
            StateSnapshot initialSnapshot = (StateSnapshot) snapshotAndGraph.mo5772_1();
            BidiFlow schedulerGraph = (BidiFlow) snapshotAndGraph.mo5771_2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(initialSnapshot), Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(schedulerGraph, flow, (schedulerMat, mesosMat) -> NotUsed$.MODULE$, graphBuilder -> (schedulerShape, mesosShape) -> {
                // Mesos events flow into the scheduler graph ...
                GraphDSL$Implicits$.MODULE$.flow2flow(mesosShape, graphBuilder).$tilde$greater(schedulerShape.in2(), (GraphDSL.Builder<?>) graphBuilder);
                // ... and the scheduler graph's calls flow back to Mesos.
                GraphDSL$Implicits$.MODULE$.port2flow(schedulerShape.out2(), graphBuilder).$tilde$greater(mesosShape, (GraphDSL.Builder<?>) graphBuilder);
                return new FlowShape(schedulerShape.in1(), schedulerShape.out1());
            })));
        }, CallerThreadExecutionContext$.MODULE$.context());
    }

    /**
     * Builds the scheduler logic as a bidirectional flow that is not yet connected to Mesos,
     * together with the state snapshot the logic is started from.
     *
     * <p>All records are read from {@code podRecordRepository}; the values of the resulting map
     * and an empty list ({@code Nil}) form the {@link StateSnapshot} that is handed to a new
     * {@code SchedulerLogicGraph}. The graph's output passes through {@link #persistenceFlow}
     * and is then broadcast to two branches: one emits each element's {@code mesosCalls()},
     * the other each element's {@code stateEvents()}.
     *
     * @param mesosCalls call factory given to the {@code SchedulerLogicGraph}
     * @param podRecordRepository repository that is read once here and used by the persistence flow
     * @param metrics metrics given to the {@code SchedulerLogicGraph}
     * @param schedulerSettings settings given to the persistence flow
     * @param domainInfo domain info given to the {@code SchedulerLogicGraph}
     * @return a future of (state snapshot, bidi flow); the mapping runs on the global execution context
     */
    public Future<Tuple2<StateSnapshot, BidiFlow<SchedulerCommand, StateEvent, Protos.Event, Protos.Call, NotUsed>>> unconnectedGraph(MesosCalls mesosCalls, PodRecordRepository podRecordRepository, Metrics metrics, SchedulerSettings schedulerSettings, Protos.DomainInfo domainInfo) {
        return podRecordRepository.readAll().map(map -> {
            StateSnapshot stateSnapshot = new StateSnapshot(map.values().toSeq(), Nil$.MODULE$);
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(stateSnapshot), BidiFlow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(new SchedulerLogicGraph(mesosCalls, domainInfo, stateSnapshot, metrics), builder -> {
                return fanInShape2 -> {
                    // Two-output broadcast; NOTE(review): the `true` argument is presumably Broadcast's eagerCancel flag — confirm.
                    UniformFanOutShape uniformFanOutShape = (UniformFanOutShape) builder.add(Broadcast$.MODULE$.apply(2, true));
                    // scheduler logic output ~> persistence flow ~> broadcast
                    GraphDSL$Implicits$.MODULE$.port2flow(fanInShape2.out(), builder).$tilde$greater((FlowShape) builder.add(MODULE$.persistenceFlow(podRecordRepository, schedulerSettings)), (GraphDSL.Builder<?>) builder).$tilde$greater((Inlet) uniformFanOutShape.in(), (GraphDSL.Builder<?>) builder);
                    // Broadcast output 0 carries the Mesos calls of every element.
                    GraphDSL$Implicits$PortOps graphDSL$Implicits$PortOps = (GraphDSL$Implicits$PortOps) GraphDSL$Implicits$.MODULE$.port2flow(uniformFanOutShape.out(0), builder).mapConcat(schedulerEvents -> {
                        return schedulerEvents.mesosCalls();
                    });
                    // Broadcast output 1 carries the state events. Shape arguments, in order: the logic graph's
                    // first inlet, the state-event outlet, the logic graph's second inlet, the Mesos-call outlet.
                    return new BidiShape(fanInShape2.in0(), ((GraphDSL$Implicits$PortOps) GraphDSL$Implicits$.MODULE$.port2flow(uniformFanOutShape.out(1), builder).mapConcat(schedulerEvents2 -> {
                        return schedulerEvents2.stateEvents();
                    })).outlet(), fanInShape2.in1(), graphDSL$Implicits$PortOps.outlet());
                };
            })));
        }, ExecutionContext$.MODULE$.global());
    }

    /**
     * Builds the flow that runs the persistence steps for each {@code SchedulerEvents} element.
     *
     * <p>Each element is expanded by {@link #persistEvents} into a list of deferred futures.
     * The futures are started via {@code mapAsync} with a parallelism of
     * {@code schedulerSettings.persistencePipelineLimit()}, and their results are filtered
     * through the partial function {@code Scheduler$$anonfun$persistenceFlow$3}.
     *
     * @param podRecordRepository repository handed to {@link #persistEvents}
     * @param schedulerSettings supplies the {@code mapAsync} parallelism
     * @return the persistence flow
     */
    public Flow<SchedulerEvents, SchedulerEvents, NotUsed> persistenceFlow(PodRecordRepository podRecordRepository, SchedulerSettings schedulerSettings) {
        return (Flow) Flow$.MODULE$.apply()
                .mapConcat(events -> MODULE$.persistEvents(events, podRecordRepository))
                .mapAsync(schedulerSettings.persistencePipelineLimit(), deferredStep -> (Future) deferredStep.mo213apply())
                .collect(new Scheduler$$anonfun$persistenceFlow$3());
    }

    /**
     * Expands one {@code SchedulerEvents} element into the list of deferred steps the
     * persistence flow has to run for it.
     *
     * <p>The list holds whatever {@code Scheduler$$anonfun$1} collects from the element's state
     * events, followed by a final step that yields an already-completed future of
     * {@code Some(schedulerEvents)}.
     *
     * <p>The decompiler reports that this method was private in the original class and that the
     * access modifier was changed.
     *
     * @param schedulerEvents element whose state events are inspected
     * @param podRecordRepository repository handed to {@code Scheduler$$anonfun$1}
     * @return the deferred steps, with the step that re-emits {@code schedulerEvents} last
     */
    public List<Function0<Future<Option<SchedulerEvents>>>> persistEvents(SchedulerEvents schedulerEvents, PodRecordRepository podRecordRepository) {
        List repositorySteps = (List) schedulerEvents.stateEvents().collect(new Scheduler$$anonfun$1(podRecordRepository), List$.MODULE$.canBuildFrom());
        Function0<Future<Option<SchedulerEvents>>> emitEvents = () -> Future$.MODULE$.successful(new Some(schedulerEvents));
        return (List) repositorySteps.$colon$plus(emitEvents, List$.MODULE$.canBuildFrom());
    }

    /**
     * Tells whether any entry of the framework's capability list satisfies
     * {@link #$anonfun$isMultiRoleFramework$1}.
     *
     * @param frameworkInfo framework info whose capability list is scanned
     * @return {@code true} if at least one capability matches
     */
    private boolean isMultiRoleFramework(Protos.FrameworkInfo frameworkInfo) {
        IterableLike capabilities = (IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(frameworkInfo.getCapabilitiesList()).asScala();
        return capabilities.exists(candidate -> BoxesRunTime.boxToBoolean($anonfun$isMultiRoleFramework$1(candidate)));
    }

    /**
     * Null-safe equality check of a capability's type against {@code MULTI_ROLE}.
     *
     * @param capability capability whose type is inspected
     * @return {@code true} if the capability's type equals {@code MULTI_ROLE}
     */
    public static final /* synthetic */ boolean $anonfun$isMultiRoleFramework$1(Protos.FrameworkInfo.Capability capability) {
        Protos.FrameworkInfo.Capability.Type actual = capability.getType();
        Protos.FrameworkInfo.Capability.Type expected = Protos.FrameworkInfo.Capability.Type.MULTI_ROLE;
        if (actual == null) {
            return expected == null;
        }
        return actual.equals(expected);
    }

    /** Private constructor; stores the newly created instance in the static {@code MODULE$} field. */
    private Scheduler$() {
        MODULE$ = this;
    }
}
