package io.scalecube.services.benchmarks.service;

import io.scalecube.benchmarks.BenchmarkSettings;
import io.scalecube.benchmarks.metrics.BenchmarkMeter;
import io.scalecube.services.api.ServiceMessage;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/scalecube/services/benchmarks/service/InfiniteStreamBenchmark.class */
/**
 * Benchmark entry point that calls {@code BenchmarkService#infiniteStream} with a message
 * qualified as {@code /benchmarks/infiniteStream} and, for every message received, marks the
 * {@code meter.service-to-client} meter and hands the message to a {@link LatencyHelper}.
 *
 * <p>The rate passed to {@code limitRate} is taken from the {@code rateLimit} benchmark setting;
 * when that setting is absent, {@link Queues#SMALL_BUFFER_SIZE} is used.
 */
public class InfiniteStreamBenchmark {
    private static final String QUALIFIER = "/benchmarks/infiniteStream";
    private static final String RATE_LIMIT = "rateLimit";
    private static final Logger LOGGER = LoggerFactory.getLogger(InfiniteStreamBenchmark.class);
    private static final int DEFAULT_RATE_LIMIT = Queues.SMALL_BUFFER_SIZE;

    /**
     * Builds the benchmark settings from the command line and runs the benchmark.
     *
     * <p>The number of injectors and the ramp-up duration (in seconds) are both set to the number
     * of available processors before the settings are built.
     *
     * @param args command-line arguments forwarded to {@link BenchmarkSettings#from}
     * @throws IllegalArgumentException if the {@code rateLimit} setting is present but is not a
     *     positive integer
     */
    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        BenchmarkSettings settings = BenchmarkSettings.from(args)
            .injectors(processors)
            .messageRate(1)
            .rampUpDuration(Duration.ofSeconds(processors))
            .build();

        // Resolved once, before the run starts: the value depends only on the settings, and a
        // malformed value should stop the benchmark immediately rather than fail per stream.
        int rateLimit = rateLimit(settings);

        new BenchmarkServiceState(settings, new BenchmarkServiceImpl()).runWithRampUp(
            // Set-up: supply the service proxy that the tasks below will call.
            (rampUpTick, state) -> Mono.just(state.service(BenchmarkService.class)),
            // Task factory: per state, create the meter, the latency helper and the request.
            state -> {
                LatencyHelper latencyHelper = new LatencyHelper(state);
                BenchmarkMeter meter = state.meter("meter.service-to-client");
                ServiceMessage request = ServiceMessage.builder().qualifier(QUALIFIER).build();
                return service -> (iteration, task) ->
                    service.infiniteStream(request)
                        .limitRate(rateLimit)
                        .doOnNext(message -> {
                            meter.mark();
                            latencyHelper.calculate(message);
                        })
                        // Pass the throwable itself so the logger keeps the stack trace and cause.
                        .doOnError(error -> LOGGER.warn("Exception occurred", error));
            },
            // Clean-up: nothing to release.
            (state, service) -> Mono.empty());
    }

    /**
     * Resolves the rate limit to apply to the stream.
     *
     * @param benchmarkSettings settings to read the {@code rateLimit} entry from
     * @return the configured rate limit, or {@link #DEFAULT_RATE_LIMIT} when the entry is absent
     * @throws IllegalArgumentException if the entry is present but is not a positive integer
     */
    private static int rateLimit(BenchmarkSettings benchmarkSettings) {
        return Optional.ofNullable(benchmarkSettings.find(RATE_LIMIT, (String) null))
            .map(InfiniteStreamBenchmark::parseRateLimit)
            .orElse(DEFAULT_RATE_LIMIT);
    }

    /**
     * Parses a raw {@code rateLimit} value, rejecting anything that is not a positive integer.
     *
     * @param raw non-null text of the setting; surrounding whitespace is ignored
     * @return the parsed, strictly positive value
     * @throws IllegalArgumentException if {@code raw} is not an integer or is not greater than zero
     */
    private static int parseRateLimit(String raw) {
        int parsed;
        try {
            parsed = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Setting '" + RATE_LIMIT + "' must be a positive integer but was '" + raw + "'", e);
        }
        // NOTE(review): limitRate is expected to reject non-positive rates when each stream is
        // assembled; checking here reports the problem once, with the setting name attached.
        if (parsed <= 0) {
            throw new IllegalArgumentException(
                "Setting '" + RATE_LIMIT + "' must be a positive integer but was '" + raw + "'");
        }
        return parsed;
    }
}
