package org.eclipse.hono.messaging;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.service.amqp.UpstreamReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

@Profile({"standalone"})
@Service
/* loaded from: input_file:org/eclipse/hono/messaging/MessageDiscardingDownstreamAdapter.class */
public final class MessageDiscardingDownstreamAdapter implements DownstreamAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MessageDiscardingDownstreamAdapter.class);
    private static final int DEFAULT_CREDIT = 10;
    private final Vertx vertx;
    private final int pauseThreshold;
    private final long pausePeriod;
    private Map<String, LinkStatus> statusMap;
    private Consumer<Message> messageConsumer;

    /* loaded from: input_file:org/eclipse/hono/messaging/MessageDiscardingDownstreamAdapter$LinkStatus.class */
    private class LinkStatus {
        private long msgCount;
        private UpstreamReceiver client;
        private boolean suspended;

        public LinkStatus(UpstreamReceiver upstreamReceiver) {
            this.client = upstreamReceiver;
        }

        public void onMsgReceived() {
            this.msgCount++;
            if (MessageDiscardingDownstreamAdapter.this.pauseThreshold > 0) {
                if (this.msgCount % MessageDiscardingDownstreamAdapter.this.pauseThreshold == 0) {
                    pause();
                }
            } else if (this.msgCount % 10 == 0) {
                this.client.replenish(MessageDiscardingDownstreamAdapter.DEFAULT_CREDIT);
            }
        }

        public void pause() {
            MessageDiscardingDownstreamAdapter.LOG.debug("pausing link [{}]", this.client.getLinkId());
            this.suspended = true;
            MessageDiscardingDownstreamAdapter.this.vertx.setTimer(MessageDiscardingDownstreamAdapter.this.pausePeriod, l -> {
                MessageDiscardingDownstreamAdapter.this.vertx.runOnContext(r3 -> {
                    resume();
                });
            });
        }

        private void resume() {
            if (this.suspended) {
                MessageDiscardingDownstreamAdapter.LOG.debug("resuming link [{}]", this.client.getLinkId());
                int i = MessageDiscardingDownstreamAdapter.DEFAULT_CREDIT;
                if (MessageDiscardingDownstreamAdapter.this.pauseThreshold > 0) {
                    i = MessageDiscardingDownstreamAdapter.this.pauseThreshold;
                }
                this.client.replenish(i);
                this.suspended = false;
            }
        }
    }

    public MessageDiscardingDownstreamAdapter(Vertx vertx) {
        this(vertx, null);
    }

    public MessageDiscardingDownstreamAdapter(Vertx vertx, Consumer<Message> consumer) {
        this(vertx, 0, 0L, consumer);
    }

    public MessageDiscardingDownstreamAdapter(Vertx vertx, int i, long j, Consumer<Message> consumer) {
        this.statusMap = new HashMap();
        this.vertx = vertx;
        this.pauseThreshold = i;
        this.pausePeriod = j;
        this.messageConsumer = consumer;
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void start(Future<Void> future) {
        future.complete();
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void stop(Future<Void> future) {
        future.complete();
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void onClientAttach(UpstreamReceiver upstreamReceiver, Handler<AsyncResult<Void>> handler) {
        upstreamReceiver.replenish(DEFAULT_CREDIT);
        handler.handle(Future.succeededFuture());
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void onClientDetach(UpstreamReceiver upstreamReceiver) {
    }

    public void setMessageConsumer(Consumer<Message> consumer) {
        this.messageConsumer = consumer;
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void onClientDisconnect(String str) {
    }

    @Override // org.eclipse.hono.messaging.DownstreamAdapter
    public void processMessage(UpstreamReceiver upstreamReceiver, ProtonDelivery protonDelivery, Message message) {
        LinkStatus linkStatus = this.statusMap.get(upstreamReceiver.getLinkId());
        if (linkStatus == null) {
            LOG.debug("creating new link status object [{}]", upstreamReceiver.getLinkId());
            linkStatus = new LinkStatus(upstreamReceiver);
            this.statusMap.put(upstreamReceiver.getLinkId(), linkStatus);
        }
        LOG.debug("processing telemetry data [id: {}, to: {}, content-type: {}]", new Object[]{message.getMessageId(), message.getAddress(), message.getContentType()});
        if (this.messageConsumer != null) {
            this.messageConsumer.accept(message);
        }
        ProtonHelper.accepted(protonDelivery, true);
        linkStatus.onMsgReceived();
    }
}
