package org.eclipse.hono.telemetry.impl;

import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.hono.AmqpMessage;
import org.eclipse.hono.telemetry.TelemetryAdapter;
import org.eclipse.hono.telemetry.TelemetryConstants;
import org.eclipse.hono.util.AbstractInstanceNumberAwareVerticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/telemetry/impl/BaseTelemetryAdapter.class */
public abstract class BaseTelemetryAdapter extends AbstractInstanceNumberAwareVerticle implements TelemetryAdapter {
    protected static final int DEFAULT_CREDIT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(BaseTelemetryAdapter.class);
    private final Map<String, String> flowControlAddressRegistry;
    private MessageConsumer<JsonObject> telemetryDataConsumer;
    private MessageConsumer<JsonObject> linkControlConsumer;
    private MessageConsumer<String> connectionClosedListener;
    private String dataAddress;

    protected BaseTelemetryAdapter() {
        this(0, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseTelemetryAdapter(int i, int i2) {
        super(i, i2);
        this.flowControlAddressRegistry = new HashMap();
    }

    public final void start(Future<Void> future) throws Exception {
        registerLinkControlConsumer();
        registerTelemetryDataConsumer();
        registerConnectionClosedListener();
        doStart(future);
    }

    protected void doStart(Future<Void> future) throws Exception {
        future.complete();
    }

    private void registerTelemetryDataConsumer() {
        this.dataAddress = getAddressWithId(TelemetryConstants.EVENT_BUS_ADDRESS_TELEMETRY_IN);
        this.telemetryDataConsumer = this.vertx.eventBus().consumer(this.dataAddress);
        this.telemetryDataConsumer.handler(this::processMessage);
        LOG.info("listening on event bus [address: {}] for downstream telemetry messages", this.dataAddress);
    }

    private void registerLinkControlConsumer() {
        String addressWithId = getAddressWithId(TelemetryConstants.EVENT_BUS_ADDRESS_TELEMETRY_LINK_CONTROL);
        this.linkControlConsumer = this.vertx.eventBus().consumer(addressWithId);
        this.linkControlConsumer.handler(this::processLinkControlMessage);
        LOG.info("listening on event bus [address: {}] for downstream link control messages", addressWithId);
    }

    private void registerConnectionClosedListener() {
        this.connectionClosedListener = this.vertx.eventBus().consumer("hono.connection.closed", this::processConnectionClosedEvent);
    }

    public final void stop(Future<Void> future) {
        this.telemetryDataConsumer.unregister();
        LOG.info("unregistered telemetry data consumer from event bus");
        this.linkControlConsumer.unregister();
        LOG.info("unregistered link control consumer from event bus");
        this.connectionClosedListener.unregister();
        LOG.info("unregistered connection close listener from event bus");
        doStop(future);
    }

    protected void doStop(Future<Void> future) {
        future.complete();
    }

    private void processConnectionClosedEvent(Message<String> message) {
        onConnectionClosed((String) message.body());
    }

    protected void onConnectionClosed(String str) {
    }

    private void processLinkControlMessage(Message<JsonObject> message) {
        JsonObject jsonObject = (JsonObject) message.body();
        String string = jsonObject.getString(TelemetryConstants.FIELD_NAME_EVENT);
        String string2 = jsonObject.getString(TelemetryConstants.FIELD_NAME_LINK_ID);
        LOG.trace("received link control msg: {}", jsonObject.encodePrettily());
        if (TelemetryConstants.EVENT_ATTACHED.equalsIgnoreCase(string)) {
            processLinkAttachedMessage(jsonObject.getString(TelemetryConstants.FIELD_NAME_CONNECTION_ID), string2, jsonObject.getString(TelemetryConstants.FIELD_NAME_TARGET_ADDRESS), message.headers().get(TelemetryConstants.HEADER_NAME_REPLY_TO));
        } else if (!TelemetryConstants.EVENT_DETACHED.equalsIgnoreCase(string)) {
            LOG.warn("discarding unsupported link control command [{}]", string);
        } else {
            onLinkDetached(string2);
            unregisterReplyToAddress(string2);
        }
    }

    final void processLinkAttachedMessage(String str, String str2, String str3, String str4) {
        if (str4 == null) {
            LOG.warn("discarding link [{}] control message lacking required header [{}]", str2, TelemetryConstants.HEADER_NAME_REPLY_TO);
        } else {
            this.flowControlAddressRegistry.put(str2, str4);
            onLinkAttached(str, str2, str3);
        }
    }

    private void unregisterReplyToAddress(String str) {
        this.flowControlAddressRegistry.remove(str);
    }

    protected void onLinkAttached(String str, String str2, String str3) {
        replenishUpstreamSender(str2, DEFAULT_CREDIT);
    }

    protected void onLinkDetached(String str) {
    }

    private void processMessage(Message<JsonObject> message) {
        JsonObject jsonObject = (JsonObject) message.body();
        String string = jsonObject.getString(TelemetryConstants.FIELD_NAME_LINK_ID);
        Object remove = this.vertx.sharedData().getLocalMap(this.dataAddress).remove(jsonObject.getString(TelemetryConstants.FIELD_NAME_MSG_UUID));
        if (remove instanceof AmqpMessage) {
            processTelemetryData(((AmqpMessage) remove).getMessage(), string);
        } else {
            LOG.warn("expected {} in shared local map {} but found {}", new Object[]{AmqpMessage.class.getName(), this.dataAddress, remove.getClass().getName()});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void replenishUpstreamSender(String str, int i) {
        sendMessage(str, TelemetryConstants.getCreditReplenishmentMsg(str, i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void sendErrorMessage(String str, boolean z) {
        sendMessage(str, TelemetryConstants.getErrorMessage(str, z));
    }

    private void sendMessage(String str, JsonObject jsonObject) {
        String str2 = this.flowControlAddressRegistry.get(str);
        if (str2 == null) {
            LOG.warn("cannot send upstream message for link [{}], no event bus address registered", str);
        } else {
            LOG.trace("sending upstream message for link [{}] to address [{}]: {}", new Object[]{str, str2, jsonObject.encodePrettily()});
            this.vertx.eventBus().send(str2, jsonObject);
        }
    }
}
