package org.eclipse.hono.messaging;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import java.util.UUID;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.eclipse.hono.auth.Activity;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.service.amqp.AmqpServiceBase;
import org.eclipse.hono.service.amqp.Endpoint;
import org.eclipse.hono.telemetry.TelemetryConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * An AMQP service that accepts client connections and hands link attach requests over to the
 * {@link Endpoint} registered for the address the client wants to use.
 * <p>
 * A link is only passed on to an endpoint after the authorization service has confirmed that the
 * connection's client principal may perform the corresponding activity ({@link Activity#WRITE} for
 * links the client sends on, {@link Activity#READ} for links the client receives on). Every link
 * that is refused is closed with an AMQP error condition so that the peer can tell why.
 */
@Scope("prototype")
@Component
public final class HonoMessaging extends AmqpServiceBase<HonoMessagingConfigProperties> {

    /**
     * Key of the connection attachment that holds the connection's ID.
     * NOTE(review): presumably the same key that {@code Constants.setConnectionId} writes to - confirm.
     */
    private static final String ATTACHMENT_KEY_CONNECTION_ID = "CONNECTION_ID";

    /** Event bus address on which the ID of a closed connection is published. */
    private static final String EVENT_BUS_ADDRESS_CONNECTION_CLOSED = "hono.connection.closed";

    /** Description used in the error condition of links that are refused because of their address. */
    private static final String NO_SUCH_ADDRESS = "no such address";

    /**
     * Logs warnings about missing endpoints and unsupported configuration options.
     *
     * @return an already succeeded future; the checks performed here never fail the start up.
     */
    protected Future<Void> preStartServers() {
        checkStandardEndpointsAreRegistered();
        logStartupMessage();
        return Future.succeededFuture();
    }

    /**
     * Warns if no Telemetry endpoint has been registered.
     */
    private void checkStandardEndpointsAreRegistered() {
        if (getEndpoint(TelemetryConstants.TELEMETRY_ENDPOINT) == null) {
            LOG.warn("no Telemetry endpoint has been configured, Hono server will not support Telemetry API");
        }
    }

    /**
     * Warns that the {@code maxPayloadSize} property is not honored.
     */
    private void logStartupMessage() {
        // the message has no arguments, so no isWarnEnabled() guard is needed
        LOG.warn("Hono server does not yet support limiting the incoming message size via the maxPayloadSize property");
    }

    /**
     * Registers the handlers for all life cycle events of a client connection.
     *
     * @param protonConnection the connection to register the handlers on.
     */
    private void setRemoteConnectionOpenHandler(ProtonConnection protonConnection) {
        protonConnection.sessionOpenHandler(session -> handleSessionOpen(protonConnection, session));
        protonConnection.receiverOpenHandler(receiver -> handleReceiverOpen(protonConnection, receiver));
        protonConnection.senderOpenHandler(sender -> handleSenderOpen(protonConnection, sender));
        protonConnection.disconnectHandler(this::handleRemoteDisconnect);
        protonConnection.closeHandler(remoteClose -> handleRemoteConnectionClose(protonConnection, remoteClose));
        protonConnection.openHandler(remoteOpen -> {
            LOG.info("client [container: {}, user: {}] connected", protonConnection.getRemoteContainer(),
                    Constants.getClientPrincipal(protonConnection).getName());
            protonConnection.open();
            // attach an ID to the connection; publishConnectionClosedEvent reads it back when the connection ends
            Constants.setConnectionId(protonConnection, UUID.randomUUID().toString());
        });
    }

    /**
     * Prepares a connection that has been established on the secure port.
     *
     * @param protonConnection the connection opened by the client.
     */
    protected void onRemoteConnectionOpen(ProtonConnection protonConnection) {
        protonConnection.setContainer(String.format("Hono-%s:%d", getBindAddress(), getPort()));
        setRemoteConnectionOpenHandler(protonConnection);
    }

    /**
     * Prepares a connection that has been established on the insecure port.
     *
     * @param protonConnection the connection opened by the client.
     */
    protected void onRemoteConnectionOpenInsecurePort(ProtonConnection protonConnection) {
        protonConnection.setContainer(String.format("Hono-%s:%d", getInsecurePortBindAddress(), getInsecurePort()));
        setRemoteConnectionOpenHandler(protonConnection);
    }

    /**
     * Opens a session requested by a client and closes it again once the client has closed its end.
     *
     * @param protonConnection the connection the session belongs to.
     * @param protonSession the session to open.
     */
    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        LOG.info("opening new session with client [{}]", protonConnection.getRemoteContainer());
        protonSession.closeHandler(remoteClose -> {
            if (remoteClose.succeeded()) {
                ((ProtonSession) remoteClose.result()).close();
            }
        }).open();
    }

    /**
     * Closes and disconnects a connection that the client has closed and publishes the
     * <em>connection closed</em> event.
     *
     * @param protonConnection the connection that has been closed.
     * @param asyncResult the outcome of the client's close; a failure carries the error sent by the client.
     */
    private void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            LOG.info("client [{}] closed connection", protonConnection.getRemoteContainer());
        } else {
            LOG.info("client [{}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        }
        protonConnection.close();
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    /**
     * Disconnects a connection whose client has disconnected and publishes the
     * <em>connection closed</em> event.
     *
     * @param protonConnection the connection that has been lost.
     */
    private void handleRemoteDisconnect(ProtonConnection protonConnection) {
        LOG.info("client [{}] disconnected", protonConnection.getRemoteContainer());
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    /**
     * Handles a client's request to open a link for sending messages to this server.
     * <p>
     * The link is closed with an error condition if
     * <ul>
     * <li>the client has provided no target (address), i.e. asks for an anonymous relay,</li>
     * <li>the target address is not a valid resource identifier,</li>
     * <li>no endpoint is registered for the address or</li>
     * <li>the client is not authorized to write to the address.</li>
     * </ul>
     * Otherwise the link is handed over to the endpoint registered for the address.
     *
     * @param protonConnection the connection the link is part of.
     * @param protonReceiver the link the client wants to send messages on.
     */
    void handleReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        // an attach frame without any target is handled like one without a target address
        // instead of failing with a NullPointerException
        if (protonReceiver.getRemoteTarget() == null || protonReceiver.getRemoteTarget().getAddress() == null) {
            LOG.debug("client [{}] wants to open an anonymous link for sending messages to arbitrary addresses, closing link", protonConnection.getRemoteContainer());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), "anonymous relay not supported")).close();
            return;
        }
        LOG.debug("client [{}] wants to open a link for sending messages [address: {}]", protonConnection.getRemoteContainer(), protonReceiver.getRemoteTarget());
        try {
            ResourceIdentifier resourceIdentifier = getResourceIdentifier(protonReceiver.getRemoteTarget().getAddress());
            Endpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonReceiver, resourceIdentifier);
                return;
            }
            HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
            getAuthorizationService().isAuthorized(clientPrincipal, resourceIdentifier, Activity.WRITE).setHandler(authAttempt -> {
                // fail closed: a failed check and a missing (null) result both count as "not authorized"
                if (authAttempt.succeeded() && Boolean.TRUE.equals(authAttempt.result())) {
                    Constants.copyProperties(protonConnection, protonReceiver);
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget());
                    endpoint.onLinkAttach(protonConnection, protonReceiver, resourceIdentifier);
                } else {
                    LOG.debug("subject [{}] is not authorized to WRITE to [{}]", clientPrincipal.getName(), resourceIdentifier);
                    protonReceiver.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), "unauthorized")).close();
                }
            });
        } catch (IllegalArgumentException e) {
            LOG.debug("client has provided invalid resource identifier as target address", e);
            // tell the client why the link is being refused
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), NO_SUCH_ADDRESS)).close();
        }
    }

    /**
     * Publishes the ID attached to a connection on the event bus so that other components
     * learn that the connection is gone.
     * <p>
     * Nothing is published if no ID is attached to the connection.
     *
     * @param protonConnection the connection that has been closed.
     */
    private void publishConnectionClosedEvent(ProtonConnection protonConnection) {
        String connectionId = protonConnection.attachments().get(ATTACHMENT_KEY_CONNECTION_ID, String.class);
        if (connectionId != null) {
            this.vertx.eventBus().publish(EVENT_BUS_ADDRESS_CONNECTION_CLOSED, connectionId);
        }
    }

    /**
     * Handles a client's request to open a link for receiving messages from this server.
     * <p>
     * The link is closed with an error condition if
     * <ul>
     * <li>the client has provided no source,</li>
     * <li>the source address is not a valid resource identifier,</li>
     * <li>no endpoint is registered for the address or</li>
     * <li>the client is not authorized to read from the address.</li>
     * </ul>
     * Otherwise the link is handed over to the endpoint registered for the address.
     *
     * @param protonConnection the connection the link is part of.
     * @param protonSender the link the client wants to receive messages on.
     */
    void handleSenderOpen(ProtonConnection protonConnection, ProtonSender protonSender) {
        Source remoteSource = protonSender.getRemoteSource();
        LOG.debug("client [{}] wants to open a link for receiving messages [address: {}]", protonConnection.getRemoteContainer(), remoteSource);
        if (remoteSource == null) {
            // without a source there is no address to resolve; refuse the link instead of
            // failing with a NullPointerException
            LOG.debug("client [{}] has not provided a source for the link, closing link", protonConnection.getRemoteContainer());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), NO_SUCH_ADDRESS)).close();
            return;
        }
        try {
            ResourceIdentifier resourceIdentifier = getResourceIdentifier(remoteSource.getAddress());
            Endpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonSender, resourceIdentifier);
                return;
            }
            HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
            getAuthorizationService().isAuthorized(clientPrincipal, resourceIdentifier, Activity.READ).setHandler(authAttempt -> {
                // fail closed: a failed check and a missing (null) result both count as "not authorized"
                if (authAttempt.succeeded() && Boolean.TRUE.equals(authAttempt.result())) {
                    Constants.copyProperties(protonConnection, protonSender);
                    protonSender.setSource(protonSender.getRemoteSource());
                    endpoint.onLinkAttach(protonConnection, protonSender, resourceIdentifier);
                } else {
                    LOG.debug("subject [{}] is not authorized to READ from [{}]", clientPrincipal.getName(), resourceIdentifier);
                    protonSender.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), "unauthorized")).close();
                }
            });
        } catch (IllegalArgumentException e) {
            // the address of a sender link comes from its source, not from a target
            LOG.debug("client has provided invalid resource identifier as source address", e);
            // tell the client why the link is being refused
            protonSender.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND.toString(), NO_SUCH_ADDRESS)).close();
        }
    }
}
