package com.navercorp.pinpoint.channel.service.client;

import com.navercorp.pinpoint.channel.ChannelProviderRepository;
import com.navercorp.pinpoint.channel.PubChannel;
import com.navercorp.pinpoint.channel.Subscription;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/navercorp/pinpoint/channel/service/client/FluxChannelServiceClientImpl.class */
class FluxChannelServiceClientImpl<D, S> extends AbstractChannelServiceClient<D, S> implements FluxChannelServiceClient<D, S> {
    private static final Disposable EMPTY_DISPOSABLE = () -> {
    };
    private final FluxChannelServiceClientProtocol<D, S> protocol;
    private final Scheduler demandScheduler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FluxChannelServiceClientImpl(ChannelProviderRepository channelProviderRepository, FluxChannelServiceClientProtocol<D, S> fluxChannelServiceClientProtocol, Scheduler scheduler) {
        super(channelProviderRepository, fluxChannelServiceClientProtocol);
        this.protocol = (FluxChannelServiceClientProtocol) Objects.requireNonNull(fluxChannelServiceClientProtocol, "protocol");
        this.demandScheduler = (Scheduler) Objects.requireNonNull(scheduler, "demandScheduler");
    }

    @Override // com.navercorp.pinpoint.channel.service.client.FluxChannelServiceClient
    public Flux<S> request(D d) {
        return Flux.create(fluxSink -> {
            request0(d, fluxSink);
        }).onErrorMap(th -> {
            return new RuntimeException("Failed to request", th);
        });
    }

    private void request0(D d, FluxSink<S> fluxSink) {
        byte[] serializeDemand = getProtocol().serializeDemand(d);
        Objects.requireNonNull(fluxSink);
        Consumer<S> consumer = fluxSink::next;
        Objects.requireNonNull(fluxSink);
        Consumer<Exception> consumer2 = (v1) -> {
            r3.error(v1);
        };
        Objects.requireNonNull(fluxSink);
        Subscription subscribe = subscribe(d, consumer, consumer2, fluxSink::complete);
        Disposable scheduleDemandPeriodically = scheduleDemandPeriodically(serializeDemand, getDemandPubChannel(d));
        fluxSink.onDispose(() -> {
            subscribe.unsubscribe();
            scheduleDemandPeriodically.dispose();
        });
    }

    private Disposable scheduleDemandPeriodically(byte[] bArr, PubChannel pubChannel) {
        pubChannel.publish(bArr);
        return this.protocol.getDemandInterval() != Duration.ZERO ? Flux.interval(this.protocol.getDemandInterval()).subscribeOn(this.demandScheduler).subscribe(l -> {
            pubChannel.publish(bArr);
        }) : EMPTY_DISPOSABLE;
    }
}
