package org.eclipse.hono.messaging;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.messaging.HonoMessagingConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.registration.RegistrationAssertionHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/* loaded from: input_file:org/eclipse/hono/messaging/MessageForwardingEndpoint.class */
/**
 * Base class for AMQP endpoints that accept messages from upstream clients and
 * hand them over to a {@link DownstreamAdapter}.
 * <p>
 * For every receiver link attached by a client the endpoint checks the requested
 * delivery mode against {@link #getEndpointQos()}, registers the link with the
 * downstream adapter and then, per message, runs the formal verification and
 * (if enabled in the configuration) the registration assertion check before
 * passing the message on.
 *
 * @param <T> the type of configuration properties used by this endpoint.
 */
public abstract class MessageForwardingEndpoint<T extends HonoMessagingConfigProperties> extends AbstractAmqpEndpoint<T> {

    /** Event bus address on which this endpoint listens for closed client connections. */
    private static final String EVENT_BUS_ADDRESS_CONNECTION_CLOSED = "hono.connection.closed";
    /** Name of the message annotation from which the message's resource identifier is read. */
    private static final String ANNOTATION_RESOURCE = "resource";

    private MessagingMetrics metrics;
    private DownstreamAdapter downstreamAdapter;
    private MessageConsumer<String> clientDisconnectListener;
    private RegistrationAssertionHelper registrationAssertionValidator;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates an endpoint for a Vert.x instance.
     *
     * @param vertx the Vert.x instance to use.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public MessageForwardingEndpoint(Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Sets the helper used for validating registration assertions contained in messages.
     *
     * @param registrationAssertionHelper the validator.
     * @throws NullPointerException if the validator is {@code null}.
     */
    @Autowired
    @Qualifier("validation")
    public void setRegistrationAssertionValidator(RegistrationAssertionHelper registrationAssertionHelper) {
        this.registrationAssertionValidator = Objects.requireNonNull(registrationAssertionHelper);
    }

    /**
     * Sets the metrics used for tracking the number of upstream links.
     * <p>
     * The value is stored as given (no {@code null} check is performed here).
     *
     * @param messagingMetrics the metrics.
     */
    @Autowired
    public final void setMetrics(MessagingMetrics messagingMetrics) {
        this.metrics = messagingMetrics;
    }

    /**
     * Starts this endpoint.
     * <p>
     * Fails the given future if no downstream adapter has been set, or if the
     * configuration requires assertion validation but no validator has been set.
     * Otherwise registers the client disconnect listener on the event bus and
     * delegates completion of the future to the downstream adapter's start method.
     *
     * @param future the future to complete or fail with the outcome of the start-up.
     */
    protected final void doStart(Future<Void> future) {
        if (this.downstreamAdapter == null) {
            future.fail("no downstream adapter configured on endpoint");
            return;
        }
        final boolean validationRequired = mustValidateAssertions();
        if (validationRequired && this.registrationAssertionValidator == null) {
            future.fail("no registration assertion validator has been set");
            return;
        }
        if (!validationRequired) {
            this.logger.warn("validation of registration assertions is disabled, all clients may publish data on behalf of any device");
        }
        this.clientDisconnectListener = this.vertx.eventBus().consumer(EVENT_BUS_ADDRESS_CONNECTION_CLOSED, message -> {
            onClientDisconnect(message);
        });
        this.downstreamAdapter.start(future);
    }

    /**
     * Stops this endpoint.
     * <p>
     * Completes the future immediately if no downstream adapter has been set.
     * Otherwise unregisters the client disconnect listener (if one has been
     * registered) and delegates completion of the future to the downstream
     * adapter's stop method.
     *
     * @param future the future to complete with the outcome of the shutdown.
     */
    protected final void doStop(Future<Void> future) {
        if (this.downstreamAdapter == null) {
            future.complete();
            return;
        }
        // The listener only exists after a successful doStart(); a failed or skipped
        // start must not turn the shutdown into a NullPointerException.
        if (this.clientDisconnectListener != null) {
            this.clientDisconnectListener.unregister();
        }
        this.downstreamAdapter.stop(future);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets the adapter that messages are forwarded to.
     *
     * @param downstreamAdapter the adapter.
     * @throws NullPointerException if the adapter is {@code null}.
     */
    public final void setDownstreamAdapter(DownstreamAdapter downstreamAdapter) {
        this.downstreamAdapter = Objects.requireNonNull(downstreamAdapter);
    }

    /**
     * Notifies the downstream adapter about a disconnected client.
     *
     * @param message the event bus message whose body is passed to the adapter.
     */
    private void onClientDisconnect(Message<String> message) {
        this.downstreamAdapter.onClientDisconnect(message.body());
    }

    /**
     * Handles a client's request to attach a receiver link.
     * <p>
     * The link is closed with {@code ERROR_UNSUPPORTED_DELIVERY_MODE} if the QoS
     * requested by the client is not contained in {@link #getEndpointQos()}.
     * Otherwise the link is registered with the downstream adapter and, once the
     * adapter has accepted it, opened with handlers for incoming messages and for
     * the client closing the link. If the adapter does not accept the link, it is
     * closed with a <em>precondition-failed</em> condition.
     *
     * @param protonConnection the connection the link belongs to.
     * @param protonReceiver the link to attach.
     * @param resourceIdentifier the target address of the link.
     */
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (!getEndpointQos().contains(protonReceiver.getRemoteQoS())) {
            this.logger.debug("client [{}] wants to use unsupported delivery mode {} for endpoint [name: {}, QoS: {}], closing link", new Object[]{protonConnection.getRemoteContainer(), protonReceiver.getRemoteQoS(), getName(), getEndpointQos()});
            protonReceiver.setCondition(ErrorConditions.ERROR_UNSUPPORTED_DELIVERY_MODE);
            protonReceiver.close();
            return;
        }

        // mirror the settings requested by the client
        protonReceiver.setQoS(protonReceiver.getRemoteQoS());
        protonReceiver.setTarget(protonReceiver.getRemoteTarget());
        final UpstreamReceiver upstreamLink = UpstreamReceiver.newUpstreamReceiver(UUID.randomUUID().toString(), protonReceiver);

        this.downstreamAdapter.onClientAttach(upstreamLink, attachAttempt -> {
            if (!attachAttempt.succeeded()) {
                upstreamLink.close(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, "no consumer available for target"));
                return;
            }
            // The parameter must not reuse the name of the enclosing lambda's
            // parameter; Java does not allow lambda parameters to shadow it.
            protonReceiver.closeHandler(remoteClose -> {
                onLinkDetach(upstreamLink);
                this.downstreamAdapter.onClientDetach(upstreamLink);
                this.metrics.decrementUpstreamLinks(resourceIdentifier.toString());
            });
            protonReceiver.handler((protonDelivery, message) -> {
                if (passesFormalVerification(resourceIdentifier, message)) {
                    forwardMessage(upstreamLink, protonDelivery, message);
                } else {
                    rejectMessage(protonDelivery, ProtonHelper.condition(AmqpError.DECODE_ERROR, "malformed message"), upstreamLink);
                }
            });
            protonReceiver.open();
            this.logger.debug("establishing link with client [{}]", protonConnection.getRemoteContainer());
            this.metrics.incrementUpstreamLinks(resourceIdentifier.toString());
        });
    }

