package org.eclipse.hono.telemetry.impl;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.AmqpMessage;
import org.eclipse.hono.authorization.AuthorizationConstants;
import org.eclipse.hono.server.BaseEndpoint;
import org.eclipse.hono.telemetry.TelemetryConstants;
import org.eclipse.hono.telemetry.TelemetryMessageFilter;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationConstants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/telemetry/impl/TelemetryEndpoint.class */
public final class TelemetryEndpoint extends BaseEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(TelemetryEndpoint.class);
    private Map<String, LinkWrapper> activeClients;
    private MessageConsumer<JsonObject> flowControlConsumer;
    private String flowControlAddress;
    private String linkControlAddress;
    private String dataAddress;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/hono/telemetry/impl/TelemetryEndpoint$LinkWrapper.class */
    public static class LinkWrapper {
        private ProtonReceiver link;
        private String id;

        public LinkWrapper(String str, ProtonReceiver protonReceiver) {
            this.id = (String) Objects.requireNonNull(str);
            this.link = (ProtonReceiver) Objects.requireNonNull(protonReceiver);
            this.link.setAutoAccept(false).setQoS(ProtonQoS.AT_MOST_ONCE).setPrefetch(0);
        }

        public void replenish(int i) {
            TelemetryEndpoint.LOG.debug("replenishing client [{}] with {} credits", this.id, Integer.valueOf(i));
            this.link.flow(i);
        }

        public int getCredit() {
            return this.link.getCredit() - this.link.getQueued();
        }

        public void close(ErrorCondition errorCondition) {
            if (errorCondition != null) {
                this.link.setCondition(errorCondition);
            }
            this.link.close();
        }

        public ProtonReceiver getLink() {
            return this.link;
        }

        public String getLinkId() {
            return this.id;
        }

        public String getConnectionId() {
            return Constants.getConnectionId(this.link);
        }
    }

    public TelemetryEndpoint(Vertx vertx, boolean z) {
        this(vertx, z, 0);
    }

    public TelemetryEndpoint(Vertx vertx, boolean z, int i) {
        super((Vertx) Objects.requireNonNull(vertx), z, i);
        this.activeClients = new HashMap();
    }

    @Override // org.eclipse.hono.server.BaseEndpoint, org.eclipse.hono.server.Endpoint
    public boolean start() {
        registerFlowControlConsumer();
        this.linkControlAddress = getAddressForInstanceNo(TelemetryConstants.EVENT_BUS_ADDRESS_TELEMETRY_LINK_CONTROL);
        LOG.info("publishing downstream link control messages on event bus [address: {}]", this.linkControlAddress);
        this.dataAddress = getAddressForInstanceNo(TelemetryConstants.EVENT_BUS_ADDRESS_TELEMETRY_IN);
        LOG.info("publishing downstream telemetry messages on event bus [address: {}]", this.dataAddress);
        return true;
    }

    @Override // org.eclipse.hono.server.BaseEndpoint, org.eclipse.hono.server.Endpoint
    public boolean stop() {
        if (this.flowControlConsumer == null) {
            return true;
        }
        this.flowControlConsumer.unregister();
        return true;
    }

    private void registerFlowControlConsumer() {
        this.flowControlAddress = getAddressForInstanceNo(TelemetryConstants.EVENT_BUS_ADDRESS_TELEMETRY_FLOW_CONTROL);
        this.flowControlConsumer = this.vertx.eventBus().consumer(this.flowControlAddress, this::handleFlowControlMsg);
        LOG.info("listening on event bus [address: {}] for downstream flow control messages", this.flowControlAddress);
    }

    private void handleFlowControlMsg(Message<JsonObject> message) {
        if (message.body() == null) {
            LOG.warn("received empty message from telemetry adapter, is this a bug?");
            return;
        }
        if (TelemetryConstants.isFlowControlMessage((JsonObject) message.body())) {
            handleFlowControlMsg(((JsonObject) message.body()).getJsonObject(TelemetryConstants.MSG_TYPE_FLOW_CONTROL));
        } else if (TelemetryConstants.isErrorMessage((JsonObject) message.body())) {
            handleErrorMessage(((JsonObject) message.body()).getJsonObject("error"));
        } else {
            LOG.info("received unsupported message {}", ((JsonObject) message.body()).encodePrettily());
        }
    }

    private void handleFlowControlMsg(JsonObject jsonObject) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("received flow control message from telemetry adapter: {}", jsonObject.encodePrettily());
        }
        String string = jsonObject.getString(TelemetryConstants.FIELD_NAME_LINK_ID);
        LinkWrapper linkWrapper = this.activeClients.get(string);
        if (linkWrapper != null) {
            linkWrapper.replenish(jsonObject.getInteger(TelemetryConstants.FIELD_NAME_CREDIT, 0).intValue());
        } else {
            LOG.warn("discarding flow control message for non-existing link {}", string);
        }
    }

    private void handleErrorMessage(JsonObject jsonObject) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("received error message from telemetry adapter: {}", jsonObject.encodePrettily());
        }
        LinkWrapper linkWrapper = this.activeClients.get(jsonObject.getString(TelemetryConstants.FIELD_NAME_LINK_ID));
        if (linkWrapper == null || !jsonObject.getBoolean(TelemetryConstants.FIELD_NAME_CLOSE_LINK, false).booleanValue()) {
            return;
        }
        onLinkDetach(linkWrapper);
    }

    @Override // org.eclipse.hono.server.Endpoint
    public String getName() {
        return TelemetryConstants.TELEMETRY_ENDPOINT;
    }

    @Override // org.eclipse.hono.server.Endpoint
    public void onLinkAttach(ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (ProtonQoS.AT_LEAST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            LOG.debug("client wants to use AT LEAST ONCE delivery mode, ignoring ...");
        }
        String uuid = UUID.randomUUID().toString();
        LinkWrapper linkWrapper = new LinkWrapper(uuid, protonReceiver);
        protonReceiver.closeHandler(asyncResult -> {
            onLinkDetach(linkWrapper);
        }).handler((protonDelivery, message) -> {
            if (TelemetryMessageFilter.verify(resourceIdentifier, message)) {
                sendTelemetryData(linkWrapper, protonDelivery, message);
            } else {
                MessageHelper.rejected(protonDelivery, AmqpError.DECODE_ERROR.toString(), "malformed telemetry message");
                onLinkDetach(linkWrapper, ProtonHelper.condition(AmqpError.DECODE_ERROR.toString(), "invalid message received"));
            }
        }).open();
        LOG.debug("registering new link for telemetry client [{}]", uuid);
        this.activeClients.put(uuid, linkWrapper);
        sendLinkAttachMessage(linkWrapper, resourceIdentifier);
    }

    private void sendLinkAttachMessage(LinkWrapper linkWrapper, ResourceIdentifier resourceIdentifier) {
        this.vertx.eventBus().send(this.linkControlAddress, TelemetryConstants.getLinkAttachedMsg(linkWrapper.getConnectionId(), linkWrapper.getLinkId(), resourceIdentifier), TelemetryConstants.addReplyToHeader(new DeliveryOptions(), this.flowControlAddress));
    }

    private void onLinkDetach(LinkWrapper linkWrapper) {
        onLinkDetach(linkWrapper, null);
    }

    private void onLinkDetach(LinkWrapper linkWrapper, ErrorCondition errorCondition) {
        LOG.debug("closing receiver for client [{}]", linkWrapper.getLinkId());
        this.activeClients.remove(linkWrapper.getLinkId());
        sendLinkDetachMessage(linkWrapper);
        linkWrapper.close(errorCondition);
    }

    private void sendLinkDetachMessage(LinkWrapper linkWrapper) {
        this.vertx.eventBus().send(this.linkControlAddress, TelemetryConstants.getLinkDetachedMsg(linkWrapper.getLinkId()), TelemetryConstants.addReplyToHeader(new DeliveryOptions(), this.flowControlAddress));
    }

    private void sendTelemetryData(LinkWrapper linkWrapper, ProtonDelivery protonDelivery, org.apache.qpid.proton.message.Message message) {
        if (!protonDelivery.remotelySettled()) {
            LOG.trace("received un-settled telemetry message on link [{}]", linkWrapper.getLinkId());
        }
        ResourceIdentifier fromString = ResourceIdentifier.fromString((String) MessageHelper.getAnnotation(message, AuthorizationConstants.RESOURCE_FIELD, String.class));
        checkDeviceExists(fromString, bool -> {
            if (!bool.booleanValue()) {
                LOG.debug("device {}/{} does not exist, closing link", fromString.getTenantId(), fromString.getResourceId());
                MessageHelper.rejected(protonDelivery, AmqpError.PRECONDITION_FAILED.toString(), "device does not exist");
                onLinkDetach(linkWrapper, ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "device does not exist"));
            } else {
                String uuid = UUID.randomUUID().toString();
                this.vertx.sharedData().getLocalMap(this.dataAddress).put(uuid, AmqpMessage.of(message, protonDelivery));
                sendAtMostOnce(linkWrapper, uuid, protonDelivery);
            }
        });
    }

    private void checkDeviceExists(ResourceIdentifier resourceIdentifier, Handler<Boolean> handler) {
        this.vertx.eventBus().send("registration.in", RegistrationConstants.getRegistrationJson("get", resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId()), asyncResult -> {
            if (!asyncResult.succeeded()) {
                handler.handle(false);
            } else {
                handler.handle(Boolean.valueOf(String.valueOf(200).equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))));
            }
        });
    }

    private void sendAtMostOnce(LinkWrapper linkWrapper, String str, ProtonDelivery protonDelivery) {
        this.vertx.eventBus().send(this.dataAddress, TelemetryConstants.getTelemetryMsg(str, linkWrapper.getLinkId()));
        ProtonHelper.accepted(protonDelivery, true);
        LOG.trace("publishing telemetry msg received via Link[id: {}, credit left: {}] to {}", new Object[]{linkWrapper.getLinkId(), Integer.valueOf(linkWrapper.getCredit()), this.dataAddress});
    }
}
