package org.eclipse.hono.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.RequestResponseClient;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/impl/AbstractRequestResponseClient.class */
public abstract class AbstractRequestResponseClient<C extends RequestResponseClient, R extends RequestResponseResult<?>> extends AbstractHonoClient implements RequestResponseClient {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestResponseClient.class);
    protected final Map<String, Handler<AsyncResult<R>>> replyMap;
    protected final String replyToAddress;
    private final String requestResponseAddressTemplate;
    private final String requestResponseReplyToAddressTemplate;

    protected abstract String getName();

    protected abstract String createMessageId();

    protected abstract R getResult(int i, String str);

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRequestResponseClient(Context context, ProtonConnection protonConnection, String str, Handler<AsyncResult<C>> handler) {
        super(context);
        this.replyMap = new ConcurrentHashMap();
        this.requestResponseAddressTemplate = String.format("%s/%%s", getName());
        this.requestResponseReplyToAddressTemplate = String.format("%s/%%s/%%s", getName());
        this.replyToAddress = String.format(this.requestResponseReplyToAddressTemplate, Objects.requireNonNull(str), UUID.randomUUID());
        Future future = Future.future();
        future.setHandler(asyncResult -> {
            if (!asyncResult.succeeded()) {
                handler.handle(Future.failedFuture(asyncResult.cause()));
                return;
            }
            LOG.debug("request response client created");
            this.sender = (ProtonSender) asyncResult.result();
            handler.handle(Future.succeededFuture(this));
        });
        Future future2 = Future.future();
        context.runOnContext(r11 -> {
            protonConnection.createReceiver(this.replyToAddress).setAutoAccept(true).setPrefetch(1000).handler((protonDelivery, message) -> {
                Handler<AsyncResult<R>> remove = this.replyMap.remove(message.getCorrelationId());
                if (remove == null) {
                    LOG.debug("discarding unexpected response [correlation ID: {}]", message.getCorrelationId());
                    return;
                }
                R requestResponseResult = getRequestResponseResult(message);
                LOG.debug("received response [correlation ID: {}, status: {}]", message.getCorrelationId(), Integer.valueOf(requestResponseResult.getStatus()));
                remove.handle(Future.succeededFuture(requestResponseResult));
            }).openHandler(future2.completer()).open();
            future2.compose(protonReceiver -> {
                this.receiver = protonReceiver;
                protonConnection.createSender(String.format(this.requestResponseAddressTemplate, str)).setQoS(ProtonQoS.AT_LEAST_ONCE).openHandler(future.completer()).open();
            }, future);
        });
    }

    protected Message createMessage(String str, Map<String, Object> map) {
        Message message = ProtonHelper.message();
        String createMessageId = createMessageId();
        setApplicationProperties(message, map);
        message.setReplyTo(this.replyToAddress);
        message.setMessageId(createMessageId);
        message.setSubject(str);
        return message;
    }

    private R getRequestResponseResult(Message message) {
        String str = (String) MessageHelper.getApplicationProperty(message.getApplicationProperties(), "status", String.class);
        return getResult(Integer.valueOf(str).intValue(), MessageHelper.getPayload(message));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createAndSendRequest(String str, JsonObject jsonObject, Handler<AsyncResult<R>> handler) {
        createAndSendRequest(str, null, jsonObject, handler);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createAndSendRequest(String str, Map<String, Object> map, JsonObject jsonObject, Handler<AsyncResult<R>> handler) {
        Message createMessage = createMessage(str, map);
        if (jsonObject != null) {
            createMessage.setContentType("application/json; charset=utf-8");
            createMessage.setBody(new AmqpValue(jsonObject.encode()));
        }
        sendMessage(createMessage, handler);
    }

    protected final void sendMessage(Message message, Handler<AsyncResult<R>> handler) {
        this.context.runOnContext(r7 -> {
            this.replyMap.put((String) message.getMessageId(), handler);
            this.sender.send(message);
        });
    }

    @Override // org.eclipse.hono.client.RequestResponseClient
    public final boolean isOpen() {
        return this.sender != null && this.sender.isOpen() && this.receiver != null && this.receiver.isOpen();
    }

    @Override // org.eclipse.hono.client.RequestResponseClient
    public final void close(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        LOG.info("closing request response client ...");
        closeLinks(handler);
    }
}
