package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.class */
public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {
    /** Source of the numeric suffix used when naming consumer worker threads. */
    private static final AtomicInteger threadCount = new AtomicInteger(0);
    /**
     * Vert.x context on which user handlers are invoked. Typed as {@link ContextInternal}
     * because the class calls {@code duplicate()} and {@code promise()} on it, which a
     * plain {@code Context} reference does not offer.
     */
    private final ContextInternal context;
    /** Underlying Kafka consumer; only touched from tasks submitted to {@link #worker}. */
    private final Consumer<K, V> consumer;
    /** Tracer wrapping record delivery; delivery is untraced when this is {@code null}. */
    private final ConsumerTracer tracer;
    /** Per-record handler installed by the user. */
    private Handler<ConsumerRecord<K, V>> recordHandler;
    /** Receives exceptions raised by worker tasks and by polling. */
    private Handler<Throwable> exceptionHandler;
    /** Iterator over the most recently polled batch that is still being delivered. */
    private Iterator<ConsumerRecord<K, V>> current;
    /** Whole-batch handler installed by the user. */
    private Handler<ConsumerRecords<K, V>> batchHandler;
    private Handler<Set<TopicPartition>> partitionsRevokedHandler;
    private Handler<Set<TopicPartition>> partitionsAssignedHandler;
    /** Single-thread executor created by {@code start}; {@code null} until the first task is submitted. */
    private ExecutorService worker;
    /** Starts {@code true}; flipped to {@code false} by the first submitted task and back by {@code close()}. */
    private final AtomicBoolean closed = new AtomicBoolean(true);
    /** Set once a subscribe/assign task has run; gates {@code schedule}. */
    private final AtomicBoolean consuming = new AtomicBoolean(false);
    /** Outstanding demand; {@code Long.MAX_VALUE} means unbounded (flowing mode). */
    private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
    /** Ensures at most one background poll is in flight. */
    private final AtomicBoolean polling = new AtomicBoolean(false);
    /** Timeout handed to {@code Consumer.poll} by the background polling loop. */
    private Duration pollTimeout = Duration.ofSeconds(1);
    /**
     * Listener handed to Kafka on subscribe. Each callback snapshots the matching user
     * handler and, when one is set, invokes it on the Vert.x context with the partitions
     * converted to a set.
     */
    private final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            dispatch(KafkaReadStreamImpl.this.partitionsRevokedHandler, collection);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            dispatch(KafkaReadStreamImpl.this.partitionsAssignedHandler, collection);
        }

        /** Hands {@code partitions} to {@code target} on the context; does nothing when no handler is set. */
        private void dispatch(Handler<Set<TopicPartition>> target, Collection<TopicPartition> partitions) {
            if (target == null) {
                return;
            }
            KafkaReadStreamImpl.this.context.runOnContext(ignored -> target.handle(Helper.toSet(partitions)));
        }
    };

    /**
     * Creates a read stream around an existing Kafka consumer.
     *
     * @param vertx              Vert.x instance whose current (or newly created) context hosts the handlers
     * @param consumer           Kafka consumer to wrap
     * @param kafkaClientOptions options forwarded to {@code ConsumerTracer.create}
     */
    public KafkaReadStreamImpl(Vertx vertx, Consumer<K, V> consumer, KafkaClientOptions kafkaClientOptions) {
        // unwrap() is a ContextInternal operation, so the context must be cast before calling it.
        ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).unwrap();
        this.consumer = consumer;
        this.context = ctx;
        this.tracer = ConsumerTracer.create(ctx.tracer(), kafkaClientOptions);
    }

    /**
     * Creates the single-thread worker executor (with a uniquely numbered thread name)
     * and submits the first task to it.
     */
    private <T> void start(BiConsumer<Consumer<K, V>, Promise<T>> biConsumer, Handler<AsyncResult<T>> handler) {
        this.worker = Executors.newSingleThreadExecutor(task -> {
            String threadName = "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement();
            return new Thread(task, threadName);
        });
        submitTaskWhenStarted(biConsumer, handler);
    }

    /**
     * Queues {@code biConsumer} on the worker thread.
     *
     * <p>When {@code handler} is non-null a promise is passed to the task and its outcome is
     * relayed to {@code handler} on the Vert.x context; otherwise the task receives
     * {@code null}. An exception thrown by the task fails the promise (if any) and is also
     * handed to the exception handler (if any) on the worker thread.
     *
     * @throws IllegalStateException if the worker has not been created yet
     */
    private <T> void submitTaskWhenStarted(BiConsumer<Consumer<K, V>, Promise<T>> biConsumer, Handler<AsyncResult<T>> handler) {
        if (this.worker == null) {
            throw new IllegalStateException();
        }
        this.worker.submit(() -> {
            // The promise must be typed: with a raw Promise the completion callback's argument
            // is untyped and cannot be passed to handler.handle(...).
            Promise<T> promise = null;
            if (handler != null) {
                promise = Promise.promise();
                promise.future().onComplete(result -> this.context.runOnContext(ignored -> handler.handle(result)));
            }
            try {
                biConsumer.accept(this.consumer, promise);
            } catch (Exception e) {
                if (promise != null) {
                    promise.tryFail(e);
                }
                // Snapshot the field so a concurrent reset to null cannot cause an NPE here.
                Handler<Throwable> onError = this.exceptionHandler;
                if (onError != null) {
                    onError.handle(e);
                }
            }
        });
    }

    /**
     * Runs one background poll on the worker thread unless one is already in flight.
     *
     * <p>A non-empty batch is handed to {@code handler} on the Vert.x context. In every other
     * case (stream closed, empty batch, wakeup, error) the polling flag is cleared on the
     * context and {@code schedule(0)} is called. Errors other than a wakeup are passed to the
     * exception handler on the worker thread.
     *
     * @param handler receives a non-empty batch of records
     */
    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
        if (!this.polling.compareAndSet(false, true)) {
            return;
        }
        this.worker.submit(() -> {
            boolean handedOff = false;
            try {
                if (!this.closed.get()) {
                    try {
                        ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
                        if (records != null && records.count() > 0) {
                            handedOff = true;
                            this.context.runOnContext(ignored -> {
                                this.polling.set(false);
                                handler.handle(records);
                            });
                        }
                    } catch (WakeupException ignored) {
                        // Must come before the generic catch: WakeupException is itself an
                        // Exception, and close() triggers it on purpose via consumer.wakeup().
                    } catch (Exception e) {
                        Handler<Throwable> onError = this.exceptionHandler;
                        if (onError != null) {
                            onError.handle(e);
                        }
                    }
                }
            } finally {
                if (!handedOff) {
                    this.context.runOnContext(ignored -> {
                        this.polling.set(false);
                        schedule(0L);
                    });
                }
            }
        });
    }

    /**
     * Arranges another {@code run} pass on the Vert.x context, immediately or after
     * {@code j} milliseconds, provided the stream is consuming, has demand, and at least
     * one record or batch handler is installed.
     *
     * @param j delay in milliseconds; values {@code <= 0} run without a timer
     */
    private void schedule(long j) {
        Handler<ConsumerRecord<K, V>> recordCallback = this.recordHandler;
        Handler<ConsumerRecords<K, V>> batchCallback = this.batchHandler;
        boolean active = this.consuming.get() && this.demand.get() > 0;
        if (!active) {
            return;
        }
        boolean anyHandler = recordCallback != null || this.batchHandler != null;
        if (!anyHandler) {
            return;
        }
        this.context.runOnContext(ignored -> {
            if (j <= 0) {
                run(recordCallback, batchCallback);
                return;
            }
            this.context.owner().setTimer(j, timerId -> run(recordCallback, batchCallback));
        });
    }

    /**
     * One delivery pass.
     *
     * <p>If no buffered records remain, a background poll is requested; a non-empty result
     * becomes the new buffer, is offered to {@code handler2} as a batch, and delivery is
     * rescheduled. Otherwise up to 10 buffered records are emitted to {@code handler}, each
     * on a duplicated context, stopping early when demand runs out.
     *
     * @param handler  per-record handler snapshot (may be {@code null})
     * @param handler2 batch handler snapshot (may be {@code null})
     */
    private void run(Handler<ConsumerRecord<K, V>> handler, Handler<ConsumerRecords<K, V>> handler2) {
        if (this.closed.get()) {
            return;
        }
        if (this.current == null || !this.current.hasNext()) {
            pollRecords(records -> {
                if (records == null || records.count() <= 0) {
                    schedule(1L);
                    return;
                }
                this.current = records.iterator();
                if (handler2 != null) {
                    handler2.handle(records);
                }
                schedule(0L);
            });
            return;
        }
        int delivered = 0;
        while (this.current.hasNext() && delivered++ < 10) {
            if (!tryClaimDemand()) {
                // Paused or out of demand: leave the rest of the buffer for a later pass.
                break;
            }
            // Exactly one record is consumed per claimed unit of demand.
            ConsumerRecord<K, V> next = this.current.next();
            if (handler == null) {
                // Only a batch handler is installed; the batch was already delivered as a whole.
                continue;
            }
            ContextInternal duplicate = ((ContextInternal) this.context).duplicate();
            duplicate.emit(ignored -> tracedHandler(duplicate, handler).handle(next));
        }
        schedule(0L);
    }

    /**
     * Claims one unit of demand.
     *
     * @return {@code false} when demand is zero or negative; {@code true} when demand is
     *         unbounded ({@code Long.MAX_VALUE}, left untouched) or was atomically decremented
     */
    private boolean tryClaimDemand() {
        while (true) {
            long available = this.demand.get();
            if (available <= 0L) {
                return false;
            }
            if (available == Long.MAX_VALUE || this.demand.compareAndSet(available, available - 1)) {
                return true;
            }
        }
    }

    /**
     * Wraps {@code handler} so that each record delivery is bracketed by a tracing span.
     *
     * @param context context passed to the tracer calls
     * @param handler handler to wrap
     * @return {@code handler} itself when no tracer is configured; otherwise a handler that
     *         starts a span, delegates, and finishes the span, or marks it failed and
     *         rethrows when the delegate (or finishing) throws
     */
    private Handler<ConsumerRecord<K, V>> tracedHandler(Context context, Handler<ConsumerRecord<K, V>> handler) {
        if (this.tracer == null) {
            return handler;
        }
        return consumerRecord -> {
            // The tracer field is raw, so the span is referenced through the raw nested type.
            ConsumerTracer.StartedSpan span = this.tracer.prepareMessageReceived(context, consumerRecord);
            try {
                handler.handle(consumerRecord);
                span.finish(context);
            } catch (Throwable failure) {
                span.fail(context, failure);
                throw failure;
            }
        };
    }

    /**
     * Submits a task to the worker, creating the worker first when the stream is
     * currently marked closed (which also marks it open).
     */
    protected <T> void submitTask(BiConsumer<Consumer<K, V>, Promise<T>> biConsumer, Handler<AsyncResult<T>> handler) {
        boolean wasClosed = this.closed.compareAndSet(true, false);
        if (!wasClosed) {
            submitTaskWhenStarted(biConsumer, handler);
            return;
        }
        start(biConsumer, handler);
    }

    /** Future-returning form of {@code pause(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> pause(Set<TopicPartition> set) {
        Promise<Void> done = Promise.promise();
        pause(set, done);
        return done.future();
    }

    /**
     * Pauses fetching from the given partitions on the worker thread.
     *
     * @param set     partitions passed to {@code Consumer.pause}
     * @param handler notified on the context when the call has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> pause(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.pause(set);
            if (completion != null) {
                completion.complete();
            }
        };
        submitTask(task, handler);
        return this;
    }

    /**
     * Queries the currently paused partitions on the worker thread.
     *
     * @param handler receives the result of {@code Consumer.paused}; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void paused(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Set<TopicPartition>>> task = (kafka, completion) -> {
            Set<TopicPartition> pausedPartitions = kafka.paused();
            if (completion != null) {
                completion.complete(pausedPartitions);
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code paused(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Set<TopicPartition>> paused() {
        Promise<Set<TopicPartition>> done = Promise.promise();
        paused(done);
        return done.future();
    }

    /** Future-returning form of {@code resume(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> resume(Set<TopicPartition> set) {
        Promise<Void> done = Promise.promise();
        resume(set, done);
        return done.future();
    }

    /**
     * Resumes fetching from the given partitions on the worker thread.
     *
     * @param set     partitions passed to {@code Consumer.resume}
     * @param handler notified on the context when the call has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> resume(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.resume(set);
            if (completion != null) {
                completion.complete();
            }
        };
        submitTask(task, handler);
        return this;
    }

    /**
     * Looks up the committed offset of a partition on the worker thread.
     *
     * @param topicPartition partition passed to {@code Consumer.committed}
     * @param handler        receives the lookup result; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void committed(TopicPartition topicPartition, Handler<AsyncResult<OffsetAndMetadata>> handler) {
        BiConsumer<Consumer<K, V>, Promise<OffsetAndMetadata>> task = (kafka, completion) -> {
            OffsetAndMetadata offset = kafka.committed(topicPartition);
            if (completion != null) {
                completion.complete(offset);
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code committed(TopicPartition, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<OffsetAndMetadata> committed(TopicPartition topicPartition) {
        Promise<OffsetAndMetadata> done = Promise.promise();
        committed(topicPartition, done);
        return done.future();
    }

    /** Future-returning form of {@code seekToEnd(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> seekToEnd(Set<TopicPartition> set) {
        Promise<Void> done = Promise.promise();
        seekToEnd(set, done);
        return done.future();
    }

    /**
     * Seeks the given partitions to their end. On the Vert.x context the buffered record
     * iterator is dropped first, then the seek is submitted to the worker.
     *
     * @param set     partitions passed to {@code Consumer.seekToEnd}
     * @param handler notified when the seek has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> seekToEnd(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.seekToEnd(set);
            if (completion != null) {
                completion.complete();
            }
        };
        this.context.runOnContext(ignored -> {
            this.current = null;
            submitTask(task, handler);
        });
        return this;
    }

    /** Future-returning form of {@code seekToBeginning(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> seekToBeginning(Set<TopicPartition> set) {
        Promise<Void> done = Promise.promise();
        seekToBeginning(set, done);
        return done.future();
    }

    /**
     * Seeks the given partitions to their beginning. On the Vert.x context the buffered
     * record iterator is dropped first, then the seek is submitted to the worker.
     *
     * @param set     partitions passed to {@code Consumer.seekToBeginning}
     * @param handler notified when the seek has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> seekToBeginning(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.seekToBeginning(set);
            if (completion != null) {
                completion.complete();
            }
        };
        this.context.runOnContext(ignored -> {
            this.current = null;
            submitTask(task, handler);
        });
        return this;
    }

    /** Future-returning form of {@code seek(TopicPartition, long, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> seek(TopicPartition topicPartition, long j) {
        Promise<Void> done = Promise.promise();
        seek(topicPartition, j, done);
        return done.future();
    }

    /**
     * Seeks a partition to an offset. On the Vert.x context the buffered record iterator
     * is dropped first, then the seek is submitted to the worker.
     *
     * @param topicPartition partition to reposition
     * @param j              offset passed to {@code Consumer.seek}
     * @param handler        notified when the seek has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> seek(TopicPartition topicPartition, long j, Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.seek(topicPartition, j);
            if (completion != null) {
                completion.complete();
            }
        };
        this.context.runOnContext(ignored -> {
            this.current = null;
            submitTask(task, handler);
        });
        return this;
    }

    /**
     * Installs the handler used by the rebalance listener when partitions are revoked.
     *
     * @param handler new handler; {@code null} disables the notification
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsRevokedHandler = handler;
        return this;
    }

    /**
     * Installs the handler used by the rebalance listener when partitions are assigned.
     *
     * @param handler new handler; {@code null} disables the notification
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsAssignedHandler = handler;
        return this;
    }

    /** Future-returning form of {@code subscribe(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> subscribe(Set<String> set) {
        Promise<Void> done = Promise.promise();
        subscribe(set, done);
        return done.future();
    }

    /**
     * Subscribes to the given topics on the worker thread (registering the rebalance
     * listener) and switches the stream into consuming mode. The worker is created first
     * when the stream is currently closed.
     *
     * @param set     topic names passed to {@code Consumer.subscribe}
     * @param handler notified on the context when the subscription has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> subscribe(Set<String> set, Handler<AsyncResult<Void>> handler) {
        // The task yields no value, so its promise is Promise<Void> (this method has no type parameter T).
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.subscribe(set, this.rebalanceListener);
            startConsuming();
            if (completion != null) {
                completion.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            start(task, handler);
        } else {
            submitTask(task, handler);
        }
        return this;
    }

    /**
     * Subscribes to all topics matching a pattern on the worker thread (registering the
     * rebalance listener) and switches the stream into consuming mode. The worker is
     * created first when the stream is currently closed.
     *
     * @param pattern topic pattern passed to {@code Consumer.subscribe}
     * @param handler notified on the context when the subscription has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> subscribe(Pattern pattern, Handler<AsyncResult<Void>> handler) {
        // The task yields no value, so its promise is Promise<Void> (this method has no type parameter T).
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.subscribe(pattern, this.rebalanceListener);
            startConsuming();
            if (completion != null) {
                completion.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            start(task, handler);
        } else {
            submitTask(task, handler);
        }
        return this;
    }

    /** Future-returning form of {@code subscribe(Pattern, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> subscribe(Pattern pattern) {
        Promise<Void> done = Promise.promise();
        subscribe(pattern, done);
        return done.future();
    }

    /** Future-returning form of {@code unsubscribe(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> unsubscribe() {
        Promise<Void> done = Promise.promise();
        unsubscribe(done);
        return done.future();
    }

    /**
     * Calls {@code Consumer.unsubscribe} on the worker thread.
     *
     * @param handler notified on the context when the call has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> unsubscribe(Handler<AsyncResult<Void>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.unsubscribe();
            if (completion != null) {
                completion.complete();
            }
        };
        submitTask(task, handler);
        return this;
    }

    /**
     * Reads the current topic subscription on the worker thread.
     *
     * @param handler receives the result of {@code Consumer.subscription}; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> subscription(Handler<AsyncResult<Set<String>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Set<String>>> task = (kafka, completion) -> {
            Set<String> topics = kafka.subscription();
            if (completion != null) {
                completion.complete(topics);
            }
        };
        submitTask(task, handler);
        return this;
    }

    /** Future-returning form of {@code subscription(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Set<String>> subscription() {
        Promise<Set<String>> done = Promise.promise();
        subscription(done);
        return done.future();
    }

    /** Future-returning form of {@code assign(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> assign(Set<TopicPartition> set) {
        Promise<Void> done = Promise.promise();
        assign(set, done);
        return done.future();
    }

    /**
     * Manually assigns partitions on the worker thread and switches the stream into
     * consuming mode. The worker is created first when the stream is currently closed.
     *
     * @param set     partitions passed to {@code Consumer.assign}
     * @param handler notified on the context when the assignment has run; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> assign(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        // The task yields no value, so its promise is Promise<Void> (this method has no type parameter T).
        BiConsumer<Consumer<K, V>, Promise<Void>> task = (kafka, completion) -> {
            kafka.assign(set);
            startConsuming();
            if (completion != null) {
                completion.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            start(task, handler);
        } else {
            submitTask(task, handler);
        }
        return this;
    }

    /**
     * Reads the current partition assignment on the worker thread.
     *
     * @param handler receives the result of {@code Consumer.assignment}; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Set<TopicPartition>>> task = (kafka, completion) -> {
            Set<TopicPartition> assigned = kafka.assignment();
            if (completion != null) {
                completion.complete(assigned);
            }
        };
        submitTask(task, handler);
        return this;
    }

    /** Future-returning form of {@code assignment(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Set<TopicPartition>> assignment() {
        Promise<Set<TopicPartition>> done = Promise.promise();
        assignment(done);
        return done.future();
    }

    /**
     * Lists topics and their partition metadata on the worker thread.
     *
     * @param handler receives the result of {@code Consumer.listTopics}; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> listTopics(Handler<AsyncResult<Map<String, List<PartitionInfo>>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Map<String, List<PartitionInfo>>>> task = (kafka, completion) -> {
            Map<String, List<PartitionInfo>> topics = kafka.listTopics();
            if (completion != null) {
                completion.complete(topics);
            }
        };
        submitTask(task, handler);
        return this;
    }

    /** Future-returning form of {@code listTopics(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<String, List<PartitionInfo>>> listTopics() {
        Promise<Map<String, List<PartitionInfo>>> done = Promise.promise();
        listTopics(done);
        return done.future();
    }

    /** Future-returning form of {@code commit(Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit() {
        Promise<Map<TopicPartition, OffsetAndMetadata>> done = Promise.promise();
        commit((Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>>) done);
        return done.future();
    }

    /**
     * Commits without explicit offsets by delegating to {@code commit(Map, Handler)} with a
     * {@code null} map.
     *
     * @param handler notified when the commit has run; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void commit(Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> handler) {
        Map<TopicPartition, OffsetAndMetadata> noExplicitOffsets = null;
        commit(noExplicitOffsets, handler);
    }

    /** Future-returning form of {@code commit(Map, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> map) {
        Promise<Map<TopicPartition, OffsetAndMetadata>> done = Promise.promise();
        commit(map, done);
        return done.future();
    }

    /**
     * Performs a synchronous commit on the worker thread: {@code commitSync()} when
     * {@code map} is {@code null}, otherwise {@code commitSync(map)}.
     *
     * @param map     offsets to commit, or {@code null}
     * @param handler completed with the same {@code map} argument (possibly {@code null});
     *                may itself be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void commit(Map<TopicPartition, OffsetAndMetadata> map, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, OffsetAndMetadata>>> task = (kafka, completion) -> {
            if (map != null) {
                kafka.commitSync(map);
            } else {
                kafka.commitSync();
            }
            if (completion != null) {
                completion.complete(map);
            }
        };
        submitTask(task, handler);
    }

    /**
     * Fetches partition metadata for a topic on the worker thread.
     *
     * @param str     topic name passed to {@code Consumer.partitionsFor}
     * @param handler receives the partition list; may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStreamImpl<K, V> partitionsFor(String str, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<List<PartitionInfo>>> task = (kafka, completion) -> {
            List<PartitionInfo> partitions = kafka.partitionsFor(str);
            if (completion != null) {
                completion.complete(partitions);
            }
        };
        submitTask(task, handler);
        return this;
    }

    /** Future-returning form of {@code partitionsFor(String, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<List<PartitionInfo>> partitionsFor(String str) {
        Promise<List<PartitionInfo>> done = Promise.promise();
        partitionsFor(str, (Handler<AsyncResult<List<PartitionInfo>>>) done);
        return done.future();
    }

    /**
     * Installs the handler that receives exceptions raised by worker tasks and polling.
     *
     * @param handler new handler; {@code null} disables reporting
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        exceptionHandler = handler;
        return this;
    }

    /**
     * Installs the per-record handler and requests a delivery pass.
     *
     * @param handler new record handler
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: handler */
    public KafkaReadStreamImpl<K, V> mo35handler(Handler<ConsumerRecord<K, V>> handler) {
        recordHandler = handler;
        this.schedule(0L);
        return this;
    }

    /**
     * Pauses record delivery by resetting demand to zero.
     *
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: pause */
    public KafkaReadStreamImpl<K, V> mo34pause() {
        final long noDemand = 0L;
        this.demand.set(noDemand);
        return this;
    }

    /**
     * Resumes delivery by fetching {@code Long.MAX_VALUE} records.
     *
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: resume */
    public KafkaReadStreamImpl<K, V> mo33resume() {
        final long unbounded = Long.MAX_VALUE;
        return mo32fetch(unbounded);
    }

    /**
     * Adds {@code j} to the outstanding demand, saturating at {@code Long.MAX_VALUE} on
     * overflow, and requests a delivery pass when the resulting demand is positive.
     *
     * @param j number of additional records requested; must not be negative
     * @return this stream
     * @throws IllegalArgumentException if {@code j} is negative
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: fetch */
    public KafkaReadStreamImpl<K, V> mo32fetch(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("Invalid claim " + j);
        }
        long updated = this.demand.updateAndGet(pending -> {
            long sum = pending + j;
            // A negative sum means the addition overflowed; treat that as unbounded demand.
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
        if (updated > 0) {
            schedule(0L);
        }
        return this;
    }

    /**
     * @return the current outstanding demand counter
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public long demand() {
        long outstanding = this.demand.get();
        return outstanding;
    }

    /**
     * Marks the stream as consuming and requests a delivery pass.
     *
     * @return this stream
     */
    private KafkaReadStreamImpl<K, V> startConsuming() {
        consuming.set(true);
        this.schedule(0L);
        return this;
    }

    /**
     * Accepts an end handler but does not store or invoke it.
     *
     * @param handler ignored
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStreamImpl<K, V> endHandler(Handler<Void> handler) {
        // Intentionally a no-op: the handler is discarded.
        return this;
    }

    /**
     * Closes the stream. The first call wakes up any blocked poll, closes the Kafka
     * consumer on the worker thread and then shuts the worker down; calls made while the
     * stream is already closed return a succeeded future immediately.
     *
     * @return a future completed once the consumer has been closed, or failed with the
     *         exception thrown by {@code Consumer.close}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Void> close() {
        ContextInternal ctx = (ContextInternal) this.context;
        if (!this.closed.compareAndSet(false, true)) {
            return ctx.succeededFuture();
        }
        // Capture the executor now: a later submitTask() may restart the stream and replace
        // this.worker before the close task completes, and only the old executor must be shut down.
        ExecutorService closingWorker = this.worker;
        this.consumer.wakeup();
        PromiseInternal<Void> promise = ctx.promise();
        closingWorker.submit(() -> {
            try {
                this.consumer.close();
                promise.complete();
            } catch (Exception e) {
                // Any failure must complete the promise, otherwise the executor is never shut down.
                promise.fail(e);
            }
        });
        return promise.future().onComplete(ignored -> closingWorker.shutdownNow());
    }

    /**
     * Callback form of {@code close()}.
     *
     * @param handler notified with the close outcome; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void close(Handler<AsyncResult<Void>> handler) {
        Future<Void> closing = close();
        if (handler == null) {
            return;
        }
        closing.onComplete(handler);
    }

    /**
     * Reads the consumer's position for a partition on the worker thread.
     *
     * @param topicPartition partition passed to {@code Consumer.position}
     * @param handler        receives the position; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void position(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Long>> task = (kafka, completion) -> {
            long offset = this.consumer.position(topicPartition);
            if (completion != null) {
                completion.complete(Long.valueOf(offset));
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code position(TopicPartition, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Long> position(TopicPartition topicPartition) {
        Promise<Long> done = Promise.promise();
        position(topicPartition, done);
        return done.future();
    }

    /**
     * Looks up offsets by timestamp for several partitions on the worker thread.
     *
     * @param map     timestamp per partition, passed to {@code Consumer.offsetsForTimes}
     * @param handler receives the lookup result; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void offsetsForTimes(Map<TopicPartition, Long> map, Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, OffsetAndTimestamp>>> task = (kafka, completion) -> {
            Map<TopicPartition, OffsetAndTimestamp> offsets = this.consumer.offsetsForTimes(map);
            if (completion != null) {
                completion.complete(offsets);
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code offsetsForTimes(Map, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<TopicPartition, OffsetAndTimestamp>> offsetsForTimes(Map<TopicPartition, Long> map) {
        Promise<Map<TopicPartition, OffsetAndTimestamp>> done = Promise.promise();
        offsetsForTimes(map, done);
        return done.future();
    }

    /**
     * Looks up the offset for one partition at a timestamp on the worker thread, by
     * querying with a single-entry map and extracting that partition's entry.
     *
     * @param topicPartition partition to query
     * @param j              timestamp to look up
     * @param handler        receives the entry for {@code topicPartition}; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void offsetsForTimes(TopicPartition topicPartition, long j, Handler<AsyncResult<OffsetAndTimestamp>> handler) {
        BiConsumer<Consumer<K, V>, Promise<OffsetAndTimestamp>> task = (kafka, completion) -> {
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(topicPartition, Long.valueOf(j));
            Map<TopicPartition, OffsetAndTimestamp> offsets = this.consumer.offsetsForTimes(query);
            if (completion != null) {
                completion.complete(offsets.get(topicPartition));
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code offsetsForTimes(TopicPartition, long, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, long j) {
        Promise<OffsetAndTimestamp> done = Promise.promise();
        offsetsForTimes(topicPartition, j, done);
        return done.future();
    }

    /**
     * Fetches the beginning offsets of several partitions on the worker thread.
     *
     * @param set     partitions passed to {@code Consumer.beginningOffsets}
     * @param handler receives the offsets; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void beginningOffsets(Set<TopicPartition> set, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, Long>>> task = (kafka, completion) -> {
            Map<TopicPartition, Long> offsets = this.consumer.beginningOffsets(set);
            if (completion != null) {
                completion.complete(offsets);
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code beginningOffsets(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<TopicPartition, Long>> beginningOffsets(Set<TopicPartition> set) {
        Promise<Map<TopicPartition, Long>> done = Promise.promise();
        beginningOffsets(set, done);
        return done.future();
    }

    /**
     * Fetches the beginning offset of one partition on the worker thread, by querying
     * with a single-element set and extracting that partition's entry.
     *
     * @param topicPartition partition to query
     * @param handler        receives the entry for {@code topicPartition}; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void beginningOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Long>> task = (kafka, completion) -> {
            Set<TopicPartition> query = new HashSet<>();
            query.add(topicPartition);
            Map<TopicPartition, Long> offsets = this.consumer.beginningOffsets(query);
            if (completion != null) {
                completion.complete(offsets.get(topicPartition));
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code beginningOffsets(TopicPartition, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Long> beginningOffsets(TopicPartition topicPartition) {
        Promise<Long> done = Promise.promise();
        beginningOffsets(topicPartition, done);
        return done.future();
    }

    /**
     * Fetches the end offsets of several partitions on the worker thread.
     *
     * @param set     partitions passed to {@code Consumer.endOffsets}
     * @param handler receives the offsets; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void endOffsets(Set<TopicPartition> set, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, Long>>> task = (kafka, completion) -> {
            Map<TopicPartition, Long> offsets = this.consumer.endOffsets(set);
            if (completion != null) {
                completion.complete(offsets);
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code endOffsets(Set, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Map<TopicPartition, Long>> endOffsets(Set<TopicPartition> set) {
        Promise<Map<TopicPartition, Long>> done = Promise.promise();
        endOffsets(set, done);
        return done.future();
    }

    /**
     * Fetches the end offset of one partition on the worker thread, by querying with a
     * single-element set and extracting that partition's entry.
     *
     * @param topicPartition partition to query
     * @param handler        receives the entry for {@code topicPartition}; may be {@code null}
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void endOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        BiConsumer<Consumer<K, V>, Promise<Long>> task = (kafka, completion) -> {
            Set<TopicPartition> query = new HashSet<>();
            query.add(topicPartition);
            Map<TopicPartition, Long> offsets = this.consumer.endOffsets(query);
            if (completion != null) {
                completion.complete(offsets.get(topicPartition));
            }
        };
        submitTask(task, handler);
    }

    /** Future-returning form of {@code endOffsets(TopicPartition, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<Long> endOffsets(TopicPartition topicPartition) {
        Promise<Long> done = Promise.promise();
        endOffsets(topicPartition, done);
        return done.future();
    }

    /**
     * @return the wrapped Kafka consumer instance
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Consumer<K, V> unwrap() {
        Consumer<K, V> wrapped = this.consumer;
        return wrapped;
    }

    /**
     * Installs the whole-batch handler and requests a delivery pass.
     *
     * @param handler new batch handler
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream batchHandler(Handler<ConsumerRecords<K, V>> handler) {
        batchHandler = handler;
        this.schedule(0L);
        return this;
    }

    /**
     * Sets the timeout used by the background polling loop.
     *
     * @param duration new poll timeout
     * @return this stream
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public KafkaReadStream<K, V> pollTimeout(Duration duration) {
        pollTimeout = duration;
        return this;
    }

    /**
     * Performs a single explicit poll on the worker thread and reports the outcome on the
     * Vert.x context.
     *
     * <p>A wakeup during the poll is reported as a successful, empty batch; any other
     * exception fails the result. If the stream is closed by the time the task runs, the
     * handler is not invoked.
     *
     * @param duration timeout passed to {@code Consumer.poll}
     * @param handler  receives the polled records or the failure
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public void poll(Duration duration, Handler<AsyncResult<ConsumerRecords<K, V>>> handler) {
        // NOTE(review): this.worker is null until a task has been submitted at least once — confirm callers always subscribe/assign first.
        this.worker.submit(() -> {
            if (this.closed.get()) {
                return;
            }
            try {
                ConsumerRecords<K, V> records = this.consumer.poll(duration);
                this.context.runOnContext(ignored -> handler.handle(Future.succeededFuture(records)));
            } catch (WakeupException wakeup) {
                // Must precede the generic catch: WakeupException is itself an Exception.
                this.context.runOnContext(ignored -> handler.handle(Future.succeededFuture(ConsumerRecords.empty())));
            } catch (Exception e) {
                this.context.runOnContext(ignored -> handler.handle(Future.failedFuture(e)));
            }
        });
    }

    /** Future-returning form of {@code poll(Duration, Handler)}. */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public Future<ConsumerRecords<K, V>> poll(Duration duration) {
        Promise<ConsumerRecords<K, V>> done = Promise.promise();
        poll(duration, done);
        return done.future();
    }

    /**
     * Bridge overload (marked bridge/synthetic by the decompiler) that exposes the
     * {@code KafkaReadStream} return type and forwards to the typed
     * {@code partitionsFor(String, Handler)} after casting the raw handler.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public /* bridge */ /* synthetic */ KafkaReadStream partitionsFor(String str, Handler handler) {
        return partitionsFor(str, (Handler<AsyncResult<List<PartitionInfo>>>) handler);
    }

    /**
     * Bridge overload (marked bridge/synthetic by the decompiler) that exposes the
     * {@code KafkaReadStream} return type and forwards to the typed {@code endHandler}.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public /* bridge */ /* synthetic */ KafkaReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /**
     * Bridge overload (marked bridge/synthetic by the decompiler) that exposes the
     * {@code KafkaReadStream} return type and forwards to the typed {@code exceptionHandler}.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    public /* bridge */ /* synthetic */ KafkaReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /**
     * Bridge overload (marked bridge/synthetic; renamed by the decompiler from
     * {@code endHandler}) that exposes the {@code ReadStream} return type and forwards to
     * the typed {@code endHandler}.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo31endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /**
     * Bridge overload (marked bridge/synthetic; renamed by the decompiler from
     * {@code exceptionHandler}) that exposes the {@code ReadStream} return type and
     * forwards to the typed {@code exceptionHandler}.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo36exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /**
     * Bridge overload (marked bridge/synthetic; renamed by the decompiler from
     * {@code exceptionHandler}) that exposes the {@code StreamBase} return type and
     * forwards to the typed {@code exceptionHandler}.
     */
    @Override // io.vertx.kafka.client.consumer.KafkaReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo37exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
