package com.navercorp.pinpoint.channel.service.server;

import com.google.common.base.Suppliers;
import com.navercorp.pinpoint.channel.ChannelProviderRepository;
import com.navercorp.pinpoint.channel.PubChannel;
import com.navercorp.pinpoint.channel.SubChannel;
import com.navercorp.pinpoint.channel.SubConsumer;
import com.navercorp.pinpoint.common.util.BytesUtils;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.lang.NonNull;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.class */
/**
 * {@link ChannelServiceServer} that answers demands arriving on a sub-channel.
 *
 * <p>{@link #listen()} subscribes a demand handler to the protocol's demand sub-channel. For each
 * received payload the handler deserializes the demand, asks the configured backend (either a
 * {@link ChannelServiceMonoBackend} or a {@link ChannelServiceFluxBackend}) for a supply stream,
 * and publishes every supplied item, serialized by the protocol, to the pub-channel whose URI the
 * protocol derives from that demand.
 *
 * @param <D> demand type
 * @param <S> supply type
 */
public class ChannelServiceServerImpl<D, S> implements ChannelServiceServer {
    private static final Logger logger = LogManager.getLogger(ChannelServiceServerImpl.class);

    private final ChannelProviderRepository channelProviderRepository;
    private final ChannelServiceServerProtocol<D, S> protocol;
    /** Backend producing a {@link Mono} per demand; {@code null} when a flux backend is used. */
    private final ChannelServiceMonoBackend<D, S> monoBackend;
    /** Backend producing a {@link Flux} per demand; {@code null} when a mono backend is used. */
    private final ChannelServiceFluxBackend<D, S> fluxBackend;

    /**
     * Demand handler backed by a {@link ChannelServiceFluxBackend}.
     */
    public class FluxDemandHandler implements SubConsumer {
        private final ChannelServiceFluxBackend<D, S> backend;

        public FluxDemandHandler(ChannelServiceFluxBackend<D, S> channelServiceFluxBackend) {
            this.backend = channelServiceFluxBackend;
        }

        /**
         * Deserializes the payload into a demand and starts supplying for it.
         *
         * @param bArr serialized demand
         * @return {@code true} if the backend returned a stream and it was subscribed,
         *         {@code false} if the backend returned {@code null}
         * @throws RuntimeException wrapping any exception raised while deserializing or responding
         */
        @Override // com.navercorp.pinpoint.channel.SubConsumer
        public boolean consume(byte[] bArr) {
            try {
                return responseToDemand(getProtocol().deserializeDemand(bArr));
            } catch (Exception e) {
                throw new RuntimeException("Failed to supply for demand: " + BytesUtils.toString(bArr), e);
            }
        }

        private boolean responseToDemand(D d) {
            Flux<S> supply = this.backend.demand(d);
            if (supply == null) {
                logger.debug("Ignored long pubsub demand: {}", d);
                return false;
            }
            // Only IgnoreDemandException means "ignored"; it is logged at debug and completes the
            // stream quietly. Every other error is passed on to PubChannelProxy#hookOnError.
            supply.doOnError(IgnoreDemandException.class,
                            ignored -> logger.debug("Ignored long pubsub demand: {}", d))
                    .onErrorComplete(IgnoreDemandException.class)
                    .subscribe(new PubChannelProxy(d));
            return true;
        }
    }

    /**
     * Demand handler backed by a {@link ChannelServiceMonoBackend}.
     */
    public class MonoDemandHandler implements SubConsumer {
        private final ChannelServiceMonoBackend<D, S> backend;

        public MonoDemandHandler(ChannelServiceMonoBackend<D, S> channelServiceMonoBackend) {
            this.backend = channelServiceMonoBackend;
        }

        /**
         * Deserializes the payload into a demand and starts supplying for it.
         *
         * @param bArr serialized demand
         * @return {@code true} if the backend returned a stream and it was subscribed,
         *         {@code false} if the backend returned {@code null}
         * @throws RuntimeException wrapping any exception raised while deserializing or responding
         */
        @Override // com.navercorp.pinpoint.channel.SubConsumer
        public boolean consume(byte[] bArr) {
            try {
                return responseToDemand(getProtocol().deserializeDemand(bArr));
            } catch (Exception e) {
                throw new RuntimeException("Failed to supply for demand: " + BytesUtils.toString(bArr), e);
            }
        }

