package org.eclipse.hono.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/impl/EventConsumerImpl.class */
public class EventConsumerImpl extends AbstractHonoClient implements MessageConsumer {
    private static final String EVENT_ADDRESS_TEMPLATE = "event%s%s";
    private static final Logger LOG = LoggerFactory.getLogger(EventConsumerImpl.class);

    private EventConsumerImpl(Context context, ProtonReceiver protonReceiver) {
        super(context);
        this.receiver = protonReceiver;
    }

    public static void create(Context context, ProtonConnection protonConnection, String str, String str2, Consumer<Message> consumer, Handler<AsyncResult<MessageConsumer>> handler) {
        Objects.requireNonNull(protonConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        createConsumer(context, protonConnection, str, str2, consumer).setHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                handler.handle(Future.succeededFuture(new EventConsumerImpl(context, (ProtonReceiver) asyncResult.result())));
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
    }

    private static Future<ProtonReceiver> createConsumer(Context context, ProtonConnection protonConnection, String str, String str2, Consumer<Message> consumer) {
        Future<ProtonReceiver> future = Future.future();
        String format = String.format(EVENT_ADDRESS_TEMPLATE, str2, str);
        context.runOnContext(r7 -> {
            ProtonReceiver createReceiver = protonConnection.createReceiver(format);
            createReceiver.setAutoAccept(true).setPrefetch(20);
            createReceiver.openHandler(asyncResult -> {
                if (!asyncResult.succeeded()) {
                    future.fail(asyncResult.cause());
                } else {
                    LOG.debug("event receiver for [{}] open", ((ProtonReceiver) asyncResult.result()).getRemoteSource());
                    future.complete(asyncResult.result());
                }
            });
            createReceiver.handler((protonDelivery, message) -> {
                if (consumer != null) {
                    consumer.accept(message);
                }
            });
            createReceiver.open();
        });
        return future;
    }

    @Override // org.eclipse.hono.client.MessageConsumer
    public void close(Handler<AsyncResult<Void>> handler) {
        closeLinks(handler);
    }
}
