package io.servicetalk.opentracing.zipkin.publisher.reporter;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.buffer.api.CharSequences;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.BufferStrategies;
import io.servicetalk.concurrent.api.BufferStrategy;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.FutureUtils;
import io.servicetalk.http.api.HttpClient;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.CheckResult;
import zipkin2.Component;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.Reporter;

/* loaded from: input_file:io/servicetalk/opentracing/zipkin/publisher/reporter/HttpReporter.class */
public final class HttpReporter extends Component implements Reporter<Span>, AsyncCloseable {
    static final String V1_PATH = "/api/v1/spans";
    static final String V2_PATH = "/api/v2/spans";
    private final PublisherSource.Processor<Span, Span> buffer;
    private final CompositeCloseable closeable;
    private volatile boolean closeInitiated;
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpReporter.class);
    static final CharSequence THRIFT_CONTENT_TYPE = CharSequences.newAsciiString("application/x-thrift");
    static final CharSequence PROTO_CONTENT_TYPE = CharSequences.newAsciiString("application/x-protobuf");

    /* loaded from: input_file:io/servicetalk/opentracing/zipkin/publisher/reporter/HttpReporter$Builder.class */
    public static final class Builder {
        private final SingleAddressHttpClientBuilder<?, ?> clientBuilder;
        private Codec codec = Codec.JSON_V2;
        private boolean batchingEnabled = true;
        private int batchSizeHint = 16;
        private int maxConcurrentReports = 32;
        private Duration maxBatchDuration = Duration.ofSeconds(30);

        public Builder(SingleAddressHttpClientBuilder<?, ?> singleAddressHttpClientBuilder) {
            this.clientBuilder = singleAddressHttpClientBuilder;
        }

        public Builder codec(Codec codec) {
            this.codec = (Codec) Objects.requireNonNull(codec);
            return this;
        }

        public Builder maxConcurrentReports(int i) {
            if (i <= 0) {
                throw new IllegalArgumentException("maxConcurrentReports: " + i + " (expected > 0)");
            }
            this.maxConcurrentReports = i;
            return this;
        }

        public Builder batchSpans(int i, Duration duration) {
            if (i <= 0) {
                throw new IllegalArgumentException("batchSizeHint: " + i + " (expected > 0)");
            }
            this.batchingEnabled = true;
            this.batchSizeHint = i;
            this.maxBatchDuration = (Duration) Objects.requireNonNull(duration);
            return this;
        }

        public Builder spansBatchingEnabled(boolean z) {
            this.batchingEnabled = z;
            return this;
        }

        public HttpReporter build() {
            return new HttpReporter(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/opentracing/zipkin/publisher/reporter/HttpReporter$ListAccumulator.class */
    public static final class ListAccumulator implements BufferStrategy.Accumulator<Span, List<Span>> {
        private final List<Span> accumulate;

        ListAccumulator(int i) {
            this.accumulate = new ArrayList(i);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void accumulate(@Nonnull Span span) {
            this.accumulate.add(Objects.requireNonNull(span));
        }

        /* renamed from: finish, reason: merged with bridge method [inline-methods] */
        public List<Span> m4finish() {
            return this.accumulate;
        }
    }

    private HttpReporter(Builder builder) {
        this.closeable = AsyncCloseables.newCompositeCloseable();
        try {
            this.buffer = initReporter(builder, (HttpClient) this.closeable.append(builder.clientBuilder.build()));
        } catch (Throwable th) {
            try {
                this.closeable.close();
            } catch (Exception e) {
                LOGGER.error("Failed to close the client.", e);
            }
            throw th;
        }
    }

    public CheckResult check() {
        return this.closeInitiated ? CheckResult.failed(new IllegalStateException("Reporter is closed.")) : CheckResult.OK;
    }

    public void report(Span span) {
        if (this.closeInitiated) {
            throw new IllegalStateException("Span: " + span + " reported after reporter " + this + " is closed.");
        }
        this.buffer.onNext(span);
    }

    public void close() {
        FutureUtils.awaitTermination(this.closeable.closeAsync().toFuture());
    }

    public Completable closeAsync() {
        return this.closeable.closeAsync();
    }

    public Completable closeAsyncGracefully() {
        return this.closeable.closeAsyncGracefully();
    }

    private PublisherSource.Processor<Span, Span> initReporter(Builder builder, HttpClient httpClient) {
        PublisherSource newPublisherProcessorDropHeadOnOverflow;
        Publisher map;
        SpanBytesEncoder spanBytesEncoder = builder.codec.spanBytesEncoder();
        BufferAllocator bufferAllocator = httpClient.executionContext().bufferAllocator();
        if (builder.batchingEnabled) {
            newPublisherProcessorDropHeadOnOverflow = Processors.newPublisherProcessorDropHeadOnOverflow(builder.batchSizeHint * builder.maxConcurrentReports);
            map = SourceAdapters.fromSource(newPublisherProcessorDropHeadOnOverflow).buffer(BufferStrategies.forCountOrTime(builder.batchSizeHint, builder.maxBatchDuration, () -> {
                return new ListAccumulator(builder.batchSizeHint);
            }, httpClient.executionContext().executor())).filter(list -> {
                return !list.isEmpty();
            }).map(list2 -> {
                return bufferAllocator.wrap(spanBytesEncoder.encodeList(list2));
            });
        } else {
            newPublisherProcessorDropHeadOnOverflow = Processors.newPublisherProcessorDropHeadOnOverflow(builder.maxConcurrentReports);
            map = SourceAdapters.fromSource(newPublisherProcessorDropHeadOnOverflow).map(span -> {
                return bufferAllocator.wrap(spanBytesEncoder.encodeList(Collections.singletonList(span)));
            });
        }
        CompletableSource.Processor newCompletableProcessor = Processors.newCompletableProcessor();
        SourceAdapters.toSource(map.flatMapCompletable(encodedSpansReporter(httpClient, builder.codec), builder.maxConcurrentReports)).subscribe(newCompletableProcessor);
        PublisherSource publisherSource = newPublisherProcessorDropHeadOnOverflow;
        this.closeable.prepend(AsyncCloseables.toAsyncCloseable(z -> {
            this.closeInitiated = true;
            try {
                publisherSource.onComplete();
            } catch (Throwable th) {
                LOGGER.error("Failed to dispose request buffer. Ignoring.", th);
            }
            return z ? SourceAdapters.fromSource(newCompletableProcessor) : Completable.completed();
        }));
        return newPublisherProcessorDropHeadOnOverflow;
    }

    private static Function<Buffer, Completable> encodedSpansReporter(HttpClient httpClient, Codec codec) {
        String str;
        CharSequence charSequence;
        switch (codec) {
            case JSON_V1:
                str = V1_PATH;
                charSequence = HttpHeaderValues.APPLICATION_JSON;
                break;
            case JSON_V2:
                str = V2_PATH;
                charSequence = HttpHeaderValues.APPLICATION_JSON;
                break;
            case THRIFT:
                str = V2_PATH;
                charSequence = THRIFT_CONTENT_TYPE;
                break;
            case PROTO3:
                str = V2_PATH;
                charSequence = PROTO_CONTENT_TYPE;
                break;
            default:
                throw new IllegalArgumentException("Unknown codec: " + codec);
        }
        String str2 = str;
        CharSequence charSequence2 = charSequence;
        return buffer -> {
            return httpClient.request(httpClient.post(str2).setHeader(HttpHeaderNames.CONTENT_TYPE, charSequence2).payloadBody(buffer)).beforeOnSuccess(httpResponse -> {
                if (httpResponse.status().statusClass() != HttpResponseStatus.StatusClass.SUCCESSFUL_2XX) {
                    LOGGER.info("Unexpected response from the collector. Response headers: {}", httpResponse.toString((charSequence3, charSequence4) -> {
                        return charSequence4;
                    }));
                }
            }).ignoreElement().onErrorComplete(th -> {
                LOGGER.error("Failed to send a span, ignoring.", th);
                return true;
            });
        };
    }
}
