package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.dns.DnsNameResolver;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/proxy/server/ProxyConnection.class */
public class ProxyConnection extends PulsarHandler {
    private ConnectionPool connectionPool;
    private final AtomicLong requestIdGenerator;
    private final ProxyService service;
    private final DnsNameResolver dnsNameResolver;
    AuthenticationDataSource authenticationData;
    private State state;
    private final Supplier<SslHandler> sslHandlerSupplier;
    private LookupProxyHandler lookupProxyHandler;
    private DirectProxyHandler directProxyHandler;
    private final BrokerProxyValidator brokerProxyValidator;
    String clientAuthRole;
    AuthData clientAuthData;
    String clientAuthMethod;
    private String authMethod;
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    private ClientConfigurationData clientConf;
    private boolean hasProxyToBrokerUrl;
    private int protocolVersionToAdvertise;
    private String proxyToBrokerUrl;
    private HAProxyMessage haProxyMessage;
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
    private static final byte[] EMPTY_CREDENTIALS = new byte[0];

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/proxy/server/ProxyConnection$State.class */
    public enum State {
        Init,
        Connecting,
        ProxyLookupRequests,
        ProxyConnectionToBroker,
        Closing,
        Closed
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionPool getConnectionPool() {
        return this.connectionPool;
    }

    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> supplier, DnsNameResolver dnsNameResolver) {
        super(30, TimeUnit.SECONDS);
        this.requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0L, 4611686018427387903L));
        this.lookupProxyHandler = null;
        this.directProxyHandler = null;
        this.authMethod = "none";
        this.service = proxyService;
        this.dnsNameResolver = dnsNameResolver;
        this.state = State.Init;
        this.sslHandlerSupplier = supplier;
        this.brokerProxyValidator = this.service.getBrokerProxyValidator();
    }

    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
        ProxyService.ACTIVE_CONNECTIONS.inc();
        if (ProxyService.ACTIVE_CONNECTIONS.get() > this.service.getConfiguration().getMaxConcurrentInboundConnections()) {
            this.state = State.Closing;
            channelHandlerContext.close();
            ProxyService.REJECTED_CONNECTIONS.inc();
        }
    }

    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelUnregistered(channelHandlerContext);
        ProxyService.ACTIVE_CONNECTIONS.dec();
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        ProxyService.NEW_CONNECTIONS.inc();
        this.service.getClientCnxs().add(this);
        LOG.info("[{}] New connection opened", this.remoteAddress);
    }

    public synchronized void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        if (this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null) {
            this.directProxyHandler.outboundChannel.close();
            this.directProxyHandler = null;
        }
        this.service.getClientCnxs().remove(this);
        LOG.info("[{}] Connection closed", this.remoteAddress);
        if (this.connectionPool != null) {
            try {
                this.connectionPool.close();
                this.connectionPool = null;
            } catch (Exception e) {
                LOG.error("Failed to close connection pool {}", e.getMessage(), e);
            }
        }
        this.state = State.Closed;
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.state = State.Closing;
        super.exceptionCaught(channelHandlerContext, th);
        Logger logger = LOG;
        Object[] objArr = new Object[4];
        objArr[0] = this.remoteAddress;
        objArr[1] = th.getClass().getSimpleName();
        objArr[2] = th.getMessage();
        objArr[3] = ClientCnx.isKnownException(th) ? null : th;
        logger.warn("[{}] Got exception {} : {} {}", objArr);
        channelHandlerContext.close();
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.directProxyHandler != null && this.directProxyHandler.outboundChannel != null) {
            this.directProxyHandler.outboundChannel.config().setAutoRead(channelHandlerContext.channel().isWritable());
        }
        super.channelWritabilityChanged(channelHandlerContext);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof HAProxyMessage) {
            this.haProxyMessage = (HAProxyMessage) obj;
            return;
        }
        switch (this.state) {
            case Init:
            case Connecting:
            case ProxyLookupRequests:
                super.channelRead(channelHandlerContext, obj);
                return;
            case ProxyConnectionToBroker:
                ProxyService.OPS_COUNTER.inc();
                if (obj instanceof ByteBuf) {
                    int readableBytes = ((ByteBuf) obj).readableBytes();
                    this.directProxyHandler.getInboundChannelRequestsRate().recordEvent(readableBytes);
                    ProxyService.BYTES_COUNTER.inc(readableBytes);
                }
                this.directProxyHandler.outboundChannel.writeAndFlush(obj).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                return;
            default:
                return;
        }
    }

    private synchronized void completeConnect(AuthData authData) throws PulsarClientException {
        Supplier supplier;
        if (this.service.getConfiguration().isAuthenticationEnabled()) {
            if (this.service.getConfiguration().isForwardAuthorizationCredentials()) {
                this.clientAuthData = authData;
                this.clientAuthMethod = this.authMethod;
            }
            supplier = () -> {
                return new ProxyClientCnx(this.clientConf, this.service.getWorkerGroup(), this.clientAuthRole, this.clientAuthData, this.clientAuthMethod, this.protocolVersionToAdvertise);
            };
        } else {
            supplier = () -> {
                return new ClientCnx(this.clientConf, this.service.getWorkerGroup(), this.protocolVersionToAdvertise);
            };
        }
        if (this.connectionPool == null) {
            this.connectionPool = new ConnectionPool(this.clientConf, this.service.getWorkerGroup(), supplier, Optional.of(this.dnsNameResolver));
        } else {
            LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", new Object[]{this.remoteAddress, this.state, this.clientAuthRole});
        }
        LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole, Boolean.valueOf(this.hasProxyToBrokerUrl)});
        if (!this.hasProxyToBrokerUrl) {
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = new LookupProxyHandler(this.service, this);
            this.ctx.writeAndFlush(Commands.newConnected(this.protocolVersionToAdvertise)).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        } else {
            if (!this.service.getConfiguration().isCheckActiveBrokers() || isBrokerActive(this.proxyToBrokerUrl)) {
                this.brokerProxyValidator.resolveAndCheckTargetAddress(this.proxyToBrokerUrl).thenAcceptAsync(inetSocketAddress -> {
                    this.state = State.ProxyConnectionToBroker;
                    this.directProxyHandler = new DirectProxyHandler(this.service, this, this.proxyToBrokerUrl, inetSocketAddress, this.protocolVersionToAdvertise, this.sslHandlerSupplier);
                }, (Executor) this.ctx.executor()).exceptionally(th -> {
                    if ((th instanceof TargetAddressDeniedException) || (th.getCause() instanceof TargetAddressDeniedException)) {
                        LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, ((TargetAddressDeniedException) (th instanceof TargetAddressDeniedException ? th : th.getCause())).getMessage(), this.authMethod, this.clientAuthRole});
                    } else {
                        LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole, th});
                    }
                    ctx().writeAndFlush(Commands.newError(-1L, ServerError.ServiceNotReady, "Target broker cannot be validated.")).addListener(ChannelFutureListener.CLOSE);
                    return null;
                });
                return;
            }
            this.state = State.Closing;
            LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.", new Object[]{this.remoteAddress, this.proxyToBrokerUrl, this.authMethod, this.clientAuthRole});
            ctx().writeAndFlush(Commands.newError(-1L, ServerError.ServiceNotReady, "Target broker isn't available.")).addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void doAuthentication(AuthData authData) throws Exception {
        AuthData authenticate = this.authState.authenticate(authData);
        if (this.authState.isComplete()) {
            this.clientAuthRole = this.authState.getAuthRole();
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, this.authMethod, this.clientAuthRole});
            }
            completeConnect(authData);
            return;
        }
        this.ctx.writeAndFlush(Commands.newAuthChallenge(this.authMethod, authenticate, this.protocolVersionToAdvertise)).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Authentication in progress client by method {}.", this.remoteAddress, this.authMethod);
        }
        this.state = State.Connecting;
    }

    protected void handleConnect(CommandConnect commandConnect) {
        Preconditions.checkArgument(this.state == State.Init);
        setRemoteEndpointProtocolVersion(commandConnect.getProtocolVersion());
        this.hasProxyToBrokerUrl = commandConnect.hasProxyToBrokerUrl();
        this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(commandConnect);
        this.proxyToBrokerUrl = commandConnect.hasProxyToBrokerUrl() ? commandConnect.getProxyToBrokerUrl() : "null";
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", this.remoteAddress, this.proxyToBrokerUrl);
            LOG.debug("[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}", new Object[]{this.remoteAddress, Integer.valueOf(this.protocolVersionToAdvertise), Integer.valueOf(getRemoteEndpointProtocolVersion()), Integer.valueOf(Commands.getCurrentProtocolVersion())});
        }
        if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", this.remoteAddress);
            this.state = State.Closing;
            this.ctx.close();
            return;
        }
        try {
            this.clientConf = createClientConfiguration();
            if (!this.service.getConfiguration().isAuthenticationEnabled()) {
                completeConnect(null);
                return;
            }
            AuthData of = AuthData.of(commandConnect.hasAuthData() ? commandConnect.getAuthData() : EMPTY_CREDENTIALS);
            if (commandConnect.hasAuthMethodName()) {
                this.authMethod = commandConnect.getAuthMethodName();
            } else if (commandConnect.hasAuthMethod()) {
                this.authMethod = commandConnect.getAuthMethod().name().substring(10).toLowerCase();
            } else {
                this.authMethod = "none";
            }
            this.authenticationProvider = this.service.getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                this.clientAuthRole = (String) this.service.getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> {
                    return new AuthenticationException("No anonymous role, and no authentication provider configured");
                });
                completeConnect(of);
                return;
            }
            SslHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sSLSession = null;
            if (sslHandler != null) {
                sSLSession = sslHandler.engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(of, this.remoteAddress, sSLSession);
            this.authenticationData = this.authState.getAuthDataSource();
            doAuthentication(of);
        } catch (Exception e) {
            LOG.warn("[{}] Unable to authenticate: ", this.remoteAddress, e);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, "Failed to authenticate")).addListener(ChannelFutureListener.CLOSE);
        }
    }

    protected void handleAuthResponse(CommandAuthResponse commandAuthResponse) {
        Preconditions.checkArgument(this.state == State.Connecting);
        Preconditions.checkArgument(commandAuthResponse.hasResponse());
        Preconditions.checkArgument(commandAuthResponse.getResponse().hasAuthData() && commandAuthResponse.getResponse().hasAuthMethodName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received AuthResponse from {}, auth method: {}", this.remoteAddress, commandAuthResponse.getResponse().getAuthMethodName());
        }
        try {
            doAuthentication(AuthData.of(commandAuthResponse.getResponse().getAuthData()));
        } catch (Exception e) {
            LOG.warn("[{}] {} ", new Object[]{this.remoteAddress, "Unable to handleAuthResponse", e});
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, "Unable to handleAuthResponse")).addListener(ChannelFutureListener.CLOSE);
        }
    }

    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handlePartitionMetadataResponse(commandPartitionedTopicMetadata);
    }

    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace);
    }

    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handleGetSchema(commandGetSchema);
    }

    protected void handleLookup(CommandLookupTopic commandLookupTopic) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handleLookup(commandLookupTopic);
    }

    ClientConfigurationData createClientConfiguration() {
        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setServiceUrl(this.service.getServiceUrl());
        ProxyConfiguration configuration = this.service.getConfiguration();
        clientConfigurationData.setAuthentication(getClientAuthentication());
        if (configuration.isTlsEnabledWithBroker()) {
            clientConfigurationData.setUseTls(true);
            if (configuration.isBrokerClientTlsEnabledWithKeyStore()) {
                clientConfigurationData.setUseKeyStoreTls(true);
                clientConfigurationData.setTlsTrustStoreType(configuration.getBrokerClientTlsTrustStoreType());
                clientConfigurationData.setTlsTrustStorePath(configuration.getBrokerClientTlsTrustStore());
                clientConfigurationData.setTlsTrustStorePassword(configuration.getBrokerClientTlsTrustStorePassword());
            } else {
                clientConfigurationData.setTlsTrustCertsFilePath(configuration.getBrokerClientTrustCertsFilePath());
                clientConfigurationData.setTlsAllowInsecureConnection(configuration.isTlsAllowInsecureConnection());
            }
        }
        return clientConfigurationData;
    }

    private static int getProtocolVersionToAdvertise(CommandConnect commandConnect) {
        return Math.min(commandConnect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long newRequestId() {
        return this.requestIdGenerator.getAndIncrement();
    }

    public Authentication getClientAuthentication() {
        return this.service.getProxyClientAuthenticationPlugin();
    }

    protected boolean isHandshakeCompleted() {
        return this.state != State.Init;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    public boolean hasHAProxyMessage() {
        return this.haProxyMessage != null;
    }

    public HAProxyMessage getHAProxyMessage() {
        return this.haProxyMessage;
    }

    private boolean isBrokerActive(String str) {
        for (ServiceLookupData serviceLookupData : getAvailableBrokers()) {
            if (matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), str) || matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(), str)) {
                return true;
            }
        }
        return false;
    }

    private List<? extends ServiceLookupData> getAvailableBrokers() {
        if (this.service.getDiscoveryProvider() == null) {
            LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null.zookeeperServers and configurationStoreServers must be configured in proxy configuration when checkActiveBrokers is enabled.");
            return Collections.emptyList();
        }
        try {
            return this.service.getDiscoveryProvider().getAvailableBrokers();
        } catch (PulsarServerException e) {
            LOG.error("Unable to get available brokers", e);
            return Collections.emptyList();
        }
    }

    static boolean matchesHostAndPort(String str, String str2, String str3) {
        return str2 != null && str2.length() == str.length() + str3.length() && str2.startsWith(str) && str2.startsWith(str3, str.length());
    }

    public DirectProxyHandler getDirectProxyHandler() {
        return this.directProxyHandler;
    }
}
