package org.eclipse.hono.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/impl/AbstractSender.class */
abstract class AbstractSender extends AbstractHonoClient implements MessageSender {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong();
    private static final Pattern CHARSET_PATTERN = Pattern.compile("^.*;charset=(.*)$");
    private static final BiConsumer<Object, ProtonDelivery> DEFAULT_DISPOSITION_HANDLER = (obj, protonDelivery) -> {
        LOG.trace("delivery state updated [message ID: {}, new remote state: {}]", obj, protonDelivery.getRemoteState());
    };
    protected final String tenantId;
    protected final String targetAddress;
    private final Handler<String> closeHook;
    private Handler<Void> drainHandler;
    private BiConsumer<Object, ProtonDelivery> defaultDispositionHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractSender(ProtonSender protonSender, String str, String str2, Context context, Handler<String> handler) {
        super(context);
        this.defaultDispositionHandler = DEFAULT_DISPOSITION_HANDLER;
        this.sender = (ProtonSender) Objects.requireNonNull(protonSender);
        this.tenantId = (String) Objects.requireNonNull(str);
        this.targetAddress = str2;
        this.closeHook = handler;
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final int getCredit() {
        if (this.sender == null) {
            return 0;
        }
        return this.sender.getCredit();
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean sendQueueFull() {
        return this.sender.sendQueueFull();
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void sendQueueDrainHandler(Handler<Void> handler) {
        if (this.drainHandler != null) {
            throw new IllegalStateException("already waiting for replenishment with credit");
        }
        this.drainHandler = (Handler) Objects.requireNonNull(handler);
        this.sender.sendQueueDrainHandler(protonSender -> {
            LOG.trace("sender has received FLOW [credits: {}, queued:{}]", Integer.valueOf(protonSender.getCredit()), Integer.valueOf(protonSender.getQueued()));
            Handler<Void> handler2 = this.drainHandler;
            this.drainHandler = null;
            if (handler2 != null) {
                handler2.handle((Object) null);
            }
        });
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void close(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        LOG.info("closing sender ...");
        closeLinks(handler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean isOpen() {
        return this.sender.isOpen();
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void setErrorHandler(Handler<AsyncResult<Void>> handler) {
        this.sender.closeHandler(asyncResult -> {
            if (asyncResult.failed()) {
                LOG.debug("server closed link with error condition: {}", asyncResult.cause().getMessage());
                this.sender.close();
                if (this.closeHook != null) {
                    this.closeHook.handle(this.targetAddress);
                }
                handler.handle(Future.failedFuture(asyncResult.cause()));
                return;
            }
            LOG.debug("server closed link");
            this.sender.close();
            if (this.closeHook != null) {
                this.closeHook.handle(this.targetAddress);
            }
        });
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void setDefaultDispositionHandler(BiConsumer<Object, ProtonDelivery> biConsumer) {
        this.defaultDispositionHandler = biConsumer;
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(Message message, Handler<Void> handler, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(biConsumer);
        if (handler == null) {
            this.context.runOnContext(r7 -> {
                sendMessage(message, biConsumer);
            });
        } else {
            if (this.drainHandler != null) {
                throw new IllegalStateException("cannot send message while waiting for replenishment with credit");
            }
            if (!this.sender.isOpen()) {
                throw new IllegalStateException("sender is not open");
            }
            this.context.runOnContext(r8 -> {
                sendMessage(message, biConsumer);
                if (this.sender.sendQueueFull()) {
                    sendQueueDrainHandler(handler);
                } else {
                    handler.handle((Object) null);
                }
            });
        }
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(Message message, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(biConsumer);
        if (this.sender.sendQueueFull()) {
            return false;
        }
        this.context.runOnContext(r7 -> {
            sendMessage(message, biConsumer);
        });
        return true;
    }

    private void sendMessage(Message message, BiConsumer<Object, ProtonDelivery> biConsumer) {
        this.sender.send(message, protonDelivery -> {
            biConsumer.accept(message.getMessageId(), protonDelivery);
        });
        LOG.trace("sent message, remaining credit: {}, queued messages: {}", Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(Message message, Handler<Void> handler) {
        send(message, handler, this.defaultDispositionHandler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(Message message) {
        return send(message, this.defaultDispositionHandler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, byte[] bArr, String str2, String str3) {
        return send(str, (Map<String, ?>) null, bArr, str2, str3);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, byte[] bArr, String str2, String str3, BiConsumer<Object, ProtonDelivery> biConsumer) {
        return send(str, (Map<String, ?>) null, bArr, str2, str3, biConsumer);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(String str, byte[] bArr, String str2, String str3, Handler<Void> handler) {
        send(str, (Map<String, ?>) null, bArr, str2, str3, handler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, String str2, String str3, String str4) {
        return send(str, (Map<String, ?>) null, str2, str3, str4);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, String str2, String str3, String str4, BiConsumer<Object, ProtonDelivery> biConsumer) {
        return send(str, (Map<String, ?>) null, str2, str3, str4, biConsumer);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(String str, String str2, String str3, String str4, Handler<Void> handler) {
        send(str, (Map<String, ?>) null, str2, str3, str4, handler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, Map<String, ?> map, String str2, String str3, String str4) {
        Objects.requireNonNull(str2);
        return send(str, map, str2.getBytes(getCharsetForContentType((String) Objects.requireNonNull(str3))), str3, str4);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, Map<String, ?> map, String str2, String str3, String str4, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(str2);
        return send(str, map, str2.getBytes(getCharsetForContentType((String) Objects.requireNonNull(str3))), str3, str4, biConsumer);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, Map<String, ?> map, byte[] bArr, String str2, String str3) {
        return send(str, map, bArr, str2, str3, this.defaultDispositionHandler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final boolean send(String str, Map<String, ?> map, byte[] bArr, String str2, String str3, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(bArr);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(biConsumer);
        Message message = ProtonHelper.message();
        message.setBody(new Data(new Binary(bArr)));
        setApplicationProperties(message, map);
        addProperties(message, str, str2, str3);
        return send(message, biConsumer);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(String str, Map<String, ?> map, String str2, String str3, String str4, Handler<Void> handler) {
        send(str, map, str2, str3, str4, handler, this.defaultDispositionHandler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public final void send(String str, Map<String, ?> map, byte[] bArr, String str2, String str3, Handler<Void> handler) {
        send(str, map, bArr, str2, str3, handler, this.defaultDispositionHandler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public void send(String str, Map<String, ?> map, String str2, String str3, String str4, Handler<Void> handler, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(str2);
        send(str, map, str2.getBytes(getCharsetForContentType((String) Objects.requireNonNull(str3))), str3, str4, handler, biConsumer);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public void send(String str, Map<String, ?> map, byte[] bArr, String str2, String str3, Handler<Void> handler, BiConsumer<Object, ProtonDelivery> biConsumer) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(bArr);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Message message = ProtonHelper.message();
        message.setAddress(getTo(str));
        message.setBody(new Data(new Binary(bArr)));
        setApplicationProperties(message, map);
        addProperties(message, str, str2, str3);
        addEndpointSpecificProperties(message, str);
        send(message, handler, biConsumer);
    }

    protected abstract String getTo(String str);

    private void addProperties(Message message, String str, String str2, String str3) {
        message.setMessageId(String.format("%s-%d", getClass().getSimpleName(), Long.valueOf(MESSAGE_COUNTER.getAndIncrement())));
        message.setContentType(str2);
        MessageHelper.addDeviceId(message, str);
        MessageHelper.addRegistrationAssertion(message, str3);
    }

    protected void addEndpointSpecificProperties(Message message, String str) {
    }

    private Charset getCharsetForContentType(String str) {
        Matcher matcher = CHARSET_PATTERN.matcher(str);
        return matcher.matches() ? Charset.forName(matcher.group(1)) : StandardCharsets.UTF_8;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static final Future<ProtonSender> createSender(Context context, ProtonConnection protonConnection, String str, ProtonQoS protonQoS, Handler<String> handler) {
        Future<ProtonSender> future = Future.future();
        context.runOnContext(r10 -> {
            ProtonSender createSender = protonConnection.createSender(str);
            createSender.setQoS(protonQoS);
            createSender.openHandler(asyncResult -> {
                if (asyncResult.succeeded()) {
                    LOG.debug("sender open [{}]", createSender.getRemoteTarget());
                    future.complete(asyncResult.result());
                } else {
                    LOG.debug("opening sender [{}] failed: {}", str, asyncResult.cause().getMessage());
                    future.fail(asyncResult.cause());
                }
            }).closeHandler(asyncResult2 -> {
                if (asyncResult2.succeeded()) {
                    LOG.debug("sender [{}] closed", str);
                } else {
                    LOG.debug("sender [{}] closed: {}", str, asyncResult2.cause().getMessage());
                }
                createSender.close();
                if (handler != null) {
                    handler.handle(str);
                }
            }).open();
        });
        return future;
    }
}
