package org.eclipse.hono.impl;

import io.vertx.core.Handler;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/impl/ProtonSenderWriteStream.class */
/**
 * A {@link WriteStream} of AMQP {@link Message}s that forwards every written message
 * to a wrapped {@link ProtonSender}.
 * <p>
 * Each message is sent with a 4-byte delivery tag taken from an internal counter that
 * starts at 1. An optional "sent" handler is notified after a message has been sent
 * (sender QoS {@code AT_MOST_ONCE}) or after the peer has accepted it (any other QoS).
 */
public class ProtonSenderWriteStream implements WriteStream<Message> {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonSenderWriteStream.class);

    /** Size in bytes of the delivery tag built from the message counter. */
    private static final int DELIVERY_TAG_SIZE = 4;

    private final ProtonSender sender;
    private final Handler<Void> sentHandler;
    private final AtomicInteger count = new AtomicInteger(1);
    /** Stored by {@link #exceptionHandler(Handler)}; not invoked anywhere in this class. */
    private Handler<Throwable> exceptionHandler;

    /**
     * Creates a stream that writes to the given sender.
     *
     * @param protonSender the sender to forward messages to.
     * @param handler the handler to notify about each sent/accepted message, may be {@code null}.
     * @throws NullPointerException if the sender is {@code null}.
     */
    public ProtonSenderWriteStream(ProtonSender protonSender, Handler<Void> handler) {
        this.sender = Objects.requireNonNull(protonSender);
        this.sentHandler = handler;
    }

    /**
     * Stores the given exception handler.
     *
     * @param handler the handler to store.
     * @return this stream.
     */
    public WriteStream<Message> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Sends the given message using the wrapped sender.
     *
     * @param message the message to send.
     * @return this stream.
     */
    public WriteStream<Message> write(Message message) {
        sendMessage(message);
        return this;
    }

    /**
     * Does nothing; the wrapped sender is left untouched.
     */
    public void end() {
    }

    /**
     * Ignores the given size; flow control is delegated to the wrapped sender's send queue.
     *
     * @param i the requested maximum queue size (ignored).
     * @return this stream.
     */
    public WriteStream<Message> setWriteQueueMaxSize(int i) {
        return this;
    }

    /**
     * Checks whether the wrapped sender's send queue is full.
     *
     * @return the value reported by {@code ProtonSender.sendQueueFull()}.
     */
    public boolean writeQueueFull() {
        return this.sender.sendQueueFull();
    }

    /**
     * Registers a handler to be invoked when the wrapped sender's send queue drains.
     *
     * @param handler the handler to notify; a {@code null} handler results in no notification.
     * @return this stream.
     */
    public WriteStream<Message> drainHandler(Handler<Void> handler) {
        LOG.trace("registering drain handler");
        this.sender.sendQueueDrainHandler(protonSender -> {
            // guard against a null handler so that the sender's callback never fails
            if (handler != null) {
                handler.handle(null);
            }
        });
        return this;
    }

    /**
     * Sends a message with a fresh delivery tag and notifies the sent handler.
     * <p>
     * For QoS {@code AT_MOST_ONCE} the sent handler is notified right after sending.
     * For any other QoS it is notified only once the peer has accepted the message;
     * a delivery in any other remote state is merely logged.
     */
    private void sendMessage(Message message) {
        final byte[] deliveryTag = nextDeliveryTag();
        LOG.trace("sending message [id: {}] to peer", message.getMessageId());
        if (ProtonQoS.AT_MOST_ONCE.equals(this.sender.getQoS())) {
            this.sender.send(deliveryTag, message);
            notifySent();
            return;
        }
        this.sender.send(deliveryTag, message, protonDelivery -> {
            if (!(protonDelivery.getRemoteState() instanceof Accepted)) {
                LOG.warn("message [id: {}] has not been accepted by peer: {}", message.getMessageId(), protonDelivery.getRemoteState());
                return;
            }
            LOG.trace("message [id: {}, remotelySettled: {}] has been accepted by peer", message.getMessageId(), protonDelivery.remotelySettled());
            protonDelivery.settle();
            notifySent();
        });
    }

    /** Encodes the next counter value as a 4-byte (big-endian) delivery tag. */
    private byte[] nextDeliveryTag() {
        final ByteBuffer tag = ByteBuffer.allocate(DELIVERY_TAG_SIZE);
        tag.putInt(this.count.getAndIncrement());
        return tag.array();
    }

    /** Invokes the sent handler, if one has been provided. */
    private void notifySent() {
        if (this.sentHandler != null) {
            this.sentHandler.handle(null);
        }
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    // Bridge method emitted for the covariant return type of exceptionHandler; delegates to it.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public /* bridge */ /* synthetic */ StreamBase m5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
