package org.eclipse.hono.server;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.authorization.AuthorizationConstants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;

/* loaded from: input_file:org/eclipse/hono/server/MessageForwardingEndpoint.class */
public abstract class MessageForwardingEndpoint extends BaseEndpoint {
    private DownstreamAdapter downstreamAdapter;

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageForwardingEndpoint(Vertx vertx) {
        super((Vertx) Objects.requireNonNull(vertx));
    }

    @Override // org.eclipse.hono.server.BaseEndpoint
    protected final void doStart(Future<Void> future) {
        if (this.downstreamAdapter == null) {
            future.fail("no downstream adapter configured on Telemetry endpoint");
        } else {
            this.downstreamAdapter.start(future);
        }
    }

    @Override // org.eclipse.hono.server.BaseEndpoint
    protected final void doStop(Future<Void> future) {
        if (this.downstreamAdapter == null) {
            future.complete();
        } else {
            this.downstreamAdapter.stop(future);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setDownstreamAdapter(DownstreamAdapter downstreamAdapter) {
        this.downstreamAdapter = (DownstreamAdapter) Objects.requireNonNull(downstreamAdapter);
    }

    @Override // org.eclipse.hono.server.BaseEndpoint, org.eclipse.hono.server.Endpoint
    public final void onLinkAttach(ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        String uuid = UUID.randomUUID().toString();
        UpstreamReceiver newUpstreamReceiver = UpstreamReceiver.newUpstreamReceiver(uuid, protonReceiver, getEndpointQos());
        this.downstreamAdapter.onClientAttach(newUpstreamReceiver, asyncResult -> {
            if (!asyncResult.succeeded()) {
                newUpstreamReceiver.close(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, "no consumer available for target"));
            } else {
                protonReceiver.closeHandler(asyncResult -> {
                    onLinkDetach(newUpstreamReceiver);
                    this.downstreamAdapter.onClientDetach(newUpstreamReceiver);
                }).handler((protonDelivery, message) -> {
                    if (passesFormalVerification(resourceIdentifier, message)) {
                        forwardMessage(newUpstreamReceiver, protonDelivery, message);
                    } else {
                        MessageHelper.rejected(protonDelivery, AmqpError.DECODE_ERROR.toString(), "malformed message");
                        onLinkDetach(newUpstreamReceiver, ProtonHelper.condition(AmqpError.DECODE_ERROR.toString(), "invalid message received"));
                    }
                }).open();
                this.logger.debug("accepted link from telemetry client [{}]", uuid);
            }
        });
    }

    private void forwardMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, Message message) {
        ResourceIdentifier fromString = ResourceIdentifier.fromString((String) MessageHelper.getAnnotation(message, AuthorizationConstants.RESOURCE_FIELD, String.class));
        checkDeviceExists(fromString, bool -> {
            if (bool.booleanValue()) {
                this.downstreamAdapter.processMessage(upstreamReceiver, protonDelivery, message);
                return;
            }
            this.logger.debug("device {}/{} does not exist, closing link", fromString.getTenantId(), fromString.getResourceId());
            MessageHelper.rejected(protonDelivery, AmqpError.PRECONDITION_FAILED.toString(), "device does not exist");
            upstreamReceiver.close(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "device does not exist"));
        });
    }

    protected abstract ProtonQoS getEndpointQos();

    protected abstract boolean passesFormalVerification(ResourceIdentifier resourceIdentifier, Message message);
}
