package radargun.lib.teetime.framework.scheduling.pushpullmodel;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import radargun.lib.org.slf4j.Logger;
import radargun.lib.org.slf4j.LoggerFactory;
import radargun.lib.teetime.framework.AbstractStage;
import radargun.lib.teetime.framework.Configuration;
import radargun.lib.teetime.framework.ConfigurationFacade;
import radargun.lib.teetime.framework.StageFacade;
import radargun.lib.teetime.framework.TeeTimeService;
import radargun.lib.teetime.framework.TerminationStrategy;
import radargun.lib.teetime.framework.Traverser;
import radargun.lib.teetime.framework.scheduling.CountDownAndUpLatch;
import radargun.lib.teetime.framework.signal.ValidatingSignal;
import radargun.lib.teetime.framework.validation.AnalysisNotValidException;

/* loaded from: input_file:libs/de/cau/se/radargun-2.0.0-SNAPSHOT.jar:radargun/lib/teetime/framework/scheduling/pushpullmodel/PushPullScheduling.class */
/**
 * {@link TeeTimeService} implementation that collects the threadable stages of a
 * {@link Configuration}, starts their owning threads, and waits for them to finish.
 *
 * <p>Each threadable stage is categorized by its {@link TerminationStrategy}:
 * <ul>
 * <li>{@code BY_INTERRUPT}: its thread is recorded in {@code infiniteProducerThreads},</li>
 * <li>{@code BY_SELF_DECISION}: its thread is recorded in {@code finiteProducerThreads},</li>
 * <li>{@code BY_SIGNAL}: its thread is recorded in {@code consumerThreads}.</li>
 * </ul>
 *
 * <p>The thread lists and the stage set are wrapped with {@code Collections.synchronizedXxx};
 * every iteration over them in this class holds the wrapper's monitor, as that API requires.
 */
