package io.scalecube.benchmarks;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.MetricRegistry;
import io.scalecube.benchmarks.BenchmarkState;
import io.scalecube.benchmarks.metrics.BenchmarkMeter;
import io.scalecube.benchmarks.metrics.BenchmarkTimer;
import io.scalecube.benchmarks.metrics.CodahaleBenchmarkMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/scalecube/benchmarks/BenchmarkState.class */
/**
 * Holds the shared state of one benchmark run: the settings, the Reactor schedulers the work runs
 * on, the Codahale {@link MetricRegistry} and its console/CSV reporters.
 *
 * <p>Life cycle: {@link #start()} creates the registry, reporters and schedulers and invokes
 * {@link #beforeAll()}; {@link #shutdown()} emits a final report, disposes the schedulers and
 * invokes {@link #afterAll()}. The {@code runFor*}/{@code runWithRampUp} methods wrap a scenario
 * in exactly one {@code start()}/{@code shutdown()} pair.
 *
 * @param <S> the concrete subclass type; instances are handed to user callbacks as {@code S}
 */
public class BenchmarkState<S extends BenchmarkState<S>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkState.class);

    protected final BenchmarkSettings settings;

    /** Scheduler backed by a fixed pool of {@code settings.numberThreads()} threads. */
    private Scheduler scheduler;
    /** {@code settings.numberThreads()} single-threaded schedulers, used by the ramp-up scenario. */
    private List<Scheduler> schedulers;
    private MetricRegistry registry;
    private BenchmarkMetrics metrics;
    /** Only created when {@code settings.consoleReporterEnabled()} is true; otherwise null. */
    private ConsoleReporter consoleReporter;
    private CsvReporter csvReporter;

    private final AtomicBoolean started = new AtomicBoolean();
    /** Flipped to true once the warm-up delay has elapsed; read by the metrics implementation. */
    private final AtomicBoolean warmUpOccurred = new AtomicBoolean(false);
    private Disposable warmUpSubscriber;

    /**
     * Creates a state bound to the given settings. Nothing is allocated until {@link #start()}.
     *
     * @param benchmarkSettings settings driving thread counts, durations and reporters
     */
    public BenchmarkState(BenchmarkSettings benchmarkSettings) {
        this.settings = benchmarkSettings;
    }

    /**
     * Hook invoked by {@link #start()} after reporters and schedulers have been created.
     * The default implementation does nothing.
     *
     * @throws Exception any failure; {@code start()} rethrows it as {@link IllegalStateException}
     */
    protected void beforeAll() throws Exception {
    }

    /**
     * Hook invoked by {@link #shutdown()} after reporters and schedulers have been released.
     * The default implementation does nothing.
     *
     * @throws Exception any failure; {@code shutdown()} rethrows it as {@link IllegalStateException}
     */
    protected void afterAll() throws Exception {
    }

    /**
     * Starts this state: creates the metric registry, the reporters and the schedulers, calls
     * {@link #beforeAll()}, registers a JVM shutdown hook that emits a last report, and arms the
     * warm-up timer which starts periodic reporting once {@code settings.warmUpDuration()} passed.
     *
     * @throws IllegalStateException if already started, or if the start-up sequence guarded by the
     *     try block (including {@code beforeAll()}) throws
     */
    public final void start() {
        if (!this.started.compareAndSet(false, true)) {
            throw new IllegalStateException("BenchmarkState is already started");
        }
        LOGGER.info("Benchmarks settings: " + this.settings);

        this.registry = new MetricRegistry();
        this.metrics = new CodahaleBenchmarkMetrics(this.registry, this.warmUpOccurred::get);

        if (this.settings.consoleReporterEnabled()) {
            this.consoleReporter = ConsoleReporter.forRegistry(this.registry)
                .outputTo(System.out)
                .convertDurationsTo(this.settings.durationUnit())
                .convertRatesTo(this.settings.rateUnit())
                .build();
        }
        // The CSV reporter is created unconditionally.
        this.csvReporter = CsvReporter.forRegistry(this.registry)
            .convertDurationsTo(this.settings.durationUnit())
            .convertRatesTo(this.settings.rateUnit())
            .build(this.settings.csvReporterDirectory());

        this.scheduler =
            Schedulers.fromExecutor(Executors.newFixedThreadPool(this.settings.numberThreads()));
        this.schedulers = IntStream.rangeClosed(1, this.settings.numberThreads())
            .mapToObj(i -> Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor()))
            .collect(Collectors.toList());

        try {
            beforeAll();

            // If the JVM exits while the benchmark is still marked as started, flush a last report.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                if (this.started.get()) {
                    this.csvReporter.report();
                    if (this.consoleReporter != null) {
                        this.consoleReporter.report();
                    }
                }
            }));

            // Periodic reporting only begins after the warm-up period has elapsed.
            this.warmUpSubscriber = Mono.delay(this.settings.warmUpDuration())
                .doOnSuccess(ignored -> {
                    this.warmUpOccurred.compareAndSet(false, true);
                    long intervalMillis = this.settings.reporterInterval().toMillis();
                    if (this.settings.consoleReporterEnabled()) {
                        this.consoleReporter.start(intervalMillis, TimeUnit.MILLISECONDS);
                    }
                    this.csvReporter.start(intervalMillis, TimeUnit.MILLISECONDS);
                })
                .subscribe();
        } catch (Exception e) {
            throw new IllegalStateException("BenchmarkState beforeAll() failed: " + e, e);
        }
    }

    /**
     * Stops this state: cancels the warm-up timer, emits a final report on each reporter and stops
     * it, disposes all schedulers and finally calls {@link #afterAll()}.
     *
     * @throws IllegalStateException if not started, or if {@code afterAll()} throws
     */
    public final void shutdown() {
        if (!this.started.compareAndSet(true, false)) {
            throw new IllegalStateException("BenchmarkState is not started");
        }
        if (this.warmUpSubscriber != null) {
            this.warmUpSubscriber.dispose();
        }
        if (this.consoleReporter != null) {
            this.consoleReporter.report();
            this.consoleReporter.stop();
        }
        if (this.csvReporter != null) {
            this.csvReporter.report();
            this.csvReporter.stop();
        }
        if (this.scheduler != null) {
            this.scheduler.dispose();
        }
        if (this.schedulers != null) {
            this.schedulers.forEach(Scheduler::dispose);
        }
        try {
            afterAll();
        } catch (Exception e) {
            throw new IllegalStateException("BenchmarkState afterAll() failed: " + e, e);
        }
    }

    /** Returns the fixed-pool scheduler, or null before {@link #start()} has run. */
    public Scheduler scheduler() {
        return this.scheduler;
    }

    /** Returns the single-threaded schedulers, or null before {@link #start()} has run. */
    public List<Scheduler> schedulers() {
        return this.schedulers;
    }

    /** Returns the metric registry, or null before {@link #start()} has run. */
    public MetricRegistry registry() {
        return this.registry;
    }

    /**
     * Returns a timer registered under {@code "<taskName>-<str>"}.
     *
     * @param str suffix appended to {@code settings.taskName()}
     */
    public BenchmarkTimer timer(String str) {
        return this.metrics.timer(this.settings.taskName() + "-" + str);
    }

    /**
     * Returns a meter registered under {@code "<taskName>-<str>"}.
     *
     * @param str suffix appended to {@code settings.taskName()}
     */
    public BenchmarkMeter meter(String str) {
        return this.metrics.meter(this.settings.taskName() + "-" + str);
    }

    /**
     * Runs a synchronous unit of work for iteration indexes {@code 0..numOfIterations-1} on a
     * parallel pipeline driven by {@link #scheduler()}, then shuts this state down.
     *
     * <p>The calling thread waits until the pipeline signals termination or until
     * {@code settings.executionTaskDuration()} elapses, whichever comes first.
     *
     * @param function given this state, produces the unit of work applied to each iteration index
     */
    public final void runForSync(Function<S, Function<Long, Object>> function) {
        try {
            start();
            Function<Long, Object> unitOfWork = function.apply(self());
            CountDownLatch latch = new CountDownLatch(1);
            // NOTE(review): doOnTerminate is attached to the ParallelFlux itself; confirm whether
            // it fires once overall or once per rail (the latch only needs a single count-down).
            Flux.fromStream(LongStream.range(0L, this.settings.numOfIterations()).boxed())
                .parallel()
                .runOn(scheduler())
                .map(unitOfWork)
                .doOnTerminate(latch::countDown)
                .subscribe();
            latch.await(this.settings.executionTaskDuration().toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers; the exception type is hidden by propagate().
            Thread.currentThread().interrupt();
            throw Exceptions.propagate(e);
        } finally {
            // finally (not catch + rethrow) so a failing shutdown() is never invoked a second time.
            shutdown();
        }
    }

    /**
     * Runs an asynchronous unit of work, then shuts this state down.
     *
     * <p>The iteration range is split into {@code settings.numberThreads()} contiguous slices of
     * {@code numOfIterations / numberThreads} indexes each (integer division, so a remainder is not
     * executed). Every slice is subscribed on {@link #scheduler()}, merges its publishers with a
     * concurrency of {@code settings.concurrency()} and is cut off after
     * {@code settings.executionTaskDuration()}. The calling thread blocks until all slices finish.
     *
     * @param function given this state, produces the unit of work applied to each iteration index
     */
    public final void runForAsync(Function<S, Function<Long, Publisher<?>>> function) {
        try {
            start();
            Function<Long, Publisher<?>> unitOfWork = function.apply(self());
            int threads = this.settings.numberThreads();
            long iterationsPerThread = this.settings.numOfIterations() / threads;

            Function<Integer, Mono<?>> sliceRunner = sliceIndex -> Mono.fromRunnable(() -> {
                long firstIteration = sliceIndex * iterationsPerThread;
                Flux.fromStream(
                        LongStream.range(firstIteration, firstIteration + iterationsPerThread).boxed())
                    .flatMap(unitOfWork::apply, this.settings.concurrency(), Integer.MAX_VALUE)
                    .take(this.settings.executionTaskDuration())
                    .blockLast();
            });

            Flux.range(0, threads)
                .flatMap(
                    sliceIndex -> sliceRunner.apply(sliceIndex).subscribeOn(scheduler()),
                    Integer.MAX_VALUE,
                    Integer.MAX_VALUE)
                .blockLast();
        } finally {
            // finally (not catch + rethrow) so a failing shutdown() is never invoked a second time.
            shutdown();
        }
    }

    /**
     * Runs a ramp-up scenario, then shuts this state down.
     *
     * <p>A tick is emitted immediately and then every {@code settings.rampUpInterval()} until
     * {@code settings.rampUpDuration()} has elapsed. For each tick a scheduler is picked from
     * {@link #schedulers()} by tick index, and {@code max(1, injectorsPerRampUpInterval)} set-up
     * publishers are subscribed on it. Every value a set-up publisher emits becomes a
     * {@link BenchmarkTaskImpl} that is scheduled on the same scheduler. The calling thread blocks
     * until every task's completion signal has arrived.
     *
     * @param biFunction set-up: given the ramp-up tick index and this state, publishes the values
     *     tasks are built from
     * @param function given this state, produces a factory turning a set-up value into the task's
     *     unit of work
     * @param biFunction2 clean-up: given this state and the set-up value, passed to the task as a
     *     deferred callback
     * @param <T> type of the values produced by the set-up publisher
     */
    public final <T> void runWithRampUp(BiFunction<Long, S, Publisher<T>> biFunction, Function<S, Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>>> function, BiFunction<S, T, Mono<Void>> biFunction2) {
        S self = self();
        try {
            start();
            Function<T, BiFunction<Long, BenchmarkTask, Publisher<?>>> unitOfWorkFactory =
                function.apply(self);

            Flux.interval(Duration.ZERO, this.settings.rampUpInterval())
                .take(this.settings.rampUpDuration())
                .flatMap(rampUpIteration -> {
                    Scheduler taskScheduler = selectScheduler(rampUpIteration);
                    return Flux.range(0, Math.max(1, this.settings.injectorsPerRampUpInterval()))
                        .flatMap(injector ->
                            createSetUpFactory(biFunction, self, rampUpIteration)
                                .subscribeOn(taskScheduler)
                                .map(setUpResult -> new BenchmarkTaskImpl(
                                    this.settings,
                                    taskScheduler,
                                    unitOfWorkFactory.apply(setUpResult),
                                    () -> biFunction2.apply(self, setUpResult)))
                                .doOnNext(taskScheduler::schedule)
                                .flatMap(task -> task.completionMono()));
                }, Integer.MAX_VALUE, Integer.MAX_VALUE)
                .blockLast();
        } finally {
            // finally (not catch + rethrow) so a failing shutdown() is never invoked a second time.
            shutdown();
        }
    }

    /** Returns this instance typed as the concrete subclass handed to user callbacks. */
    @SuppressWarnings("unchecked") // S is self-bounded: S extends BenchmarkState<S>
    private S self() {
        return (S) this;
    }

    /**
     * Picks a scheduler by ramp-up tick index, cycling through {@link #schedulers()}. The sign bit
     * is masked off so the modulo result is never negative.
     */
    private Scheduler selectScheduler(Long rampUpIteration) {
        List<Scheduler> candidates = schedulers();
        int index = (int) ((rampUpIteration.longValue() & Long.MAX_VALUE) % candidates.size());
        return candidates.get(index);
    }

    /**
     * Wraps the user's set-up publisher so that a set-up failure is logged and converted into a
     * plain completion: the affected injector then simply produces no tasks.
     */
    private <T> Flux<T> createSetUpFactory(BiFunction<Long, S, Publisher<T>> setUp, S self, Long rampUpIteration) {
        return Flux.create(sink -> {
            Flux<T> setUpResults = Flux.defer(() -> setUp.apply(rampUpIteration, self));
            setUpResults.subscribe(
                sink::next,
                error -> {
                    LOGGER.error("Exception occured on setUp at rampUpIteration: {}, cause: {}, task won't start", rampUpIteration, error, error);
                    sink.complete();
                },
                sink::complete);
        });
    }
}
