package com.navercorp.pinpoint.channel.service.client;

import com.navercorp.pinpoint.channel.ChannelProviderRepository;
import com.navercorp.pinpoint.channel.PubChannel;
import com.navercorp.pinpoint.channel.SubChannel;
import com.navercorp.pinpoint.channel.SubConsumer;
import com.navercorp.pinpoint.channel.Subscription;
import com.navercorp.pinpoint.channel.reactor.DeferredDisposable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.Disposable;

/* loaded from: input_file:com/navercorp/pinpoint/channel/service/client/AbstractChannelServiceClient.class */
/**
 * Base implementation of a {@link ChannelServiceClient} that resolves the demand (publish) and
 * supply (subscribe) channels of a service through a {@link ChannelProviderRepository}, using the
 * channel URIs supplied by a {@link ChannelServiceClientProtocol}.
 *
 * @param <D> demand type handed to the protocol to resolve channel URIs
 * @param <S> supply type produced by deserializing the raw messages of the supply channel
 */
public class AbstractChannelServiceClient<D, S> implements ChannelServiceClient {
    private final ChannelProviderRepository channelProviderRepository;
    private final ChannelServiceClientProtocol<D, S> protocol;

    /**
     * Adapts raw {@code byte[]} messages received from a supply channel to typed callbacks.
     *
     * <p>Each message is deserialized through the protocol and dispatched according to the
     * {@link ChannelState} the protocol reports for it. The completion callback and the error
     * callback share one flag, so at most one of them is invoked, and at most once; values that
     * arrive after that point are dropped.
     *
     * @param <S> supply type produced by the protocol
     */
    private static class SupplyProxyConsumer<S> implements SubConsumer {
        private static final Logger logger = LogManager.getLogger(SupplyProxyConsumer.class);
        private final Consumer<S> valueEmitter;
        private final Consumer<Exception> errorEmitter;
        private final Runnable completeEmitter;
        private final Disposable disposable;
        private final ChannelServiceClientProtocol<?, S> protocol;
        // Flipped exactly once, by whichever of complete()/error() runs first.
        private final AtomicBoolean isComplete = new AtomicBoolean(false);

        /**
         * @param valueEmitter    receives every supply accepted before termination
         * @param errorEmitter    receives the exception when a raw message cannot be deserialized
         * @param completeEmitter run once when the stream terminates normally
         * @param disposable      disposed right after the completion or error callback has run
         * @param protocol        deserializes raw messages and reports their channel state
         */
        SupplyProxyConsumer(
                Consumer<S> valueEmitter,
                Consumer<Exception> errorEmitter,
                Runnable completeEmitter,
                Disposable disposable,
                ChannelServiceClientProtocol<?, S> protocol) {
            this.valueEmitter = valueEmitter;
            this.errorEmitter = errorEmitter;
            this.completeEmitter = completeEmitter;
            this.disposable = disposable;
            this.protocol = protocol;
        }

        /**
         * Deserializes one raw message and forwards it according to its channel state:
         * {@code ALIVE} emits the value, {@code SENT_LAST_MESSAGE} emits the value and then
         * completes, and any other state completes without emitting.
         *
         * @param bArr raw message bytes received from the supply channel
         * @return always {@code true}
         * @throws IllegalArgumentException if the message cannot be deserialized (the error
         *         callback has already been notified when this is thrown)
         */
        @Override // com.navercorp.pinpoint.channel.SubConsumer
        public boolean consume(byte[] bArr) {
            S supply = deserializeRawSupply(bArr);
            ChannelState state = this.protocol.getChannelState(supply);
            if (state == ChannelState.ALIVE) {
                next(supply);
            } else if (state == ChannelState.SENT_LAST_MESSAGE) {
                // The final message still carries a payload: deliver it before terminating.
                next(supply);
                complete();
            } else {
                complete();
            }
            return true;
        }