public class PushPullScheduling implements TeeTimeService, ThreadListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushPullScheduling.class);
    private static final StageFacade STAGE_FACADE = StageFacade.INSTANCE;
    private static final ConfigurationFacade CONFIG_FACADE = ConfigurationFacade.INSTANCE;

    private final Configuration configuration;
    /** Owning threads of stages whose termination strategy is {@code BY_SIGNAL}. */
    private final List<Thread> consumerThreads = Collections.synchronizedList(new LinkedList<Thread>());
    /** Owning threads of stages whose termination strategy is {@code BY_SELF_DECISION}. */
    private final List<Thread> finiteProducerThreads = Collections.synchronizedList(new LinkedList<Thread>());
    /** Owning threads of stages whose termination strategy is {@code BY_INTERRUPT}. */
    private final List<Thread> infiniteProducerThreads = Collections.synchronizedList(new LinkedList<Thread>());
    /** All threadable stages collected so far, including those added at runtime. */
    private final Set<AbstractStage> threadableStages = Collections.synchronizedSet(new HashSet<AbstractStage>());
    /** Counted up in {@link #onBeforeStart} and down in {@link #onAfterTermination} for {@code BY_SELF_DECISION} stages. */
    private final CountDownAndUpLatch numRunningFiniteProducers = new CountDownAndUpLatch();
    /** Counted up in {@link #onBeforeStart} and down in {@link #onAfterTermination} for {@code BY_SIGNAL} stages. */
    private final CountDownAndUpLatch numRunningConsumers = new CountDownAndUpLatch();

    /**
     * @param configuration the configuration whose start stages are scheduled by this service
     */
    public PushPullScheduling(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Initializes all stages reachable from the configuration's start stages and starts
     * the owning threads of the threadable stages found.
     *
     * @throws IllegalStateException if there are no start stages or no threadable stages
     */
    @Override
    public void onInitialize() {
        startThreads(initialize(CONFIG_FACADE.getStartStages(this.configuration)));
    }

    /**
     * Declares the given stage active, initializes everything reachable from it, and starts
     * the resulting threads. If the stage is a producer, it is additionally validated and the
     * newly started threads receive the starting signal.
     *
     * @param abstractStage the stage to start while the execution is already running
     * @throws AnalysisNotValidException if the stage is a producer and validation reports
     *         invalid port connections
     */
    @Override
    public void startStageAtRuntime(AbstractStage abstractStage) {
        abstractStage.declareActive();
        Set<AbstractStage> newThreadableStages = initialize(Arrays.asList(abstractStage));
        startThreads(newThreadableStages);
        if (abstractStage.isProducer()) {
            validate(abstractStage);
            sendStartingSignal(newThreadableStages);
        }
    }

    /**
     * Runs the initialization steps (A1 to A4) for everything reachable from the given stages
     * and categorizes each threadable stage found.
     *
     * @param collection the stages to start traversing from; must not be empty
     * @return the threadable stages found by this traversal (not the accumulated set)
     * @throws IllegalStateException if {@code collection} is empty, or if no threadable
     *         stage is known after the traversal
     */
    private Set<AbstractStage> initialize(Collection<AbstractStage> collection) {
        if (collection.isEmpty()) {
            throw new IllegalStateException("The start stage may not be null.");
        }

        A1ThreadableStageCollector stageCollector = new A1ThreadableStageCollector();
        Traverser collectingTraverser = new Traverser(stageCollector);
        for (AbstractStage startStage : collection) {
            collectingTraverser.traverse(startStage);
        }

        Set<AbstractStage> newThreadableStages = stageCollector.getThreadableStages();
        this.threadableStages.addAll(newThreadableStages);
        // The check is made on the accumulated set, so a runtime start that finds no new
        // threadable stage still passes when earlier stages exist.
        if (this.threadableStages.isEmpty()) {
            throw new IllegalStateException("1004 - No threadable stages in this configuration.");
        }

        new A2InvalidThreadAssignmentCheck(newThreadableStages).check();

        Traverser pipeTraverser = new Traverser(new A3PipeInstantiation());
        for (AbstractStage startStage : collection) {
            pipeTraverser.traverse(startStage);
        }

        new A4StageAttributeSetter(this.configuration, newThreadableStages, this).setAttributes();

        for (AbstractStage threadableStage : newThreadableStages) {
            categorizeThreadableStage(threadableStage);
        }
        return newThreadableStages;
    }

    /**
     * Records the stage's owning thread in the list that matches its termination strategy.
     * An unrecognized strategy is logged as a warning and the thread is not recorded.
     */
    private void categorizeThreadableStage(AbstractStage abstractStage) {
        TerminationStrategy terminationStrategy = STAGE_FACADE.getTerminationStrategy(abstractStage);
        switch (terminationStrategy) {
            case BY_INTERRUPT:
                this.infiniteProducerThreads.add(STAGE_FACADE.getOwningThread(abstractStage));
                return;
            case BY_SELF_DECISION:
                this.finiteProducerThreads.add(STAGE_FACADE.getOwningThread(abstractStage));
                return;
            case BY_SIGNAL:
                this.consumerThreads.add(STAGE_FACADE.getOwningThread(abstractStage));
                return;
            default:
                LOGGER.warn("Unknown termination strategy '{}' in stage {}", terminationStrategy, abstractStage);
                return;
        }
    }

    /**
     * Registers this scheduler as listener on each stage's owning thread and starts it.
     * The owning thread of every given stage is cast to {@link TeeTimeThread}.
     */
    private void startThreads(Set<AbstractStage> set) {
        for (AbstractStage stage : set) {
            TeeTimeThread thread = (TeeTimeThread) STAGE_FACADE.getOwningThread(stage);
            thread.setListener(this);
            thread.start();
        }
    }

    /** Sends the starting signal to the owning thread of each given stage. */
    private void sendStartingSignal(Set<AbstractStage> set) {
        // Hold the set's monitor: required when iterating a Collections.synchronizedSet.
        synchronized (set) {
            for (AbstractStage stage : set) {
                ((TeeTimeThread) STAGE_FACADE.getOwningThread(stage)).sendStartingSignal();
            }
        }
    }

    /**
     * Sends a fresh {@link ValidatingSignal} to the stage and fails if it reports any
     * invalid port connection.
     *
     * @throws AnalysisNotValidException if at least one invalid port connection is reported
     */
    private void validate(AbstractStage abstractStage) {
        ValidatingSignal validatingSignal = new ValidatingSignal();
        abstractStage.onSignal(validatingSignal, null);
        if (validatingSignal.getInvalidPortConnections().size() > 0) {
            throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections());
        }
    }

    /**
     * Validates every known threadable stage, stopping at the first one that reports
     * invalid port connections.
     *
     * @throws AnalysisNotValidException if a stage reports invalid port connections
     */
    @Override
    public void onValidate() {
        // Hold the set's monitor: required when iterating a Collections.synchronizedSet,
        // and consistent with sendStartingSignal/abortStages.
        synchronized (this.threadableStages) {
            for (AbstractStage abstractStage : this.threadableStages) {
                validate(abstractStage);
            }
        }
    }

    /** Sends the starting signal to the threads of all known threadable stages. */
    @Override
    public void onExecute() {
        sendStartingSignal(this.threadableStages);
    }

    /** Aborts all known threadable stages. */
    @Override
    public void onTerminate() {
        abortStages(this.threadableStages);
    }

    /** Calls {@code StageFacade.abort} on each given stage while holding the set's monitor. */
    private void abortStages(Set<AbstractStage> set) {
        synchronized (set) {
            for (AbstractStage stage : set) {
                STAGE_FACADE.abort(stage);
            }
        }
    }

    /**
     * Waits for the finite producers, then interrupts the infinite producers, then waits for
     * the consumers.
     *
     * <p>If the calling thread is interrupted during either wait, the corresponding group of
     * threads is interrupted and the method carries on. The caller's interrupt status is
     * restored before returning, so the interruption is not lost.
     */
    @Override
    public void onFinish() {
        boolean interrupted = false;
        try {
            try {
                this.numRunningFiniteProducers.await();
            } catch (InterruptedException e) {
                // Restore the flag only at the end; setting it now would make the
                // consumer await() below fail immediately.
                interrupted = true;
                LOGGER.error("Execution has stopped unexpectedly", e);
                interruptAll(this.finiteProducerThreads);
            }

            if (!this.infiniteProducerThreads.isEmpty()) {
                LOGGER.debug("Interrupting infiniteProducerThreads...");
                interruptAll(this.infiniteProducerThreads);
                // Only interrupt() has been requested at this point; the threads are not joined here.
                LOGGER.debug("infiniteProducerThreads have been terminated");
            }

            try {
                this.numRunningConsumers.await();
            } catch (InterruptedException e) {
                interrupted = true;
                LOGGER.error("Execution has stopped unexpectedly", e);
                interruptAll(this.consumerThreads);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Interrupts every thread in the given list.
     *
     * @param threads a list created by {@code Collections.synchronizedList}; its monitor is
     *        held for the duration of the iteration
     */
    private static void interruptAll(List<Thread> threads) {
        synchronized (threads) {
            for (Thread thread : threads) {
                thread.interrupt();
            }
        }
    }

    /**
     * Counts up the latch that matches the stage's termination strategy
     * ({@code BY_SELF_DECISION} or {@code BY_SIGNAL}); other strategies are not counted.
     */
    @Override
    public void onBeforeStart(AbstractStage abstractStage) {
        switch (STAGE_FACADE.getTerminationStrategy(abstractStage)) {
            case BY_SELF_DECISION:
                this.numRunningFiniteProducers.countUp();
                return;
            case BY_SIGNAL:
                this.numRunningConsumers.countUp();
                return;
            default:
                return;
        }
    }

    /**
     * Counts down the latch that matches the stage's termination strategy
     * ({@code BY_SELF_DECISION} or {@code BY_SIGNAL}); other strategies are not counted.
     */
    @Override
    public void onAfterTermination(AbstractStage abstractStage) {
        switch (STAGE_FACADE.getTerminationStrategy(abstractStage)) {
            case BY_SELF_DECISION:
                this.numRunningFiniteProducers.countDown();
                return;
            case BY_SIGNAL:
                this.numRunningConsumers.countDown();
                return;
            default:
                return;
        }
    }
}
