package org.eclipse.hono.telemetry.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

/**
 * A telemetry adapter that does not forward messages anywhere: each message is
 * handed to an optional {@link Consumer} and is otherwise dropped.
 * <p>
 * For every link the adapter counts the messages it has received and calls
 * {@code replenishUpstreamSender} for that link in one of two modes:
 * <ul>
 * <li>no positive pause threshold configured: after every {@code DEFAULT_CREDIT}
 * messages, with {@code DEFAULT_CREDIT} as the amount;</li>
 * <li>positive pause threshold configured: after every {@code pauseThreshold}
 * messages the link is marked as suspended and is replenished (with
 * {@code pauseThreshold} as the amount) only once the pause period has elapsed.</li>
 * </ul>
 * Link state is kept in a plain {@link HashMap}; entries are never removed by this class.
 */
@Profile("standalone")
@Service
public final class MessageDiscardingTelemetryAdapter extends BaseTelemetryAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(MessageDiscardingTelemetryAdapter.class);

    /** Message interval and replenish amount used when no positive pause threshold is configured. */
    private static final int DEFAULT_CREDIT = 10;

    private final int pauseThreshold;
    private final long pausePeriod;
    private final Map<String, LinkStatus> statusMap = new HashMap<>();
    private Consumer<Message> messageConsumer;

    /**
     * Creates an adapter without a pause threshold and without a message consumer.
     */
    public MessageDiscardingTelemetryAdapter() {
        this(0, 0L, null);
    }

    /**
     * Creates an adapter without a pause threshold.
     *
     * @param consumer receives every processed message; may be {@code null}.
     */
    public MessageDiscardingTelemetryAdapter(final Consumer<Message> consumer) {
        this(0, 0L, consumer);
    }

    /**
     * Creates an adapter.
     *
     * @param pauseThreshold number of messages per link after which the link is paused;
     *                       values {@code <= 0} disable pausing.
     * @param pausePeriod    delay passed to the Vert.x timer before a paused link is resumed;
     *                       values {@code <= 0} make the link resume on the next context tick.
     * @param consumer       receives every processed message; may be {@code null}.
     */
    public MessageDiscardingTelemetryAdapter(final int pauseThreshold, final long pausePeriod,
            final Consumer<Message> consumer) {
        // NOTE(review): the meaning of (0, 1) is defined by BaseTelemetryAdapter, which is not visible here.
        super(0, 1);
        this.pauseThreshold = pauseThreshold;
        this.pausePeriod = pausePeriod;
        this.messageConsumer = consumer;
    }

    /**
     * Replaces the consumer that receives every processed message.
     *
     * @param consumer the new consumer; {@code null} means messages are only counted.
     */
    public void setMessageConsumer(final Consumer<Message> consumer) {
        this.messageConsumer = consumer;
    }

    /**
     * Logs the message, passes it to the configured consumer (if any) and updates the
     * message count of the link it arrived on, creating the link's status on first use.
     *
     * @param message the telemetry message.
     * @param linkId  identifier of the link the message was received on; used as the status map key.
     */
    @Override
    public void processTelemetryData(final Message message, final String linkId) {
        final LinkStatus status = statusMap.computeIfAbsent(linkId, id -> {
            LOG.debug("creating new link status object [{}]", id);
            return new LinkStatus(id);
        });
        LOG.debug("processing telemetry data [id: {}, to: {}, content-type: {}]",
                message.getMessageId(), message.getAddress(), message.getContentType());
        final Consumer<Message> consumer = messageConsumer;
        if (consumer != null) {
            consumer.accept(message);
        }
        status.onMsgReceived();
    }

    /**
     * Per-link bookkeeping: number of messages seen and whether the link is currently paused.
     * Deliberately an inner (non-static) class because it reads the adapter's configuration
     * and calls its inherited helpers.
     */
    private final class LinkStatus {

        private final String linkId;
        private long msgCount;
        private boolean suspended;

        LinkStatus(final String linkId) {
            this.linkId = linkId;
        }

        /**
         * Counts one message and, depending on the configured mode, either pauses the
         * link or replenishes it.
         */
        void onMsgReceived() {
            msgCount++;
            if (pauseThreshold > 0) {
                // pausing mode: replenishment happens only in resume()
                if (msgCount % pauseThreshold == 0) {
                    pause();
                }
            } else if (msgCount % DEFAULT_CREDIT == 0) {
                replenishUpstreamSender(linkId, DEFAULT_CREDIT);
            }
        }

        /**
         * Marks the link as suspended and schedules {@link #resume()} on the Vert.x context.
         */
        void pause() {
            LOG.debug("pausing link [{}]", linkId);
            suspended = true;
            if (pausePeriod > 0) {
                vertx.setTimer(pausePeriod, timerId -> vertx.runOnContext(ignored -> resume()));
            } else {
                // Vert.x timers reject delays below 1 ms, so with no usable period
                // the link is resumed on the next context tick instead of failing.
                vertx.runOnContext(ignored -> resume());
            }
        }

        /**
         * Replenishes the link and clears the suspended flag; does nothing if the link
         * is not suspended.
         */
        private void resume() {
            if (!suspended) {
                return;
            }
            LOG.debug("resuming link [{}]", linkId);
            final int credit = pauseThreshold > 0 ? pauseThreshold : DEFAULT_CREDIT;
            replenishUpstreamSender(linkId, credit);
            suspended = false;
        }
    }
}
