package org.eclipse.hono.registration.impl;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Objects;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.registration.RegistrationMessageFilter;
import org.eclipse.hono.server.BaseEndpoint;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationConstants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Qualifier("registration")
/* loaded from: input_file:org/eclipse/hono/registration/impl/RegistrationEndpoint.class */
public final class RegistrationEndpoint extends BaseEndpoint {
    @Autowired
    public RegistrationEndpoint(Vertx vertx) {
        super((Vertx) Objects.requireNonNull(vertx));
    }

    @Override // org.eclipse.hono.server.Endpoint
    public String getName() {
        return "registration";
    }

    @Override // org.eclipse.hono.server.BaseEndpoint, org.eclipse.hono.server.Endpoint
    public void onLinkAttach(ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (!ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            this.logger.debug("establishing link for receiving registration messages from client [{}]", MessageHelper.getLinkName(protonReceiver));
            protonReceiver.setQoS(ProtonQoS.AT_LEAST_ONCE).setAutoAccept(true).setPrefetch(20).handler((protonDelivery, message) -> {
                if (!RegistrationMessageFilter.verify(resourceIdentifier, message)) {
                    MessageHelper.rejected(protonDelivery, AmqpError.DECODE_ERROR.toString(), "malformed registration message");
                    onLinkDetach(protonReceiver, ProtonHelper.condition(AmqpError.DECODE_ERROR.toString(), "invalid message received"));
                } else {
                    try {
                        processRequest(message);
                    } catch (DecodeException e) {
                        MessageHelper.rejected(protonDelivery, AmqpError.DECODE_ERROR.toString(), "malformed payload");
                    }
                }
            }).closeHandler(asyncResult -> {
                onLinkDetach((ProtonReceiver) asyncResult.result());
            }).open();
        } else {
            this.logger.debug("client wants to use AT MOST ONCE delivery mode for registration endpoint, this is not supported.");
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            protonReceiver.close();
        }
    }

    @Override // org.eclipse.hono.server.BaseEndpoint, org.eclipse.hono.server.Endpoint
    public void onLinkAttach(ProtonSender protonSender, ResourceIdentifier resourceIdentifier) {
        if (resourceIdentifier.getResourceId() == null) {
            this.logger.debug("link target provided in client's link ATTACH does not match pattern \"registration/<tenant>/<reply-address>\"");
            protonSender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD.toString(), "link target must have the following format registration/<tenant>/<reply-address>"));
            protonSender.close();
        } else {
            this.logger.debug("establishing sender link with client [{}]", MessageHelper.getLinkName(protonSender));
            MessageConsumer consumer = this.vertx.eventBus().consumer(resourceIdentifier.toString(), message -> {
                this.logger.trace("forwarding reply to client: {}", message.body());
                protonSender.send(RegistrationConstants.getAmqpReply(message));
            });
            protonSender.closeHandler(asyncResult -> {
                consumer.unregister();
                ((ProtonSender) asyncResult.result()).close();
                this.logger.debug("receiver closed link [{}], removing associated event bus consumer [{}]", MessageHelper.getLinkName(protonSender), consumer.address());
            });
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE).open();
        }
    }

    private void onLinkDetach(ProtonReceiver protonReceiver) {
        onLinkDetach(protonReceiver, (ErrorCondition) null);
    }

    private void onLinkDetach(ProtonReceiver protonReceiver, ErrorCondition errorCondition) {
        this.logger.debug("closing receiver for client [{}]", MessageHelper.getLinkName(protonReceiver));
        protonReceiver.close();
    }

    private void processRequest(Message message) {
        this.vertx.eventBus().send("registration.in", RegistrationConstants.getRegistrationMsg(message), asyncResult -> {
            JsonObject reply;
            if (asyncResult.succeeded()) {
                reply = (JsonObject) ((io.vertx.core.eventbus.Message) asyncResult.result()).body();
            } else {
                this.logger.debug("failed to process request [msg ID: {}] due to {}", message.getMessageId(), asyncResult.cause());
                reply = RegistrationConstants.getReply(500, MessageHelper.getTenantIdAnnotation(message), MessageHelper.getDeviceIdAnnotation(message), (JsonObject) null);
            }
            addHeadersToResponse(message, reply);
            this.vertx.eventBus().send(message.getReplyTo(), reply);
        });
    }

    private void addHeadersToResponse(Message message, JsonObject jsonObject) {
        boolean xOptAppCorrelationId = MessageHelper.getXOptAppCorrelationId(message);
        this.logger.debug("registration request [{}] uses application specific correlation ID: {}", message.getMessageId(), Boolean.valueOf(xOptAppCorrelationId));
        if (xOptAppCorrelationId) {
            jsonObject.put("x-opt-app-correlation-id", Boolean.valueOf(xOptAppCorrelationId));
        }
        jsonObject.put("correlation-id", MessageHelper.encodeIdToJson(getCorrelationId(message)));
    }

    private Object getCorrelationId(Message message) {
        return message.getCorrelationId() != null ? message.getCorrelationId() : message.getMessageId();
    }
}