    /**
     * Closes a client's link without an error condition.
     *
     * @param upstreamReceiver the link to close.
     */
    protected final void onLinkDetach(UpstreamReceiver upstreamReceiver) {
        onLinkDetach(upstreamReceiver, null);
    }

    /**
     * Closes a client's link, optionally indicating an error condition.
     *
     * @param upstreamReceiver the link to close.
     * @param errorCondition the condition to close the link with, may be {@code null}.
     */
    protected final void onLinkDetach(UpstreamReceiver upstreamReceiver, ErrorCondition errorCondition) {
        if (errorCondition == null) {
            this.logger.debug("closing receiver for client [{}]", upstreamReceiver.getLinkId());
        } else {
            this.logger.debug("closing receiver for client [{}]: {}", upstreamReceiver.getLinkId(), errorCondition.getDescription());
        }
        upstreamReceiver.close(errorCondition);
    }

    /**
     * Forwards a message to the downstream adapter if its registration assertion
     * is acceptable; otherwise rejects it with a <em>precondition-failed</em>
     * condition.
     * <p>
     * The resource identifier is parsed from the message's {@code resource}
     * annotation.
     * NOTE(review): the annotation is presumably put in place by
     * {@code passesFormalVerification} - confirm for any other callers.
     *
     * @param upstreamReceiver the link the message has been received on.
     * @param protonDelivery the delivery to settle.
     * @param message the message to forward.
     */
    final void forwardMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, org.apache.qpid.proton.message.Message message) {
        final String assertion = MessageHelper.getAndRemoveRegistrationAssertion(message);
        final ResourceIdentifier messageAddress = ResourceIdentifier.fromString((String) MessageHelper.getAnnotation(message, ANNOTATION_RESOURCE, String.class));
        if (assertRegistration(assertion, messageAddress)) {
            this.downstreamAdapter.processMessage(upstreamReceiver, protonDelivery, message);
            return;
        }
        this.logger.debug("failed to validate device registration status");
        rejectMessage(protonDelivery, ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, "device non-existent/disabled"), upstreamReceiver);
    }

    /**
     * Checks a registration assertion against a resource.
     *
     * @param str the assertion token, may be {@code null}.
     * @param resourceIdentifier the resource providing the tenant and resource IDs to validate against.
     * @return {@code true} if validation is disabled in the configuration or the
     *         validator considers the token valid; {@code false} if validation is
     *         required and the token is missing or invalid.
     */
    private boolean assertRegistration(String str, ResourceIdentifier resourceIdentifier) {
        if (!mustValidateAssertions()) {
            return true;
        }
        if (str == null) {
            this.logger.debug("registration assertion validation failed due to missing token");
            return false;
        }
        return this.registrationAssertionValidator.isValid(str, resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId());
    }

    /**
     * Rejects a delivery with the given condition and replenishes the link with one credit.
     *
     * @param protonDelivery the delivery to reject.
     * @param errorCondition the reason for rejecting the delivery.
     * @param upstreamReceiver the link the delivery has been received on.
     */
    private void rejectMessage(ProtonDelivery protonDelivery, ErrorCondition errorCondition, UpstreamReceiver upstreamReceiver) {
        MessageHelper.rejected(protonDelivery, errorCondition);
        upstreamReceiver.replenish(1);
    }

    /**
     * Reads the assertion validation flag from this endpoint's configuration.
     *
     * @return the value of the configuration's {@code isAssertionValidationRequired()}.
     */
    private boolean mustValidateAssertions() {
        return ((HonoMessagingConfigProperties) this.config).isAssertionValidationRequired();
    }

    /**
     * Gets the delivery modes supported by this endpoint.
     *
     * @return the supported QoS levels; links requesting any other level are closed on attach.
     */
    protected abstract Set<ProtonQoS> getEndpointQos();

    /**
     * Registers a readiness check named {@code <endpoint name>-endpoint-downstream-connection}
     * which reports OK only if a downstream adapter is set and it reports being connected.
     *
     * @param healthCheckHandler the handler to register the check with.
     */
    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        healthCheckHandler.register(getName() + "-endpoint-downstream-connection", future -> {
            final boolean connected = this.downstreamAdapter != null && this.downstreamAdapter.isConnected();
            future.complete(connected ? Status.OK() : Status.KO());
        });
    }
}