        /**
         * Deserializes a raw message through the protocol. On failure the exception is logged,
         * reported through the error callback, and rethrown wrapped with its cause preserved.
         */
        private S deserializeRawSupply(byte[] rawSupply) {
            try {
                return this.protocol.deserializeSupply(rawSupply);
            } catch (Exception e) {
                logger.error("Failed to deserialize raw supply", e);
                error(e);
                throw new IllegalArgumentException("Failed to deserialize raw supply", e);
            }
        }

        /** Emits a value unless the stream has already terminated. */
        private void next(S supply) {
            if (!this.isComplete.get()) {
                this.valueEmitter.accept(supply);
            }
        }

        /** Terminates normally: runs the completion callback, then disposes. No-op if already terminated. */
        private void complete() {
            if (this.isComplete.getAndSet(true)) {
                return;
            }
            this.completeEmitter.run();
            this.disposable.dispose();
        }

        /** Terminates with an error: notifies the error callback, then disposes. No-op if already terminated. */
        private void error(Exception cause) {
            if (this.isComplete.getAndSet(true)) {
                return;
            }
            this.errorEmitter.accept(cause);
            this.disposable.dispose();
        }
    }

    /**
     * @param channelProviderRepository repository used to look up pub/sub channels by URI
     * @param channelServiceClientProtocol protocol providing channel URIs, supply deserialization
     *        and channel state
     * @throws NullPointerException if either argument is {@code null}
     */
    public AbstractChannelServiceClient(ChannelProviderRepository channelProviderRepository, ChannelServiceClientProtocol<D, S> channelServiceClientProtocol) {
        // requireNonNull is generic and already returns the argument's type; no cast is needed.
        this.channelProviderRepository = Objects.requireNonNull(channelProviderRepository, "channelProviderRepository");
        this.protocol = Objects.requireNonNull(channelServiceClientProtocol, "protocol");
    }

    /**
     * Looks up the publish channel on which the given demand is sent.
     *
     * @param d demand used by the protocol to resolve the demand channel URI
     * @return the channel the repository returns for that URI
     */
    public PubChannel getDemandPubChannel(D d) {
        return this.channelProviderRepository.getPubChannel(this.protocol.getDemandPubChannelURI(d));
    }

    /**
     * Looks up the subscribe channel on which supplies for the given demand arrive.
     *
     * @param d demand used by the protocol to resolve the supply channel URI
     * @return the channel the repository returns for that URI
     */
    protected SubChannel getSupplySubChannel(D d) {
        return this.channelProviderRepository.getSubChannel(this.protocol.getSupplyChannelURI(d));
    }

    /**
     * @return the protocol this client was constructed with
     */
    public ChannelServiceClientProtocol<D, S> getProtocol() {
        return this.protocol;
    }

    /**
     * Subscribes to the supply channel of the given demand and forwards incoming messages to the
     * supplied callbacks.
     *
     * @param d         demand whose supply channel is subscribed to
     * @param consumer  receives each deserialized supply
     * @param consumer2 receives the exception when a raw message cannot be deserialized
     * @param runnable  run once when the supply stream completes normally
     * @return the subscription returned by the supply channel
     * @throws NullPointerException if the channel returns a {@code null} subscription
     */
    public Subscription subscribe(D d, Consumer<S> consumer, Consumer<Exception> consumer2, Runnable runnable) {
        // The proxy must be able to unsubscribe itself on termination, but the subscription only
        // exists after the proxy has been handed to the channel; the deferred disposable is
        // created first and wired to the subscription afterwards.
        // NOTE(review): presumably DeferredDisposable forwards dispose() to the disposable set
        // later, even when dispose() was requested before it was set; confirm against its source.
        DeferredDisposable deferredDisposable = new DeferredDisposable();
        SupplyProxyConsumer<S> proxyConsumer =
                new SupplyProxyConsumer<>(consumer, consumer2, runnable, deferredDisposable, getProtocol());
        Subscription subscription = getSupplySubChannel(d).subscribe(proxyConsumer);
        Objects.requireNonNull(subscription, "subscription");
        deferredDisposable.setDisposable(subscription::unsubscribe);
        return subscription;
    }
}
