package org.apache.pulsar.proxy.server;

import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/proxy/server/LookupProxyHandler.class */
/**
 * Handles the lookup-style commands a client sends to the proxy (topic lookup, partitioned-topic
 * metadata, topics-of-namespace and schema requests) by forwarding each one to a broker over a
 * connection obtained from the proxy connection's pool and writing the broker's answer (or an
 * error) back on the client channel.
 *
 * <p>Lookup, partition-metadata and topics-of-namespace requests first try to take a permit from
 * the shared lookup semaphore and are rejected immediately when none is available. Note that the
 * permit is released when the public handler method returns, i.e. as soon as the asynchronous
 * broker request has been started, not when the broker has answered.
 */
public class LookupProxyHandler {
    private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);

    /** Redirect budget handed to a freshly started lookup / topics-of-namespace request. */
    private static final int MAX_LOOKUP_REDIRECTS = 10;

    private static final Counter LOOKUP_REQUESTS = Counter.build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
    private static final Counter PARTITIONS_METADATA_REQUESTS = Counter.build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create().register();
    private static final Counter GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter.build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests").create().register();
    private static final Counter GET_SCHEMA_REQUESTS = Counter.build("pulsar_proxy_get_schema_requests", "Counter of schema requests").create().register();
    static final Counter REJECTED_LOOKUP_REQUESTS = Counter.build("pulsar_proxy_rejected_lookup_requests", "Counter of topic lookup requests rejected due to throttling").create().register();
    static final Counter REJECTED_PARTITIONS_METADATA_REQUESTS = Counter.build("pulsar_proxy_rejected_partitions_metadata_requests", "Counter of partitions metadata requests rejected due to throttling").create().register();
    static final Counter REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter.build("pulsar_proxy_rejected_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests rejected due to throttling").create().register();

    /** Message sent to the client (and logged) whenever a request is rejected by the semaphore. */
    private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
    private final ProxyConnection proxyConnection;
    private final BrokerDiscoveryProvider discoveryProvider;
    private final boolean connectWithTLS;
    private final SocketAddress clientAddress;
    /** Configured broker URL; when blank, a broker is picked through {@link #discoveryProvider}. */
    private final String brokerServiceURL;
    private final Semaphore lookupRequestSemaphore;

    /**
     * Creates a handler bound to one client connection.
     *
     * @param proxyService source of the discovery provider, the lookup semaphore and the proxy
     *     configuration (TLS flag and broker service URLs)
     * @param proxyConnection the client connection whose channel receives all responses
     */
    public LookupProxyHandler(ProxyService proxyService, ProxyConnection proxyConnection) {
        this.discoveryProvider = proxyService.getDiscoveryProvider();
        this.lookupRequestSemaphore = proxyService.getLookupRequestSemaphore();
        this.proxyConnection = proxyConnection;
        this.clientAddress = proxyConnection.clientAddress();
        this.connectWithTLS = proxyService.getConfiguration().isTlsEnabledWithBroker();
        this.brokerServiceURL = this.connectWithTLS ? proxyService.getConfiguration().getBrokerServiceURLTLS() : proxyService.getConfiguration().getBrokerServiceURL();
    }

    /**
     * Handles a topic lookup command: rejects it when no semaphore permit is available, otherwise
     * starts a broker lookup that follows up to {@link #MAX_LOOKUP_REDIRECTS} redirects.
     *
     * @param commandLookupTopic the lookup command received from the client
     */
    public void handleLookup(CommandLookupTopic commandLookupTopic) {
        if (log.isDebugEnabled()) {
            log.debug("Received Lookup from {}", this.clientAddress);
        }
        long requestId = commandLookupTopic.getRequestId();
        if (!this.lookupRequestSemaphore.tryAcquire()) {
            REJECTED_LOOKUP_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("Lookup Request ID {} from {} rejected - {}.", requestId, this.clientAddress, this.throttlingErrorMessage);
            }
            // NOTE(review): this path answers with TooManyRequests while the partition-metadata and
            // topics-of-namespace throttling paths answer with ServiceNotReady - confirm intended.
            this.proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests, this.throttlingErrorMessage, requestId));
            return;
        }
        try {
            LOOKUP_REQUESTS.inc();
            String brokerServiceUrl = getBrokerServiceUrl(requestId);
            // A null URL means getBrokerServiceUrl already reported the failure to the client.
            if (brokerServiceUrl != null) {
                performLookup(requestId, commandLookupTopic.getTopic(), brokerServiceUrl, false, MAX_LOOKUP_REDIRECTS);
            }
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Sends a lookup for {@code topic} to the broker at {@code brokerServiceUrl} and relays the
     * outcome to the client, re-issuing the lookup against the returned broker when the result is
     * a redirect.
     *
     * @param clientRequestId request id of the client command, echoed in every response
     * @param topic topic being looked up
     * @param brokerServiceUrl URL of the broker to ask
     * @param authoritative authoritative flag forwarded in the lookup command
     * @param redirectsLeft remaining redirect budget; {@code 0} fails the lookup
     */
    private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative, int redirectsLeft) {
        if (redirectsLeft == 0) {
            this.proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, "Reached max number of redirections", clientRequestId));
            return;
        }
        try {
            URI uri = new URI(brokerServiceUrl);
            InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
            if (log.isDebugEnabled()) {
                log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", brokerAddress, topic, clientRequestId);
            }
            this.proxyConnection.getConnectionPool().getConnection(brokerAddress).thenAccept(clientCnx -> {
                long brokerRequestId = this.proxyConnection.newRequestId();
                clientCnx.newLookup(Commands.newLookup(topic, authoritative, brokerRequestId), brokerRequestId).whenComplete((result, error) -> {
                    try {
                        if (error != null) {
                            log.warn("[{}] Failed to lookup topic {}: {}", this.clientAddress, topic, error.getMessage());
                            this.proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(error), error.getMessage(), clientRequestId));
                            return;
                        }
                        String targetBrokerUrl = this.connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
                        if (result.redirect) {
                            performLookup(clientRequestId, topic, targetBrokerUrl, result.authoritative, redirectsLeft - 1);
                            return;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}", brokerAddress, topic, clientRequestId, targetBrokerUrl);
                        }
                        // The same URL is deliberately passed for both URL arguments of the response.
                        this.proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(targetBrokerUrl, targetBrokerUrl, true, CommandLookupTopicResponse.LookupType.Connect, clientRequestId, true));
                    } finally {
                        // Release even if writing the response or issuing the redirect throws.
                        this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                    }
                });
            }).exceptionally(connectError -> {
                this.proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(connectError), connectError.getMessage(), clientRequestId));
                return null;
            });
        } catch (URISyntaxException e) {
            this.proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.MetadataError, e.getMessage(), clientRequestId));
        }
    }

    /**
     * Handles a partitioned-topic metadata command: rejects it when no semaphore permit is
     * available, otherwise forwards it to a broker.
     *
     * @param commandPartitionedTopicMetadata the metadata command received from the client
     */
    public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
        PARTITIONS_METADATA_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup", this.clientAddress);
        }
        long requestId = commandPartitionedTopicMetadata.getRequestId();
        if (!this.lookupRequestSemaphore.tryAcquire()) {
            REJECTED_PARTITIONS_METADATA_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", requestId, this.clientAddress, this.throttlingErrorMessage);
            }
            this.proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, this.throttlingErrorMessage, requestId));
            return;
        }
        try {
            handlePartitionMetadataResponse(commandPartitionedTopicMetadata, requestId);
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Forwards a partitioned-topic metadata request to a broker and relays the partition count,
     * or a partition-metadata error response, to the client.
     *
     * @param commandPartitionedTopicMetadata the metadata command received from the client
     * @param clientRequestId request id of the client command, echoed in every response
     */
    private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata commandPartitionedTopicMetadata, long clientRequestId) {
        TopicName topicName = TopicName.get(commandPartitionedTopicMetadata.getTopic());
        String brokerServiceUrl = getBrokerServiceUrl(clientRequestId);
        if (brokerServiceUrl == null) {
            // getBrokerServiceUrl has already written an error to the client.
            log.warn("No available broker for {} to lookup partition metadata", topicName);
            return;
        }
        InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topicName.getPartitionedTopicName(), clientRequestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long brokerRequestId = this.proxyConnection.newRequestId();
            clientCnx.newLookup(Commands.newPartitionMetadataRequest(topicName.toString(), brokerRequestId), brokerRequestId).whenComplete((result, error) -> {
                try {
                    if (error != null) {
                        log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), error.getMessage(), error);
                        // Answer with a partition-metadata response, like the throttling and
                        // connection-failure paths of this command, rather than a lookup response.
                        this.proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(error), error.getMessage(), clientRequestId));
                    } else {
                        this.proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(result.partitions, clientRequestId));
                    }
                } finally {
                    this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                }
            });
        }).exceptionally(connectError -> {
            this.proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(connectError), connectError.getMessage(), clientRequestId));
            return null;
        });
    }

    /**
     * Handles a topics-of-namespace command: rejects it when no semaphore permit is available,
     * otherwise forwards it to a broker.
     *
     * @param commandGetTopicsOfNamespace the command received from the client
     */
    public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetTopicsOfNamespace", this.clientAddress);
        }
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        if (!this.lookupRequestSemaphore.tryAcquire()) {
            REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, this.clientAddress, this.throttlingErrorMessage);
            }
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady, this.throttlingErrorMessage));
            return;
        }
        try {
            handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Resolves a broker URL and, when one is available, starts the topics-of-namespace request.
     *
     * @param commandGetTopicsOfNamespace the command received from the client
     * @param clientRequestId request id of the client command
     */
    private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) {
        String brokerServiceUrl = getBrokerServiceUrl(clientRequestId);
        if (StringUtils.isNotBlank(brokerServiceUrl)) {
            performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), brokerServiceUrl, MAX_LOOKUP_REDIRECTS, commandGetTopicsOfNamespace.getMode());
        }
    }

    /**
     * Asks the broker at {@code brokerServiceUrl} for the topics of {@code namespaceName} and
     * relays the list, or an error, to the client.
     *
     * @param clientRequestId request id of the client command, echoed in every response
     * @param namespaceName namespace whose topics are requested
     * @param brokerServiceUrl URL of the broker to ask
     * @param redirectsLeft remaining attempt budget; {@code 0} fails the request
     * @param mode listing mode forwarded unchanged to the broker
     */
    private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl, int redirectsLeft, CommandGetTopicsOfNamespace.Mode mode) {
        if (redirectsLeft == 0) {
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, "Reached max number of redirections"));
            return;
        }
        InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'", addr, namespaceName, clientRequestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long brokerRequestId = this.proxyConnection.newRequestId();
            clientCnx.newGetTopicsOfNamespace(Commands.newGetTopicsOfNamespaceRequest(namespaceName, brokerRequestId, mode), brokerRequestId).whenComplete((topics, error) -> {
                try {
                    if (error != null) {
                        log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", this.clientAddress, namespaceName, error.getMessage());
                        this.proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, getServerError(error), error.getMessage()));
                    } else {
                        this.proxyConnection.ctx().writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, clientRequestId));
                    }
                } finally {
                    // Release only once the broker has answered, as the other handlers do; releasing
                    // right after sending would hand the connection back while the request is pending.
                    this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                }
            });
        }).exceptionally(connectError -> {
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, getServerError(connectError), connectError.getMessage()));
            return null;
        });
    }

    /**
     * Handles a get-schema command by forwarding it to a broker and relaying the raw schema
     * response, or an error, to the client. This command is not subject to the lookup semaphore.
     *
     * @param commandGetSchema the command received from the client
     */
    public void handleGetSchema(CommandGetSchema commandGetSchema) {
        GET_SCHEMA_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetSchema {}", this.clientAddress, commandGetSchema);
        }
        long requestId = commandGetSchema.getRequestId();
        String brokerServiceUrl = getBrokerServiceUrl(requestId);
        String topic = commandGetSchema.getTopic();
        // Raw Optional kept as in the original; the element type expected by Commands.newGetSchema
        // is not visible from this file.
        Optional schemaVersion = commandGetSchema.hasSchemaVersion() ? Optional.of(commandGetSchema.getSchemaVersion()).map(BytesSchemaVersion::of) : Optional.empty();
        if (StringUtils.isBlank(brokerServiceUrl)) {
            return;
        }
        InetSocketAddress addr = getAddr(brokerServiceUrl, requestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'", addr, topic, requestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long brokerRequestId = this.proxyConnection.newRequestId();
            clientCnx.sendGetRawSchema(Commands.newGetSchema(brokerRequestId, topic, schemaVersion), brokerRequestId).whenComplete((response, error) -> {
                try {
                    if (error != null) {
                        // Pass the message for the third placeholder and the throwable last so the
                        // logger prints both the text and the stack trace.
                        log.warn("[{}] Failed to get schema {}: {}", this.clientAddress, topic, error.getMessage(), error);
                        this.proxyConnection.ctx().writeAndFlush(Commands.newError(requestId, getServerError(error), error.getMessage()));
                    } else {
                        this.proxyConnection.ctx().writeAndFlush(Commands.newGetSchemaResponse(requestId, response));
                    }
                } finally {
                    this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                }
            });
        }).exceptionally(connectError -> {
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(requestId, getServerError(connectError), connectError.getMessage()));
            return null;
        });
    }

    /**
     * Returns the broker URL to forward a request to: the configured URL when it is not blank,
     * otherwise the URL of the next broker offered by the discovery provider.
     *
     * @param clientRequestId request id used for the error written to the client on failure
     * @return the broker URL, or {@code null} after an error has been written to the client
     */
    private String getBrokerServiceUrl(long clientRequestId) {
        if (StringUtils.isNotBlank(this.brokerServiceURL)) {
            return this.brokerServiceURL;
        }
        try {
            LoadManagerReport nextBroker = this.discoveryProvider.nextBroker();
            return this.connectWithTLS ? nextBroker.getPulsarServiceUrlTls() : nextBroker.getPulsarServiceUrl();
        } catch (Exception e) {
            log.warn("[{}] Failed to get next active broker {}", this.clientAddress, e.getMessage(), e);
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, e.getMessage()));
            return null;
        }
    }

    /**
     * Parses a broker URL into an unresolved socket address.
     *
     * @param brokerServiceUrl URL to parse
     * @param clientRequestId request id used for the error written to the client on failure
     * @return the address, or {@code null} after a {@code MetadataError} has been written to the
     *     client because the URL is not a valid URI
     */
    private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
        try {
            URI uri = new URI(brokerServiceUrl);
            return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
        } catch (URISyntaxException e) {
            this.proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage()));
            return null;
        }
    }

    /**
     * Maps a failure to the error code reported to the client: authorization and authentication
     * failures keep their specific codes, everything else becomes {@code ServiceNotReady}.
     *
     * @param th the failure, possibly wrapped in a {@link CompletionException}
     * @return the error code to send to the client
     */
    private ServerError getServerError(Throwable th) {
        // Dependent CompletableFuture stages receive upstream failures wrapped in a
        // CompletionException; classify by the underlying cause so auth errors are not lost.
        Throwable cause = th;
        if (th instanceof CompletionException && th.getCause() != null) {
            cause = th.getCause();
        }
        if (cause instanceof PulsarClientException.AuthorizationException) {
            return ServerError.AuthorizationError;
        }
        if (cause instanceof PulsarClientException.AuthenticationException) {
            return ServerError.AuthenticationError;
        }
        return ServerError.ServiceNotReady;
    }
}
