package org.eclipse.hono.adapter.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.util.EndpointType;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.class */
public final class VertxBasedAmqpProtocolAdapter extends AbstractProtocolAdapterBase<ProtocolAdapterProperties> {
    private static final Logger LOG = LoggerFactory.getLogger(VertxBasedAmqpProtocolAdapter.class);
    private static final int DEFAULT_MAX_FRAME_SIZE = 32768;
    private static final int DEFAULT_MAX_SESSION_WINDOW = 3276800;
    private static final int DEFAULT_INSECURE_PORT = 4040;
    private static final int DEFAULT_SECURE_PORT = 4041;
    private ProtonServer secureServer;
    private ProtonServer insecureServer;
    private AtomicBoolean secureListening = new AtomicBoolean(false);
    private ProtonSaslAuthenticatorFactory authenticatorFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.amqp.VertxBasedAmqpProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$util$EndpointType = new int[EndpointType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    protected String getTypeName() {
        return "hono-amqp";
    }

    protected void doStart(Future<Void> future) {
        checkPortConfiguration().compose(r8 -> {
            if (this.authenticatorFactory == null && ((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
                this.authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(getTenantServiceClient(), getCredentialsServiceClient(), (ProtocolAdapterProperties) getConfig());
            }
            return Future.succeededFuture();
        }).compose(obj -> {
            return CompositeFuture.all(bindSecureServer(), bindInsecureServer());
        }).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    protected void doStop(Future<Void> future) {
        CompositeFuture.all(stopSecureServer(), stopInsecureServer()).compose(compositeFuture -> {
            future.complete();
        }, future);
    }

    private Future<Void> stopInsecureServer() {
        Future<Void> future = Future.future();
        if (this.insecureServer != null) {
            LOG.info("Shutting down insecure server");
            this.insecureServer.close(future.completer());
        } else {
            future.complete();
        }
        return future;
    }

    private Future<Void> stopSecureServer() {
        Future<Void> future = Future.future();
        if (this.secureServer != null) {
            LOG.info("Shutting down secure server");
            this.secureListening.compareAndSet(Boolean.TRUE.booleanValue(), Boolean.FALSE.booleanValue());
            this.secureServer.close(future.completer());
        } else {
            future.complete();
        }
        return future;
    }

    private Future<Void> bindInsecureServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions port = new ProtonServerOptions().setHost(((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort());
        Future<Void> future = Future.future();
        this.insecureServer = createServer(this.insecureServer, port);
        this.insecureServer.connectHandler(this::connectionRequestHandler).listen(asyncResult -> {
            if (!asyncResult.succeeded()) {
                future.fail(asyncResult.cause());
            } else {
                LOG.info("insecure amqp server listening on [{}:{}]", ((ProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress(), Integer.valueOf(getActualInsecurePort()));
                future.complete();
            }
        });
        return future;
    }

    private Future<Void> bindSecureServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        ProtonServerOptions maxFrameSize = new ProtonServerOptions().setHost(((ProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxFrameSize(DEFAULT_MAX_FRAME_SIZE);
        addTlsKeyCertOptions(maxFrameSize);
        addTlsTrustOptions(maxFrameSize);
        Future<Void> future = Future.future();
        this.secureServer = createServer(this.secureServer, maxFrameSize);
        this.secureServer.connectHandler(this::connectionRequestHandler).listen(asyncResult -> {
            if (!asyncResult.succeeded()) {
                LOG.error("cannot bind to secure port", asyncResult.cause());
                future.fail(asyncResult.cause());
            } else {
                this.secureListening.getAndSet(Boolean.TRUE.booleanValue());
                LOG.info("secure amqp server listening on {}:{}", ((ProtocolAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(getActualPort()));
                future.complete();
            }
        });
        return future;
    }

    private ProtonServer createServer(ProtonServer protonServer, ProtonServerOptions protonServerOptions) {
        ProtonServer create = protonServer != null ? protonServer : ProtonServer.create(this.vertx, protonServerOptions);
        if (((ProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            create.saslAuthenticatorFactory(this.authenticatorFactory);
        } else {
            create.saslAuthenticatorFactory((ProtonSaslAuthenticatorFactory) null);
        }
        return create;
    }

    private void connectionRequestHandler(ProtonConnection protonConnection) {
        LOG.debug("Received connection request from client");
        if (this.secureListening.get()) {
            protonConnection.setContainer(String.format("%s-%s:%d", "secure-server", getBindAddress(), Integer.valueOf(getActualPort())));
        } else {
            protonConnection.setContainer(String.format("%s-%s:%d", "insecure-server", getInsecurePortBindAddress(), Integer.valueOf(getActualInsecurePort())));
        }
        protonConnection.disconnectHandler(protonConnection2 -> {
            LOG.debug("client [container: {}] has disconnected", protonConnection2.getRemoteContainer());
        });
        protonConnection.closeHandler(asyncResult -> {
            handleRemoteConnectionClose(protonConnection, asyncResult);
        });
        protonConnection.sessionOpenHandler(protonSession -> {
            HonoProtonHelper.setDefaultCloseHandler(protonSession);
            handleSessionOpen(protonConnection, protonSession);
        });
        protonConnection.openHandler(asyncResult2 -> {
            ProtonConnection protonConnection3 = (ProtonConnection) asyncResult2.result();
            protonConnection3.setContainer(getTypeName());
            protonConnection3.open();
        });
        protonConnection.receiverOpenHandler(protonReceiver -> {
            HonoProtonHelper.setDefaultCloseHandler(protonReceiver);
            handleRemoteReceiverOpen(protonReceiver, protonConnection);
        });
        protonConnection.senderOpenHandler(protonSender -> {
            HonoProtonHelper.setDefaultCloseHandler(protonSender);
            LOG.debug("client [container: {}] wants to open a link [address: {}] for receiving messages", protonConnection.getRemoteContainer(), protonSender.getRemoteSource());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.NOT_ALLOWED, "this adapter only forwards message to downstream applications"));
            protonSender.close();
        });
    }

    protected void setInsecureAmqpServer(ProtonServer protonServer) {
        Objects.requireNonNull(protonServer);
        if (protonServer.actualPort() > 0) {
            throw new IllegalArgumentException("AMQP Server should not be running");
        }
        this.insecureServer = protonServer;
    }

    protected void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory protonSaslAuthenticatorFactory) {
        this.authenticatorFactory = (ProtonSaslAuthenticatorFactory) Objects.requireNonNull(protonSaslAuthenticatorFactory, "authFactory must not be null");
    }

    private void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        LOG.debug("opening new session with client [container: {}]", protonConnection.getRemoteContainer());
        protonSession.setIncomingCapacity(DEFAULT_MAX_SESSION_WINDOW);
        protonSession.open();
    }

    private void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            LOG.debug("client [container: {}] closed connection", protonConnection.getRemoteContainer());
        } else {
            LOG.debug("client [container: {}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        }
        protonConnection.disconnectHandler((Handler) null);
        protonConnection.close();
        protonConnection.disconnect();
    }

    protected void handleRemoteReceiverOpen(ProtonReceiver protonReceiver, ProtonConnection protonConnection) {
        if (protonReceiver.getRemoteTarget() != null && protonReceiver.getRemoteTarget().getAddress() != null) {
            if (!protonReceiver.getRemoteTarget().getAddress().isEmpty()) {
                LOG.debug("Closing link due to the present of Target [address : {}]", protonReceiver.getRemoteTarget().getAddress());
            }
            protonReceiver.setCondition(AmqpContext.getErrorCondition(new ClientErrorException(400, "This adapter does not accept a target address on receiver links")));
            protonReceiver.close();
            return;
        }
        Device device = (Device) protonConnection.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class);
        LOG.debug("Established receiver link at [address: {}]", protonReceiver.getRemoteTarget() != null ? protonReceiver.getRemoteTarget().getAddress() : null);
        protonReceiver.setTarget((Target) null);
        protonReceiver.setQoS(protonReceiver.getRemoteQoS());
        if (ProtonQoS.AT_LEAST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            protonReceiver.setAutoAccept(false);
        }
        protonReceiver.handler((protonDelivery, message) -> {
            validateEndpoint(message.getAddress(), protonDelivery).compose(resourceIdentifier -> {
                return validateAddress(resourceIdentifier, device);
            }).compose(resourceIdentifier2 -> {
                message.setAddress(resourceIdentifier2.toString());
                uploadMessage(new AmqpContext(protonDelivery, message, device));
                return Future.succeededFuture();
            }).recover(th -> {
                MessageHelper.rejected(protonDelivery, AmqpContext.getErrorCondition(th));
                return Future.failedFuture(th);
            });
        });
        HonoProtonHelper.setCloseHandler(protonReceiver, asyncResult -> {
            onLinkDetach(protonReceiver);
        });
        protonReceiver.open();
    }

    protected void uploadMessage(AmqpContext amqpContext) {
        Future future = Future.future();
        String messageContentType = amqpContext.getMessageContentType();
        if (isPayloadOfIndicatedType(amqpContext.getMessagePayload(), messageContentType)) {
            future.complete();
        } else {
            future.fail(new ClientErrorException(400, String.format("Content-Type: [%s] does not match payload", messageContentType)));
        }
        future.compose(r7 -> {
            switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.fromString(amqpContext.getEndpoint()).ordinal()]) {
                case 1:
                    LOG.trace("Received request to upload telemetry data to endpoint [with name: {}]", amqpContext.getEndpoint());
                    return doUploadMessage(amqpContext, getTelemetrySender(amqpContext.getTenantId()));
                case 2:
                    LOG.trace("Received request to upload events to endpoint [with name: {}]", amqpContext.getEndpoint());
                    return doUploadMessage(amqpContext, getEventSender(amqpContext.getTenantId()));
                default:
                    return Future.failedFuture(new ClientErrorException(400, "unknown endpoint"));
            }
        }).recover(th -> {
            if (!amqpContext.isRemotelySettled()) {
                amqpContext.handleFailure(th);
            }
            return Future.failedFuture(th);
        });
    }

    private Future<Void> doUploadMessage(AmqpContext amqpContext, Future<MessageSender> future) {
        Future registrationAssertion = getRegistrationAssertion(amqpContext.getTenantId(), amqpContext.getDeviceId(), amqpContext.getAuthenticatedDevice());
        Future tenantConfiguration = getTenantConfiguration(amqpContext.getTenantId());
        return CompositeFuture.all(tenantConfiguration, registrationAssertion, future).compose(compositeFuture -> {
            if (!((TenantObject) tenantConfiguration.result()).isAdapterEnabled(getTypeName())) {
                return Future.failedFuture(new ClientErrorException(403, String.format("This adapter is not enabled for tenant [tenantId: %s].", amqpContext.getTenantId())));
            }
            MessageSender messageSender = (MessageSender) future.result();
            Message newMessage = newMessage(amqpContext.getResourceIdentifier(), messageSender.isRegistrationAssertionRequired(), amqpContext.getEndpoint(), amqpContext.getMessageContentType(), amqpContext.getMessagePayload(), (JsonObject) registrationAssertion.result(), null);
            return amqpContext.isRemotelySettled() ? messageSender.send(newMessage) : messageSender.sendAndWaitForOutcome(newMessage);
        }).compose(protonDelivery -> {
            LOG.trace("Successfully process message for Device [deviceId: {}] with Tenant [tenantId: {}]", amqpContext.getDeviceId(), amqpContext.getTenantId());
            if (amqpContext.isRemotelySettled()) {
                amqpContext.accept();
            } else {
                amqpContext.updateDelivery(protonDelivery);
            }
            return Future.succeededFuture();
        }).recover(th -> {
            LOG.debug("Cannot process message for Device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{amqpContext.getTenantId(), amqpContext.getDeviceId(), amqpContext.getEndpoint(), th});
            return Future.failedFuture(th);
        });
    }

    private void onLinkDetach(ProtonReceiver protonReceiver) {
        LOG.debug("closing link [{}]", protonReceiver.getName());
        protonReceiver.close();
    }

    Future<ResourceIdentifier> validateEndpoint(String str, ProtonDelivery protonDelivery) {
        if (str == null || str.isEmpty()) {
            return Future.failedFuture(new ClientErrorException(400, "Message address cannot be null or empty"));
        }
        Future<ResourceIdentifier> future = Future.future();
        ResourceIdentifier fromString = ResourceIdentifier.fromString(str);
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$util$EndpointType[EndpointType.fromString(fromString.getEndpoint()).ordinal()]) {
            case 1:
                future.complete(fromString);
                break;
            case 2:
                if (!protonDelivery.remotelySettled()) {
                    future.complete(fromString);
                    break;
                } else {
                    future.fail(new ClientErrorException(400, "The Event endpoint only supports unsettled delivery for messages"));
                    break;
                }
            default:
                LOG.error("Endpoint with [name: {}] is not supported by this adapter ", fromString.getEndpoint());
                future.fail(new ClientErrorException(400, "unsupported endpoint"));
                break;
        }
        return future;
    }

    private Future<ResourceIdentifier> validateAddress(ResourceIdentifier resourceIdentifier, Device device) {
        Future<ResourceIdentifier> future = Future.future();
        if (device == null) {
            if (resourceIdentifier.getTenantId() == null || resourceIdentifier.getResourceId() == null) {
                throw new ClientErrorException(400, "Invalid address for unauthenticated devices");
            }
            future.complete(resourceIdentifier);
        } else if (resourceIdentifier.getTenantId() != null && resourceIdentifier.getResourceId() == null) {
            future.fail(new ClientErrorException(400, "address of authenticated message must not contain tenant ID only"));
        } else if (resourceIdentifier.getTenantId() == null && resourceIdentifier.getResourceId() == null) {
            future.complete(ResourceIdentifier.from(resourceIdentifier.getEndpoint(), device.getTenantId(), device.getDeviceId()));
        } else {
            future.complete(resourceIdentifier);
        }
        return future;
    }

    public int getPortDefaultValue() {
        return DEFAULT_SECURE_PORT;
    }

    public int getInsecurePortDefaultValue() {
        return DEFAULT_INSECURE_PORT;
    }

    protected int getActualPort() {
        if (this.secureServer != null) {
            return this.secureServer.actualPort();
        }
        return -1;
    }

    protected int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }
}
