package org.eclipse.hono.server;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import java.security.Principal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.engine.Record;
import org.eclipse.hono.authorization.AuthorizationConstants;
import org.eclipse.hono.authorization.Permission;
import org.eclipse.hono.telemetry.TelemetryConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/server/HonoServer.class */
public final class HonoServer extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(HonoServer.class);
    private final String authServiceAddress;
    private String bindAddress;
    private int port;
    private boolean singleTenant;
    private boolean networkDebugLoggingEnabled;
    private final int instanceNo;
    private ProtonServer server;
    private Map<String, Endpoint> endpoints;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HonoServer(String str, int i, boolean z) {
        this(str, i, z, 0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HonoServer(String str, int i, boolean z, int i2) {
        this.endpoints = new HashMap();
        this.bindAddress = (String) Objects.requireNonNull(str);
        this.port = i;
        this.singleTenant = z;
        this.instanceNo = i2;
        this.authServiceAddress = String.format("%s.%d", AuthorizationConstants.EVENT_BUS_ADDRESS_AUTHORIZATION_IN, Integer.valueOf(i2));
    }

    public void start(Future<Void> future) {
        checkStandardEndpointsAreRegistered();
        if (!startEndpoints()) {
            future.fail("one or more of the registered endpoints failed to start, aborting ...");
        } else {
            this.server = ProtonServer.create(this.vertx, createServerOptions()).saslAuthenticatorFactory(new PlainSaslAuthenticatorFactory(this.vertx)).connectHandler(this::handleRemoteConnectionOpen).listen(this.port, this.bindAddress, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    LOG.error("cannot start up HonoServer", asyncResult.cause());
                    future.fail(asyncResult.cause());
                } else {
                    this.port = ((ProtonServer) asyncResult.result()).actualPort();
                    LOG.info("HonoServer running at [{}:{}]", this.bindAddress, Integer.valueOf(this.port));
                    future.complete();
                }
            });
        }
    }

    private void checkStandardEndpointsAreRegistered() {
        if (!isTelemetryEndpointConfigured()) {
            LOG.warn("no Telemetry endpoint has been configured, Hono server will not support Telemetry API");
        }
        if (isRegistrationEndpointConfigured()) {
            return;
        }
        LOG.warn("no Registration endpoint has been configured, Hono server will not support Registration API");
    }

    private boolean isTelemetryEndpointConfigured() {
        return this.endpoints.containsKey(TelemetryConstants.TELEMETRY_ENDPOINT);
    }

    private boolean isRegistrationEndpointConfigured() {
        return this.endpoints.containsKey("registration");
    }

    private boolean startEndpoints() {
        boolean z = true;
        Iterator<Endpoint> it = this.endpoints.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Endpoint next = it.next();
            LOG.info("starting endpoint [name: {}, class: {}]", next.getName(), next.getClass().getName());
            z &= next.start();
            if (!z) {
                LOG.error("could not start endpoint [name: {}, class: {}]", next.getName(), next.getClass().getName());
                break;
            }
        }
        return z;
    }

    ProtonServerOptions createServerOptions() {
        ProtonServerOptions protonServerOptions = new ProtonServerOptions();
        protonServerOptions.setIdleTimeout(0);
        protonServerOptions.setReceiveBufferSize(32768);
        protonServerOptions.setSendBufferSize(32768);
        protonServerOptions.setLogActivity(this.networkDebugLoggingEnabled);
        return protonServerOptions;
    }

    public void stop(Future<Void> future) {
        if (this.server != null) {
            this.server.close(asyncResult -> {
                LOG.info("HonoServer has been shut down");
                future.complete();
            });
        } else {
            LOG.info("HonoServer has been already shut down");
            future.complete();
        }
    }

    public void addEndpoints(List<Endpoint> list) {
        Objects.requireNonNull(list);
        Iterator<Endpoint> it = list.iterator();
        while (it.hasNext()) {
            addEndpoint(it.next());
        }
    }

    public void addEndpoint(Endpoint endpoint) {
        if (this.endpoints.putIfAbsent(endpoint.getName(), endpoint) != null) {
            LOG.warn("multiple endpoints defined with name [{}]", endpoint.getName());
        } else {
            LOG.debug("registering endpoint [{}]", endpoint.getName());
        }
    }

    public int getPort() {
        return this.server != null ? this.server.actualPort() : this.port;
    }

    public String getBindAddress() {
        return this.bindAddress;
    }

    public void setNetworkDebugLoggingEnabled(boolean z) {
        this.networkDebugLoggingEnabled = z;
    }

    public boolean isSingleTenant() {
        return this.singleTenant;
    }

    void handleRemoteConnectionOpen(ProtonConnection protonConnection) {
        protonConnection.setContainer(String.format("Hono-%s:%d-%d", this.bindAddress, Integer.valueOf(this.server.actualPort()), Integer.valueOf(this.instanceNo)));
        protonConnection.sessionOpenHandler(protonSession -> {
            handleSessionOpen(protonConnection, protonSession);
        });
        protonConnection.receiverOpenHandler(protonReceiver -> {
            handleReceiverOpen(protonConnection, protonReceiver);
        });
        protonConnection.senderOpenHandler(protonSender -> {
            handleSenderOpen(protonConnection, protonSender);
        });
        protonConnection.disconnectHandler(this::handleRemoteDisconnect);
        protonConnection.closeHandler(asyncResult -> {
            handleRemoteConnectionClose(protonConnection, asyncResult);
        });
        protonConnection.openHandler(asyncResult2 -> {
            LOG.info("client [container: {}, user: {}] connected", protonConnection.getRemoteContainer(), getUserFromConnection(protonConnection));
            protonConnection.open();
            protonConnection.attachments().set("CONNECTION_ID", String.class, UUID.randomUUID().toString());
        });
    }

    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        LOG.info("opening new session with client [{}]", protonConnection.getRemoteContainer());
        protonSession.closeHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                ((ProtonSession) asyncResult.result()).close();
            }
        }).open();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            LOG.info("client [{}] closed connection", protonConnection.getRemoteContainer());
        } else {
            LOG.info("client [{}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        }
        protonConnection.close();
    }

    private void handleRemoteDisconnect(ProtonConnection protonConnection) {
        LOG.info("client [{}] disconnected", protonConnection.getRemoteContainer());
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    void handleReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        if (protonReceiver.getRemoteTarget().getAddress() == null) {
            LOG.debug("client [{}] wants to open an anonymous link for sending messages to arbitrary addresses, closing link", protonConnection.getRemoteContainer());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), "anonymous relay not supported")).close();
            return;
        }
        LOG.debug("client [{}] wants to open a link for sending messages [address: {}]", protonConnection.getRemoteContainer(), protonReceiver.getRemoteTarget());
        try {
            ResourceIdentifier resourceIdentifier = getResourceIdentifier(protonReceiver.getRemoteTarget().getAddress());
            Endpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonReceiver, resourceIdentifier);
            } else {
                String userFromConnection = getUserFromConnection(protonConnection);
                checkAuthorizationToAttach(userFromConnection, resourceIdentifier, Permission.WRITE, bool -> {
                    if (!bool.booleanValue()) {
                        protonReceiver.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), String.format("subject [%s] is not authorized to WRITE to [%s]", userFromConnection, resourceIdentifier))).close();
                        return;
                    }
                    copyConnectionId(protonConnection.attachments(), protonReceiver.attachments());
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget());
                    endpoint.onLinkAttach(protonReceiver, resourceIdentifier);
                });
            }
        } catch (IllegalArgumentException e) {
            LOG.debug("client has provided invalid resource identifier as target address", e);
            protonReceiver.close();
        }
    }

    private void publishConnectionClosedEvent(ProtonConnection protonConnection) {
        String str = (String) protonConnection.attachments().get("CONNECTION_ID", String.class);
        if (str != null) {
            this.vertx.eventBus().publish("hono.connection.closed", str);
        }
    }

    private static void copyConnectionId(Record record, Record record2) {
        record2.set("CONNECTION_ID", String.class, record.get("CONNECTION_ID", String.class));
    }

    private static String getUserFromConnection(ProtonConnection protonConnection) {
        Principal clientPrincipal = Constants.getClientPrincipal(protonConnection);
        if (clientPrincipal != null) {
            return clientPrincipal.getName();
        }
        LOG.warn("connection from client [{}] is not authenticated properly using SASL, falling back to default subject [{}]", protonConnection.getRemoteContainer(), "hono-client");
        return "hono-client";
    }

    void handleSenderOpen(ProtonConnection protonConnection, ProtonSender protonSender) {
        Source remoteSource = protonSender.getRemoteSource();
        LOG.debug("client [{}] wants to open a link for receiving messages [address: {}]", protonConnection.getRemoteContainer(), remoteSource);
        try {
            ResourceIdentifier resourceIdentifier = getResourceIdentifier(remoteSource.getAddress());
            Endpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonSender, resourceIdentifier);
            } else {
                String userFromConnection = getUserFromConnection(protonConnection);
                checkAuthorizationToAttach(userFromConnection, resourceIdentifier, Permission.READ, bool -> {
                    if (!bool.booleanValue()) {
                        protonSender.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), String.format("subject [%s] is not authorized to READ from [%s]", userFromConnection, resourceIdentifier))).close();
                        return;
                    }
                    copyConnectionId(protonConnection.attachments(), protonSender.attachments());
                    protonSender.setSource(protonSender.getRemoteSource());
                    endpoint.onLinkAttach(protonSender, resourceIdentifier);
                });
            }
        } catch (IllegalArgumentException e) {
            LOG.debug("client has provided invalid resource identifier as target address", e);
            protonSender.close();
        }
    }

    private static void handleUnknownEndpoint(ProtonConnection protonConnection, ProtonLink<?> protonLink, ResourceIdentifier resourceIdentifier) {
        LOG.info("client [{}] wants to establish link for unknown endpoint [address: {}]", protonConnection.getRemoteContainer(), resourceIdentifier);
        protonLink.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), String.format("no endpoint registered for address %s", resourceIdentifier)));
        protonLink.close();
    }

    private Endpoint getEndpoint(ResourceIdentifier resourceIdentifier) {
        return this.endpoints.get(resourceIdentifier.getEndpoint());
    }

    private void checkAuthorizationToAttach(String str, ResourceIdentifier resourceIdentifier, Permission permission, Handler<Boolean> handler) {
        this.vertx.eventBus().send(this.authServiceAddress, AuthorizationConstants.getAuthorizationMsg(str, resourceIdentifier.toString(), permission.toString()), asyncResult -> {
            handler.handle(Boolean.valueOf(asyncResult.succeeded() && AuthorizationConstants.ALLOWED.equals(((Message) asyncResult.result()).body())));
        });
    }

    private ResourceIdentifier getResourceIdentifier(String str) {
        return isSingleTenant() ? ResourceIdentifier.fromStringAssumingDefaultTenant(str) : ResourceIdentifier.fromString(str);
    }

    String getAuthServiceAddress() {
        return this.authServiceAddress;
    }
}