        private boolean responseToDemand(D d) {
            Mono<S> supply = this.backend.demand(d);
            if (supply == null) {
                logger.debug("Ignored short pubsub demand: {}", d);
                return false;
            }
            // Only IgnoreDemandException means "ignored"; it is logged at debug and completes the
            // stream quietly. Every other error is passed on to PubChannelProxy#hookOnError.
            supply.doOnError(IgnoreDemandException.class,
                            ignored -> logger.debug("Ignored short pubsub demand: {}", d))
                    .onErrorComplete(IgnoreDemandException.class)
                    .subscribe(new PubChannelProxy(d));
            return true;
        }
    }

    /**
     * Subscriber that forwards each supplied item to the supply pub-channel of one demand.
     *
     * <p>The pub-channel is looked up on first use and memoized, so a demand that never yields
     * an item never triggers the lookup.
     */
    public class PubChannelProxy extends BaseSubscriber<S> {
        private final D demand;
        private final Supplier<PubChannel> channelSupplier = Suppliers.memoize(this::buildPubChannel);

        /**
         * @param d demand being answered; must not be {@code null}
         * @throws NullPointerException if {@code d} is {@code null}
         */
        PubChannelProxy(D d) {
            this.demand = Objects.requireNonNull(d, "demand");
        }

        /**
         * Serializes the supply and publishes it. Failures are logged and swallowed so that one
         * bad item does not terminate the subscription.
         */
        @Override
        public void hookOnNext(@NonNull S s) {
            try {
                this.channelSupplier.get().publish(getProtocol().serializeSupply(s));
            } catch (Exception e) {
                logger.warn("Failed to send", e);
            }
        }

        /**
         * Logs an error that terminated the supply stream.
         *
         * <p>Without this override the {@link BaseSubscriber} default treats the error as an
         * unimplemented callback, so the failure would not be reported together with its demand.
         */
        @Override
        protected void hookOnError(@NonNull Throwable throwable) {
            logger.warn("Failed to supply for pubsub demand ({})", this.demand, throwable);
        }

        private PubChannel buildPubChannel() {
            logger.info("Responding pubsub demand ({})", this.demand);
            return getSupplyPubChannel(this.demand);
        }
    }

    /**
     * Creates a server that answers each demand with a {@link Mono} from the given backend.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public ChannelServiceServerImpl(ChannelProviderRepository channelProviderRepository, ChannelServiceServerProtocol<D, S> channelServiceServerProtocol, ChannelServiceMonoBackend<D, S> channelServiceMonoBackend) {
        this.channelProviderRepository = Objects.requireNonNull(channelProviderRepository, "channelProviderRepository");
        this.protocol = Objects.requireNonNull(channelServiceServerProtocol, "protocol");
        this.monoBackend = Objects.requireNonNull(channelServiceMonoBackend, "monoBackend");
        this.fluxBackend = null;
    }

    /**
     * Creates a server that answers each demand with a {@link Flux} from the given backend.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public ChannelServiceServerImpl(ChannelProviderRepository channelProviderRepository, ChannelServiceServerProtocol<D, S> channelServiceServerProtocol, ChannelServiceFluxBackend<D, S> channelServiceFluxBackend) {
        this.channelProviderRepository = Objects.requireNonNull(channelProviderRepository, "channelProviderRepository");
        this.protocol = Objects.requireNonNull(channelServiceServerProtocol, "protocol");
        this.fluxBackend = Objects.requireNonNull(channelServiceFluxBackend, "fluxBackend");
        this.monoBackend = null;
    }

    /** Resolves the sub-channel on which demands arrive, using the protocol's demand URI. */
    private SubChannel getDemandSubChannel() {
        return this.channelProviderRepository.getSubChannel(this.protocol.getDemandSubChannelURI());
    }

    /** Resolves the pub-channel for supplies answering {@code d}, using the protocol's supply URI. */
    private PubChannel getSupplyPubChannel(D d) {
        return this.channelProviderRepository.getPubChannel(this.protocol.getSupplyChannelURI(d));
    }

    private ChannelServiceServerProtocol<D, S> getProtocol() {
        return this.protocol;
    }

    /** Subscribes the demand handler to the demand sub-channel. */
    @Override // com.navercorp.pinpoint.channel.service.server.ChannelServiceServer
    public void listen() {
        getDemandSubChannel().subscribe(getSubConsumer());
    }

    /** Picks the handler matching whichever backend this server was constructed with. */
    private SubConsumer getSubConsumer() {
        if (this.monoBackend != null) {
            return new MonoDemandHandler(this.monoBackend);
        }
        return new FluxDemandHandler(this.fluxBackend);
    }
}
