package org.eclipse.hono.service.amqp;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Objects;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.util.AmqpErrorException;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/service/amqp/RequestResponseEndpoint.class */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties> extends AbstractAmqpEndpoint<T> {
    private static final int REQUEST_RESPONSE_ENDPOINT_DEFAULT_CREDITS = 20;
    private int receiverLinkCredit;
    private AuthorizationService authorizationService;

    /* JADX INFO: Access modifiers changed from: protected */
    public RequestResponseEndpoint(Vertx vertx) {
        super((Vertx) Objects.requireNonNull(vertx));
        this.receiverLinkCredit = REQUEST_RESPONSE_ENDPOINT_DEFAULT_CREDITS;
        this.authorizationService = new ClaimsBasedAuthorizationService();
    }

    public abstract void processRequest(Message message, ResourceIdentifier resourceIdentifier, HonoUser honoUser);

    protected abstract Message getAmqpReply(io.vertx.core.eventbus.Message<JsonObject> message);

    public final int getReceiverLinkCredit() {
        return this.receiverLinkCredit;
    }

    public final void setReceiverLinkCredit(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("receiver link credit must be at least 1");
        }
        this.receiverLinkCredit = i;
    }

    public final AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    @Autowired(required = false)
    public final void setAuthorizationService(AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    @Override // org.eclipse.hono.service.amqp.AbstractAmqpEndpoint, org.eclipse.hono.service.amqp.AmqpEndpoint
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (!ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            this.logger.debug("establishing link for receiving messages from client [{}]", MessageHelper.getLinkName(protonReceiver));
            protonReceiver.setQoS(ProtonQoS.AT_LEAST_ONCE).setAutoAccept(true).setPrefetch(this.receiverLinkCredit).handler((protonDelivery, message) -> {
                handleMessage(protonConnection, protonReceiver, resourceIdentifier, protonDelivery, message);
            }).closeHandler(asyncResult -> {
                onLinkDetach((ProtonReceiver) asyncResult.result());
            }).open();
        } else {
            this.logger.debug("client wants to use AT MOST ONCE delivery mode for {} endpoint, this is not supported.", getName());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            protonReceiver.close();
        }
    }

    protected final void handleMessage(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier, ProtonDelivery protonDelivery, Message message) {
        Future future = Future.future();
        future.setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                ProtonHelper.accepted(protonDelivery, true);
            } else if (asyncResult.cause() instanceof AmqpErrorException) {
                MessageHelper.rejected(protonDelivery, asyncResult.cause().asErrorCondition());
            } else {
                this.logger.debug("error processing request [resource: {}, op: {}]: {}", new Object[]{resourceIdentifier, message.getSubject(), asyncResult.cause().getMessage()});
                MessageHelper.rejected(protonDelivery, ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "internal error"));
            }
        });
        if (!passesFormalVerification(resourceIdentifier, message)) {
            future.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
        } else {
            HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
            isAuthorized(clientPrincipal, resourceIdentifier, message.getSubject()).compose(bool -> {
                Logger logger = this.logger;
                Object[] objArr = new Object[4];
                objArr[0] = clientPrincipal.getName();
                objArr[1] = bool.booleanValue() ? "" : "not ";
                objArr[2] = resourceIdentifier;
                objArr[3] = message.getSubject();
                logger.debug("client [{}] is {}authorized to {}:{}", objArr);
                if (!bool.booleanValue()) {
                    future.fail(new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized"));
                    return;
                }
                try {
                    processRequest(message, resourceIdentifier, Constants.getClientPrincipal(protonConnection));
                    future.complete();
                } catch (DecodeException e) {
                    future.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
                }
            }, future);
        }
    }

    protected Future<Boolean> isAuthorized(HonoUser honoUser, ResourceIdentifier resourceIdentifier, String str) {
        return getAuthorizationService().isAuthorized(honoUser, resourceIdentifier, str);
    }

    @Override // org.eclipse.hono.service.amqp.AbstractAmqpEndpoint, org.eclipse.hono.service.amqp.AmqpEndpoint
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonSender protonSender, ResourceIdentifier resourceIdentifier) {
        if (resourceIdentifier.getResourceId() == null) {
            this.logger.debug("link target provided in client's link ATTACH must not be null, but must match pattern \"{}/<tenant>/<reply-address>\" instead", getName());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD.toString(), String.format("link target must not be null but must have the following format %s/<tenant>/<reply-address>", getName())));
            protonSender.close();
        } else {
            this.logger.debug("establishing sender link with client [{}]", MessageHelper.getLinkName(protonSender));
            MessageConsumer consumer = this.vertx.eventBus().consumer(resourceIdentifier.toString(), message -> {
                this.logger.trace("forwarding reply to client: {}", message.body());
                protonSender.send(getAmqpReply(message));
            });
            protonSender.closeHandler(asyncResult -> {
                consumer.unregister();
                ((ProtonSender) asyncResult.result()).close();
                this.logger.debug("receiver closed link [{}], removing associated event bus consumer [{}]", MessageHelper.getLinkName(protonSender), consumer.address());
            });
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE).open();
        }
    }
}
