package org.springframework.integration.aws.inbound.kinesis;

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.locks.RenewableLockRegistry;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.class */
public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport implements DisposableBean, ApplicationEventPublisherAware {
    /** Per-thread attributes; the shard consumers' process task calls {@code remove()} on it after each pass. */
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

    /** Kinesis client used by the shard consumers to obtain shard iterators. */
    private final KinesisAsyncClient amazonKinesis;

    /** Configured streams; the dispatcher only performs resharding handling when this is non-null. */
    private final String[] streams;

    /** Known shard offsets; the dispatcher mutates it while holding the monitor of this set. */
    private final Set<KinesisShardOffset> shardOffsets = new HashSet<>();

    /** Active shard consumers; iterated by {@code ConsumerDispatcher}, which removes stopped ones. */
    private final Map<KinesisShardOffset, ShardConsumer> shardConsumers = new ConcurrentHashMap<>();

    /** Names of streams for which a closed shard was observed and shards must be re-populated. */
    private final Set<String> inResharding = new ConcurrentSkipListSet<>();

    /** Running invokers; {@code ConsumerInvoker} removes itself while holding the monitor of this list. */
    private final List<ConsumerInvoker> consumerInvokers = new ArrayList<>();

    /** Receives the unlock / renew-lock / add-shard requests issued by the shard consumers. */
    private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager();

    private final ExecutorService shardLocksExecutor;
    private String consumerGroup;
    private ConcurrentMetadataStore checkpointStore;
    private Executor dispatcherExecutor;
    private boolean dispatcherExecutorExplicitlySet;

    /** Executor to which a shard consumer submits its task directly when {@code concurrency == 0}. */
    private Executor consumerExecutor;
    private boolean consumerExecutorExplicitlySet;
    private int maxConcurrency;
    private int concurrency;
    private KinesisShardOffset streamInitialSequence;
    private Converter<byte[], Object> converter;
    private ListenerMode listenerMode;

    /** Compared against {@code CheckpointMode.manual} when choosing the next shard iterator. */
    private CheckpointMode checkpointMode;
    private long checkpointsInterval;

    /** Value passed as the {@code limit} of each {@code GetRecordsRequest}. */
    private int recordsLimit;

    /** Pause handed to {@code sleep(...)} at the end of every dispatcher pass. */
    private int idleBetweenPolls;

    /** Suspension (reported in milliseconds by the log message) applied when a poll returns no records. */
    private int consumerBackoff;
    private int startTimeout;
    private int describeStreamBackoff;
    private int describeStreamRetries;

    /** Timeout, in milliseconds, when waiting for a lock renew/unlock future to complete. */
    private long lockRenewalTimeout;
    private boolean resetCheckpoints;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;

    /** Optional lock registry; {@code null} disables the lock handling in the shard consumers. */
    private LockRegistry lockRegistry;
    private boolean bindSourceRecord;

    /** Run flag polled by the dispatcher and invoker loops. */
    private volatile boolean active;
    private volatile int consumerInvokerMaxCapacity;
    private volatile Future<?> shardConsumerManagerFuture;

    /** When non-null, receives a {@code KinesisShardEndedEvent} once a shard iterator becomes {@code null}. */
    private ApplicationEventPublisher applicationEventPublisher;

    @Nullable
    private Function<List<Shard>, List<Shard>> shardListFilter;

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.class */
    /**
     * Long-lived dispatcher loop of the adapter.
     *
     * <p>While the adapter is {@code active}, every pass does three things in order:
     * <ol>
     * <li>re-populates the shards of each stream listed in {@code inResharding} that this
     * dispatcher has not handled yet;</li>
     * <li>calls {@code execute()} on every registered {@link ShardConsumer} and unregisters
     * the ones that ended up in {@link ConsumerState#STOP};</li>
     * <li>pauses through {@code sleep(idleBetweenPolls, ...)}.</li>
     * </ol>
     */
    private final class ConsumerDispatcher implements SchedulingAwareRunnable {
        /** Streams from {@code inResharding} for which shards were already re-populated by this dispatcher. */
        private final Set<String> inReshardingProcess = new HashSet<>();

        private ConsumerDispatcher() {
        }

        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                rebalanceReshardedStreams();
                dispatchShardConsumers();
                KinesisMessageDrivenChannelAdapter.this.sleep(
                        KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls,
                        new IllegalStateException("ConsumerDispatcher Thread [" + this + "] has been interrupted"),
                        true);
            }
        }

        public boolean isLongLived() {
            return true;
        }

        /** Re-populates shards once for every stream newly recorded as being in resharding. */
        private void rebalanceReshardedStreams() {
            for (String stream : KinesisMessageDrivenChannelAdapter.this.inResharding) {
                // add() returns false when this dispatcher has already handled the stream
                if (!this.inReshardingProcess.add(stream)) {
                    continue;
                }
                KinesisMessageDrivenChannelAdapter.this.logger.debug(
                        () -> "Resharding has happened for stream [" + stream + "]. Rebalancing...");
                KinesisMessageDrivenChannelAdapter.this.populateShardsForStream(stream, null);
            }
        }

        /** Executes each shard consumer and drops the stopped ones from the registry. */
        private void dispatchShardConsumers() {
            for (Iterator<ShardConsumer> consumers =
                    KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator();
                    consumers.hasNext();) {

                ShardConsumer consumer = consumers.next();
                consumer.execute();
                if (ConsumerState.STOP != consumer.state) {
                    continue;
                }
                consumers.remove();
                // A stopped consumer without an iterator only triggers resharding when streams are configured.
                if (KinesisMessageDrivenChannelAdapter.this.streams != null && consumer.shardIterator == null) {
                    scheduleRebalance(consumer.shardOffset);
                }
            }
        }

        /**
         * Records the stream of the given offset as being in resharding. Only when the stream was
         * not recorded before, its local "handled" marker is cleared and the offset is forgotten.
         */
        private void scheduleRebalance(KinesisShardOffset closedOffset) {
            String stream = closedOffset.getStream();
            if (KinesisMessageDrivenChannelAdapter.this.inResharding.add(stream)) {
                this.inReshardingProcess.remove(stream);
                synchronized (KinesisMessageDrivenChannelAdapter.this.shardOffsets) {
                    KinesisMessageDrivenChannelAdapter.this.shardOffsets.remove(closedOffset);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerInvoker.class */
    /**
     * Long-lived worker that runs the pending tasks of the {@link ShardConsumer}s assigned to it.
     *
     * <p>Every assigned consumer is given this invoker's notifier. Running the notifier releases
     * one permit of {@code processBarrier}, which lets {@link #run()} perform one pass over its
     * consumers.
     */
    public final class ConsumerInvoker implements SchedulingAwareRunnable {
        /** Consumers served by this invoker. */
        private final Queue<ShardConsumer> consumers = new ConcurrentLinkedQueue<>();

        /** Starts with zero permits, so {@link #run()} blocks until a consumer notifies. */
        private final Semaphore processBarrier = new Semaphore(0);

        /** Callback installed on every assigned consumer. */
        private final Runnable notifier = this::notifyBarrier;

        /**
         * Creates an invoker and assigns all the given consumers to it.
         *
         * @param collection the initial consumers
         */
        ConsumerInvoker(Collection<ShardConsumer> collection) {
            for (ShardConsumer shardConsumer : collection) {
                addConsumer(shardConsumer);
            }
        }

        /**
         * Installs this invoker's notifier on the consumer and adds the consumer to the queue.
         *
         * @param shardConsumer the consumer to serve
         */
        void addConsumer(ShardConsumer shardConsumer) {
            shardConsumer.setNotifier(this.notifier);
            this.consumers.add(shardConsumer);
        }

        /** Releases one permit so that {@link #run()} performs one more pass. */
        void notifyBarrier() {
            this.processBarrier.release();
        }

        /**
         * Main loop, reconstructed from the bytecode of the original method (the decompiler could
         * not restructure it and had left a stub that always threw
         * {@code UnsupportedOperationException}).
         *
         * <p>While the adapter is active: waits for a permit, removes consumers in
         * {@link ConsumerState#STOP}, runs the pending task of every other consumer, and leaves
         * the loop once no consumers remain. On normal exit the invoker removes itself from
         * {@code consumerInvokers}.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting for a permit;
         *         the interrupt flag is restored first
         */
        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                try {
                    this.processBarrier.acquire();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", ex);
                }

                for (Iterator<ShardConsumer> iterator = this.consumers.iterator(); iterator.hasNext();) {
                    ShardConsumer shardConsumer = iterator.next();
                    if (ConsumerState.STOP == shardConsumer.state) {
                        iterator.remove();
                    } else if (shardConsumer.task != null) {
                        try {
                            shardConsumer.task.run();
                        } catch (Exception ex) {
                            // NOTE(review): the original message is built by the synthetic lambda$run$0(ex, consumer),
                            // whose body is not part of the bytecode dump; this wording is a reconstruction.
                            KinesisMessageDrivenChannelAdapter.this.logger.info(
                                    () -> "Got an exception " + ex + " during [" + shardConsumer + "] task invocation.");
                        }
                    }
                }

                // Check-and-remove is done under the list monitor, exactly as in the bytecode.
                synchronized (KinesisMessageDrivenChannelAdapter.this.consumerInvokers) {
                    if (this.consumers.isEmpty()) {
                        KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
                        break;
                    }
                }
            }

            // Also reached when the adapter became inactive; a repeated remove() is harmless.
            synchronized (KinesisMessageDrivenChannelAdapter.this.consumerInvokers) {
                KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ConsumerState.class */
    /** Life-cycle states of a shard consumer, as interpreted by {@code ShardConsumer.execute()}. */
    public enum ConsumerState {

        /** Initial state; the next execution schedules a task that obtains a shard iterator. */
        NEW,

        /** Handled like {@link #NEW}, except that the "has been started" message is not logged. */
        EXPIRED,

        /** The next execution schedules the record-processing task. */
        CONSUME,

        /** Nothing is scheduled; switches back to {@link #CONSUME} once the sleep deadline has passed. */
        SLEEP,

        /** Terminal state; nothing is scheduled and the consumer is dropped by its dispatcher/invoker. */
        STOP

    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$LockCompletableFuture.class */
    /**
     * A {@code CompletableFuture<Boolean>} that additionally carries the key of the lock the
     * request is about.
     */
    public static final class LockCompletableFuture extends CompletableFuture<Boolean> {

        /** Key of the lock this future refers to. */
        private final String lockKey;

        /**
         * @param lockKey the key of the lock this future refers to
         */
        LockCompletableFuture(String lockKey) {
            super();
            this.lockKey = lockKey;
        }

    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ShardConsumer.class */
    public final class ShardConsumer {
        /** Private copy of the offset handed to the constructor. */
        private final KinesisShardOffset shardOffset;
        /** Checkpointer created over the adapter's checkpoint store for {@link #key}. */
        private final ShardCheckpointer checkpointer;
        /** Checkpoint key of this shard; also used as the key of lock renew/unlock requests. */
        private final String key;
        private long nextCheckpointTimeInMillis;
        /** Optional callback run when a task gets scheduled or this consumer stops. */
        private Runnable notifier;
        /** The task currently scheduled for this consumer; {@code null} when nothing is pending. */
        private volatile Runnable task;
        /** Current shard iterator; {@code null} together with the STOP state is reported as "shard CLOSED and exhausted". */
        private volatile String shardIterator;
        /** Deadline (compared against {@code System.currentTimeMillis()}) after which SLEEP turns into CONSUME. */
        private volatile long sleepUntil;
        private volatile ConsumerState state = ConsumerState.NEW;
        /** Record-processing task, built once and re-used every time the consumer is in the CONSUME state. */
        private final Runnable processTask = processTask();

        ShardConsumer(KinesisShardOffset kinesisShardOffset) {
            this.shardOffset = new KinesisShardOffset(kinesisShardOffset);
            this.key = KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard());
            this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key);
        }

        /**
         * Sets the callback this consumer runs whenever it schedules a task or is stopped.
         *
         * @param runnable the callback; when {@code null} no notification is made
         */
        void setNotifier(Runnable runnable) {
            this.notifier = runnable;
        }

        /**
         * Moves this consumer to {@link ConsumerState#STOP}.
         *
         * <p>When a lock registry is configured, an unlock request for this consumer's key is
         * handed to the shard consumer manager and awaited for up to {@code lockRenewalTimeout}
         * milliseconds. A failure or timeout is only logged (the interrupt flag is restored on
         * interruption). Finally the notifier, if any, is run.
         */
        void stop() {
            this.state = ConsumerState.STOP;

            final boolean lockingEnabled = KinesisMessageDrivenChannelAdapter.this.lockRegistry != null;
            if (lockingEnabled) {
                final LockCompletableFuture unlockFuture = new LockCompletableFuture(this.key);
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(unlockFuture);
                final long timeoutMillis = KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout;
                try {
                    unlockFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (Exception ex) {
                    if (ex instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    KinesisMessageDrivenChannelAdapter.this.logger.info(ex,
                            () -> "The lock for key '" + this.key + "' was not unlocked in time");
                }
            }

            final Runnable callback = this.notifier;
            if (callback != null) {
                callback.run();
            }
        }

        /** Stops this consumer (see {@link #stop()}) and then closes its checkpointer. */
        void close() {
            stop();
            this.checkpointer.close();
        }

        /**
         * Advances this consumer's state machine by one step.
         *
         * <p>Nothing happens while a task is still pending or when the lock could not be renewed.
         * Otherwise the current state decides what is scheduled:
         * <ul>
         * <li>NEW / EXPIRED: a task that resolves the starting position (checkpoint removal on
         * reset, otherwise resume after the stored checkpoint) and requests a shard iterator;
         * without an iterator the consumer goes to STOP, otherwise to CONSUME;</li>
         * <li>CONSUME: the record-processing task;</li>
         * <li>SLEEP: nothing; the state returns to CONSUME once {@code sleepUntil} has passed;</li>
         * <li>STOP: nothing; the reason is logged.</li>
         * </ul>
         * When a task was scheduled, the notifier (if any) is run and, with a concurrency of
         * {@code 0}, the task is submitted to the consumer executor.
         */
        void execute() {
            if (this.task != null || !renewLockIfAny()) {
                return;
            }

            switch (this.state) {
                case NEW:
                case EXPIRED:
                    this.task = () -> {
                        try {
                            if (this.shardOffset.isReset()) {
                                this.checkpointer.remove();
                            } else {
                                String checkpoint = this.checkpointer.getCheckpoint();
                                if (checkpoint != null) {
                                    this.shardOffset.setSequenceNumber(checkpoint);
                                    this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                                }
                            }
                            if (this.state == ConsumerState.NEW) {
                                KinesisMessageDrivenChannelAdapter.this.logger.info(
                                        () -> "The [" + this + "] has been started.");
                            }
                            try {
                                this.shardIterator = KinesisMessageDrivenChannelAdapter.this.amazonKinesis
                                        .getShardIterator(this.shardOffset.toShardIteratorRequest())
                                        .thenApply(GetShardIteratorResponse::shardIterator)
                                        .join();
                            } catch (CompletionException ex) {
                                // getCause() is a Throwable; the decompiled code declared it with the
                                // subtype, which does not compile.
                                Throwable cause = ex.getCause();
                                String message = (cause instanceof InvalidArgumentException) ? cause.getMessage() : null;
                                if (message == null
                                        || !message.contains("has reached max possible value for the shard")) {
                                    throw ex;
                                }
                                // No iterator was assigned here, so the null check below decides the state.
                                KinesisMessageDrivenChannelAdapter.this.logger.info(
                                        () -> "The [" + this.shardOffset + "] has been closed. Skipping...");
                            }
                            if (this.shardIterator == null) {
                                this.state = ConsumerState.STOP;
                            }
                            if (ConsumerState.STOP != this.state) {
                                this.state = ConsumerState.CONSUME;
                            }
                        } finally {
                            // Always clear the slot so that the next execute() can schedule again.
                            this.task = null;
                        }
                    };
                    break;
                case CONSUME:
                    this.task = this.processTask;
                    break;
                case SLEEP:
                    if (System.currentTimeMillis() >= this.sleepUntil) {
                        this.state = ConsumerState.CONSUME;
                    }
                    this.task = null;
                    break;
                case STOP:
                    if (this.shardIterator == null) {
                        KinesisMessageDrivenChannelAdapter.this.logger.info(
                                () -> "Stopping the [" + this + "] on the checkpoint ["
                                        + this.checkpointer.getCheckpoint()
                                        + "] because the shard has been CLOSED and exhausted.");
                    } else {
                        KinesisMessageDrivenChannelAdapter.this.logger.info(
                                () -> "Stopping the [" + this + "].");
                    }
                    this.task = null;
                    break;
            }

            if (this.task != null) {
                if (this.notifier != null) {
                    this.notifier.run();
                }
                if (KinesisMessageDrivenChannelAdapter.this.concurrency == 0) {
                    KinesisMessageDrivenChannelAdapter.this.consumerExecutor.execute(this.task);
                }
            }
        }

        /**
         * Renews the lock of this shard when a lock registry is configured and the consumer is in
         * the CONSUME state; in every other situation there is nothing to renew.
         *
         * <p>The renewal request is handed to the shard consumer manager and awaited for up to
         * {@code lockRenewalTimeout} milliseconds; a failure or timeout is logged and counts as
         * "not renewed". If the lock was not renewed and the consumer is still in CONSUME, the
         * consumer is stopped, its checkpointer closed, the notifier run and, while the adapter
         * is active, the shard offset is handed back to the manager for consumption.
         *
         * @return {@code false} only when the lock was lost and this consumer has been stopped
         */
        private boolean renewLockIfAny() {
            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null
                    && this.state == ConsumerState.CONSUME) {

                LockCompletableFuture renewLockFuture = new LockCompletableFuture(this.key);
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.renewLock(renewLockFuture);

                boolean lockRenewed = false;
                try {
                    lockRenewed = renewLockFuture.get(
                            KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout, TimeUnit.MILLISECONDS);
                } catch (Exception ex) {
                    if (ex instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    KinesisMessageDrivenChannelAdapter.this.logger.info(ex,
                            () -> "The lock for key '" + this.key + "' was not renewed in time");
                }

                if (!lockRenewed && this.state == ConsumerState.CONSUME) {
                    this.state = ConsumerState.STOP;
                    this.checkpointer.close();
                    if (this.notifier != null) {
                        this.notifier.run();
                    }
                    if (KinesisMessageDrivenChannelAdapter.this.active) {
                        KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.addShardToConsume(this.shardOffset);
                    }
                    return false;
                }
            }
            return true;
        }

        private Runnable processTask() {
            return () -> {
                GetRecordsResponse getRecordsResponse = null;
                try {
                    getRecordsResponse = getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(this.shardIterator).limit(Integer.valueOf(KinesisMessageDrivenChannelAdapter.this.recordsLimit)).build());
                    if (getRecordsResponse != null) {
                        List<Record> records = getRecordsResponse.records();
                        if (!records.isEmpty()) {
                            processRecords(records);
                        }
                    }
                    KinesisMessageDrivenChannelAdapter.attributesHolder.remove();
                    if (getRecordsResponse != null) {
                        List records2 = getRecordsResponse.records();
                        if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) || records2.isEmpty()) {
                            this.shardIterator = getRecordsResponse.nextShardIterator();
                        } else {
                            KinesisMessageDrivenChannelAdapter.this.logger.info("Manual checkpointer. Must validate if should use getNextShardIterator()");
                            String sequenceNumber = ((Record) records2.get(records2.size() - 1)).sequenceNumber();
                            String checkpoint = this.checkpointer.getCheckpoint();
                            if (checkpoint.equals(sequenceNumber)) {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("latestCheckpointSequence is same as latestRecordSequence. Should getNextShardIterator()");
                                this.shardIterator = getRecordsResponse.nextShardIterator();
                            } else {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("latestCheckpointSequence is not the same as latestRecordSequence. Should Get a new iterator AFTER_SEQUENCE_NUMBER latestCheckpointSequence");
                                KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(this.shardOffset);
                                kinesisShardOffset.setSequenceNumber(checkpoint);
                                kinesisShardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                                this.shardIterator = ((GetShardIteratorResponse) KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getShardIterator(kinesisShardOffset.toShardIteratorRequest()).join()).shardIterator();
                            }
                        }
                        if (this.shardIterator == null) {
                            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.shardOffsetsToConsumer.remove(this.key);
                            }
                            if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) || this.checkpointer.getLastCheckpointValue() == null) {
                                Iterator<Shard> it = KinesisMessageDrivenChannelAdapter.this.readShardList(this.shardOffset.getStream()).iterator();
                                while (true) {
                                    if (!it.hasNext()) {
                                        break;
                                    }
                                    Shard next = it.next();
                                    if (next.shardId().equals(this.shardOffset.getShard())) {
                                        String endingSequenceNumber = next.sequenceNumberRange().endingSequenceNumber();
                                        if (endingSequenceNumber != null) {
                                            checkpointSwallowingProvisioningExceptions(endingSequenceNumber);
                                        }
                                    }
                                }
                            }
                            if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
                                KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, this.key));
                            }
                            stop();
                        }
                        if (ConsumerState.STOP != this.state && getRecordsResponse.records().isEmpty()) {
                            KinesisMessageDrivenChannelAdapter.this.logger.debug(() -> {
                                return "No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.";
                            });
                            prepareSleepState();
                        }
                    }
                    this.task = null;
                } catch (Throwable th) {
                    KinesisMessageDrivenChannelAdapter.attributesHolder.remove();
                    if (getRecordsResponse != null) {
                        List records3 = getRecordsResponse.records();
                        if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) || records3.isEmpty()) {
                            this.shardIterator = getRecordsResponse.nextShardIterator();
                        } else {
                            KinesisMessageDrivenChannelAdapter.this.logger.info("Manual checkpointer. Must validate if should use getNextShardIterator()");
                            String sequenceNumber2 = ((Record) records3.get(records3.size() - 1)).sequenceNumber();
                            String checkpoint2 = this.checkpointer.getCheckpoint();
                            if (checkpoint2.equals(sequenceNumber2)) {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("latestCheckpointSequence is same as latestRecordSequence. Should getNextShardIterator()");
                                this.shardIterator = getRecordsResponse.nextShardIterator();
                            } else {
                                KinesisMessageDrivenChannelAdapter.this.logger.info("latestCheckpointSequence is not the same as latestRecordSequence. Should Get a new iterator AFTER_SEQUENCE_NUMBER latestCheckpointSequence");
                                KinesisShardOffset kinesisShardOffset2 = new KinesisShardOffset(this.shardOffset);
                                kinesisShardOffset2.setSequenceNumber(checkpoint2);
                                kinesisShardOffset2.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                                this.shardIterator = ((GetShardIteratorResponse) KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getShardIterator(kinesisShardOffset2.toShardIteratorRequest()).join()).shardIterator();
                            }
                        }
                        if (this.shardIterator == null) {
                            if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.shardOffsetsToConsumer.remove(this.key);
                            }
                            if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) || this.checkpointer.getLastCheckpointValue() == null) {
                                Iterator<Shard> it2 = KinesisMessageDrivenChannelAdapter.this.readShardList(this.shardOffset.getStream()).iterator();
                                while (true) {
                                    if (!it2.hasNext()) {
                                        break;
                                    }
                                    Shard next2 = it2.next();
                                    if (next2.shardId().equals(this.shardOffset.getShard())) {
                                        String endingSequenceNumber2 = next2.sequenceNumberRange().endingSequenceNumber();
                                        if (endingSequenceNumber2 != null) {
                                            checkpointSwallowingProvisioningExceptions(endingSequenceNumber2);
                                        }
                                    }
                                }
                            }
                            if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
                                KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, this.key));
                            }
                            stop();
                        }
                        if (ConsumerState.STOP != this.state && getRecordsResponse.records().isEmpty()) {
                            KinesisMessageDrivenChannelAdapter.this.logger.debug(() -> {
                                return "No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.";
                            });
                            prepareSleepState();
                        }
                    }
                    this.task = null;
                    throw th;
                }
            };
        }

        /**
         * Stores the given sequence number through the checkpointer on a best-effort basis.
         * A {@code ProvisionedThroughputExceededException} is logged at debug level and
         * suppressed; any other exception propagates to the caller.
         * @param str the sequence number to checkpoint
         */
        private void checkpointSwallowingProvisioningExceptions(String str) {
            try {
                this.checkpointer.checkpoint(str);
            } catch (ProvisionedThroughputExceededException throttled) {
                // Deliberately not rethrown: throttling must not fail the caller.
                KinesisMessageDrivenChannelAdapter.this.logger.debug(throttled, "Exception while checkpointing empty shards");
            }
        }

        /**
         * Fetches the next batch of records for this shard and blocks until the async call completes.
         * <p>
         * {@code CompletableFuture.join()} reports failures wrapped in a {@link CompletionException},
         * so the failure is unwrapped before it is classified:
         * <ul>
         * <li>an expired iterator switches this consumer to {@code ConsumerState.EXPIRED};</li>
         * <li>throttling puts this consumer to sleep via {@code prepareSleepState()};</li>
         * <li>anything else is rethrown unchanged.</li>
         * </ul>
         * @param getRecordsRequest the request to execute
         * @return the response, or {@code null} when the iterator expired or the request was throttled
         */
        private GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) {
            try {
                return (GetRecordsResponse) KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest).join();
            } catch (RuntimeException ex) {
                // Handle both a directly thrown service exception and one wrapped by join().
                Throwable failure = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
                if (failure instanceof ExpiredIteratorException) {
                    KinesisMessageDrivenChannelAdapter.this.logger.info(() -> {
                        return "Shard iterator for [" + this + "] expired.\nA new one will be started from the check pointed sequence number.";
                    });
                    this.state = ConsumerState.EXPIRED;
                    return null;
                }
                if (failure instanceof ProvisionedThroughputExceededException) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn(() -> {
                        return "GetRecords request throttled for [" + this + "] with the reason: " + failure.getMessage();
                    });
                    prepareSleepState();
                    return null;
                }
                throw ex;
            }
        }

        /**
         * Puts this consumer into {@code ConsumerState.SLEEP} and records the wake-up time as
         * the current time plus the adapter's {@code consumerBackoff}.
         */
        private void prepareSleepState() {
            long wakeUpAt = System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.consumerBackoff;
            this.sleepUntil = wakeUpAt;
            this.state = ConsumerState.SLEEP;
        }

        /**
         * Dispatches a fetched batch according to the configured listener mode and applies
         * the matching checkpoint strategy.
         * <p>
         * The sequence number of the last record is registered as the highest sequence first.
         * In record mode each record is sent individually, followed by the record and periodic
         * checkpoint hooks. In batch mode the whole list is sent as one message. The batch
         * checkpoint hook always runs last.
         * @param list the records to process; the last element is read unconditionally
         */
        private void processRecords(List<Record> list) {
            KinesisMessageDrivenChannelAdapter.this.logger.trace(() -> "Processing records: " + list + " for [" + this + "]");
            Record lastRecord = list.get(list.size() - 1);
            this.checkpointer.setHighestSequence(lastRecord.sequenceNumber());
            ListenerMode mode = KinesisMessageDrivenChannelAdapter.this.listenerMode;
            if (ListenerMode.record.equals(mode)) {
                Iterator<Record> records = list.iterator();
                while (records.hasNext()) {
                    Record current = records.next();
                    processSingleRecord(current);
                    checkpointIfRecordMode(current);
                    checkpointIfPeriodicMode(current);
                }
            } else if (ListenerMode.batch.equals(mode)) {
                processMultipleRecords(list);
                checkpointIfPeriodicMode(null);
            }
            checkpointIfBatchMode();
        }

        /**
         * Builds a message for one record and sends it downstream.
         * @param record the record to convert and send
         */
        private void processSingleRecord(Record record) {
            AbstractIntegrationMessageBuilder<Object> messageBuilder = prepareMessageForRecord(record);
            performSend(messageBuilder, record);
        }

        /**
         * Sends the whole batch as a single message. The payload depends on configuration:
         * <ul>
         * <li>with an embedded headers mapper: a list of messages, one built per record;</li>
         * <li>otherwise with a converter: a list of converted payloads, with the partition keys
         * and sequence numbers carried as list-valued headers;</li>
         * <li>otherwise: the raw record list.</li>
         * </ul>
         * @param list the records forming the batch
         */
        private void processMultipleRecords(List<Record> list) {
            AbstractIntegrationMessageBuilder<?> batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(list);
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                List<Message<Object>> messages = new ArrayList<>();
                for (Record record : list) {
                    messages.add(prepareMessageForRecord(record).build());
                }
                batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(messages);
            } else if (KinesisMessageDrivenChannelAdapter.this.converter != null) {
                List<String> partitionKeys = new ArrayList<>();
                List<String> sequenceNumbers = new ArrayList<>();
                List<Object> payloads = new ArrayList<>();
                for (Record record : list) {
                    // Same per-record order as before: keys first, then the conversion.
                    partitionKeys.add(record.partitionKey());
                    sequenceNumbers.add(record.sequenceNumber());
                    payloads.add(KinesisMessageDrivenChannelAdapter.this.converter.convert(record.data().asByteArray()));
                }
                batchBuilder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory()
                        .withPayload(payloads)
                        .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys)
                        .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, sequenceNumbers);
            }
            performSend(batchBuilder, list);
        }

        /**
         * Creates a message builder for a single record.
         * <p>
         * The record bytes are first passed to the embedded headers mapper, if one is configured;
         * a mapper failure is logged as a warning and the raw bytes are kept. A payload that is
         * still a {@code byte[]} is then passed through the converter, if one is configured.
         * The partition key and sequence number are always added as headers, the record itself
         * is added under {@code "sourceData"} when {@code bindSourceRecord} is set, and headers
         * extracted by the mapper are copied only where absent.
         * @param record the record to convert
         * @return the populated, not yet built, message builder
         */
        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            byte[] rawData = record.data().asByteArray();
            Object payload = rawData;
            Message<?> embedded = null;
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    embedded = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(rawData);
                    payload = embedded.getPayload();
                } catch (Exception ex) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn(ex, "Could not parse embedded headers. Remain payload untouched.");
                }
            }
            boolean convertible = payload instanceof byte[] && KinesisMessageDrivenChannelAdapter.this.converter != null;
            if (convertible) {
                payload = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[]) payload);
            }
            AbstractIntegrationMessageBuilder<Object> builder = KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload);
            builder = builder.setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.partitionKey());
            builder = builder.setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.sequenceNumber());
            if (KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) {
                builder.setHeader("sourceData", record);
            }
            if (embedded != null) {
                builder.copyHeadersIfAbsent(embedded.getHeaders());
            }
            return builder;
        }

        /**
         * Completes the message with the stream and shard headers, plus the checkpointer header
         * in manual checkpoint mode, and sends it.
         * <p>
         * Any {@link Exception} thrown while sending is logged at info level and not rethrown.
         * @param abstractIntegrationMessageBuilder the builder holding the payload and record headers
         * @param obj the source record(s) the message was built from; used for attributes and logging
         */
        private void performSend(AbstractIntegrationMessageBuilder<?> abstractIntegrationMessageBuilder, Object obj) {
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream());
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.SHARD, this.shardOffset.getShard());
            boolean manualCheckpoint = CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode);
            if (manualCheckpoint) {
                abstractIntegrationMessageBuilder.setHeader(AwsHeaders.CHECKPOINTER, this.checkpointer);
            }
            Message<?> messageToSend = abstractIntegrationMessageBuilder.build();
            KinesisMessageDrivenChannelAdapter.this.setAttributesIfNecessary(obj, messageToSend);
            try {
                KinesisMessageDrivenChannelAdapter.this.sendMessage(messageToSend);
            } catch (Exception ex) {
                KinesisMessageDrivenChannelAdapter.this.logger.info(ex, () -> "Got an exception during sending a '" + messageToSend + "'\nfor the '" + obj + "'.\nConsider to use 'errorChannel' flow for the compensation logic.");
            }
        }

        /**
         * Checkpoints via the no-argument {@code checkpoint()} when the adapter runs in batch
         * checkpoint mode; does nothing in any other mode.
         */
        private void checkpointIfBatchMode() {
            if (!CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                return;
            }
            this.checkpointer.checkpoint();
        }

        /**
         * Checkpoints the given record's sequence number when the adapter runs in record
         * checkpoint mode; does nothing in any other mode.
         * @param record the record that has just been processed
         */
        private void checkpointIfRecordMode(Record record) {
            if (!CheckpointMode.record.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                return;
            }
            this.checkpointer.checkpoint(record.sequenceNumber());
        }

        /**
         * In periodic checkpoint mode, checkpoints once the scheduled checkpoint time has passed
         * and then schedules the next one {@code checkpointsInterval} milliseconds ahead.
         * Does nothing in any other mode or while the scheduled time has not been exceeded.
         * @param record the record whose sequence number to checkpoint, or {@code null} to
         * use the no-argument {@code checkpoint()}
         */
        private void checkpointIfPeriodicMode(@Nullable Record record) {
            boolean periodic = CheckpointMode.periodic.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode);
            if (periodic && System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
                if (record != null) {
                    this.checkpointer.checkpoint(record.sequenceNumber());
                } else {
                    this.checkpointer.checkpoint();
                }
                this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval;
            }
        }

        /**
         * Describes this consumer by its shard offset and current state.
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("ShardConsumer{shardOffset=");
            description.append(this.shardOffset);
            description.append(", state=").append(this.state);
            description.append("}");
            return description.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter$ShardConsumerManager.class */
    public final class ShardConsumerManager implements SchedulingAwareRunnable {
        private final Map<String, KinesisShardOffset> shardOffsetsToConsumer = new ConcurrentHashMap();
        private final Map<String, Lock> locks = new HashMap();
        private final Queue<LockCompletableFuture> forUnlocking = new ConcurrentLinkedQueue();
        private final Queue<LockCompletableFuture> forRenewing = new ConcurrentLinkedQueue();

        ShardConsumerManager() {
        }

        void addShardToConsume(KinesisShardOffset kinesisShardOffset) {
            this.shardOffsetsToConsumer.put(KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()), kinesisShardOffset);
        }

        void unlock(LockCompletableFuture lockCompletableFuture) {
            this.forUnlocking.add(lockCompletableFuture);
        }

        void renewLock(LockCompletableFuture lockCompletableFuture) {
            this.forRenewing.add(lockCompletableFuture);
        }

        public void run() {
            LockCompletableFuture poll;
            LockCompletableFuture poll2;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    this.shardOffsetsToConsumer.entrySet().removeIf(entry -> {
                        boolean z = true;
                        if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                            String str = (String) entry.getKey();
                            Lock obtain = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(str);
                            try {
                                if (obtain.tryLock()) {
                                    this.locks.put(str, obtain);
                                } else {
                                    z = false;
                                }
                            } catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error(e, "Error during locking: " + obtain);
                            }
                        }
                        if (z) {
                            KinesisMessageDrivenChannelAdapter.this.populateConsumer((KinesisShardOffset) entry.getValue());
                        }
                        return z;
                    });
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (poll2 = this.forUnlocking.poll()) != null) {
                        Lock remove = this.locks.remove(poll2.lockKey);
                        if (remove != null) {
                            try {
                                remove.unlock();
                            } catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error(e, "Error during unlocking: " + remove);
                            }
                        }
                        poll2.complete(true);
                    }
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (poll = this.forRenewing.poll()) != null) {
                        Lock lock = this.locks.get(poll.lockKey);
                        if (lock != null) {
                            try {
                                if (!renewLockInRegistry(poll)) {
                                    poll.complete(false);
                                    this.locks.remove(poll.lockKey);
                                }
                            } catch (Exception e2) {
                                poll.complete(false);
                                KinesisMessageDrivenChannelAdapter.this.logger.error(e2, () -> {
                                    return "Error during locking: " + lock;
                                });
                            }
                        } else {
                            poll.complete(false);
                        }
                    }
                    KinesisMessageDrivenChannelAdapter.this.sleep(1000L, new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"), true);
                } catch (Throwable th) {
                    Iterator<Lock> it = this.locks.values().iterator();
                    while (it.hasNext()) {
                        Lock next = it.next();
                        try {
                            try {
                                next.unlock();
                                it.remove();
                            } catch (Exception e3) {
                                if (KinesisMessageDrivenChannelAdapter.this.active) {
                                    KinesisMessageDrivenChannelAdapter.this.logger.error(e3, () -> {
                                        return "Error during unlocking: " + next;
                                    });
                                } else {
                                    KinesisMessageDrivenChannelAdapter.this.logger.info(e3, () -> {
                                        return "Error during unlocking: " + next + " while adapter was inactive";
                                    });
                                }
                                it.remove();
                            }
                        } catch (Throwable th2) {
                            it.remove();
                            throw th2;
                        }
                    }
                    throw th;
                }
            }
            Iterator<Lock> it2 = this.locks.values().iterator();
            while (it2.hasNext()) {
                Lock next2 = it2.next();
                try {
                    try {
                        next2.unlock();
                        it2.remove();
                    } catch (Exception e4) {
                        if (KinesisMessageDrivenChannelAdapter.this.active) {
                            KinesisMessageDrivenChannelAdapter.this.logger.error(e4, () -> {
                                return "Error during unlocking: " + next2;
                            });
                        } else {
                            KinesisMessageDrivenChannelAdapter.this.logger.info(e4, () -> {
                                return "Error during unlocking: " + next2 + " while adapter was inactive";
                            });
                        }
                        it2.remove();
                    }
                } catch (Throwable th3) {
                    it2.remove();
                    throw th3;
                }
            }
        }

        private boolean renewLockInRegistry(LockCompletableFuture lockCompletableFuture) {
            RenewableLockRegistry renewableLockRegistry = KinesisMessageDrivenChannelAdapter.this.lockRegistry;
            if (renewableLockRegistry instanceof RenewableLockRegistry) {
                try {
                    renewableLockRegistry.renewLock(lockCompletableFuture.lockKey);
                    return lockCompletableFuture.complete(true);
                } catch (IllegalStateException e) {
                    return false;
                }
            }
            Lock lock = this.locks.get(lockCompletableFuture.lockKey);
            if (!lock.tryLock()) {
                return false;
            }
            try {
                boolean complete = lockCompletableFuture.complete(true);
                lock.unlock();
                return complete;
            } catch (Throwable th) {
                lock.unlock();
                throw th;
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    public KinesisMessageDrivenChannelAdapter(KinesisAsyncClient kinesisAsyncClient, String... strArr) {
        this.shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-"));
        this.consumerGroup = "SpringIntegration";
        this.checkpointStore = new SimpleMetadataStore();
        this.streamInitialSequence = KinesisShardOffset.latest();
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointMode = CheckpointMode.batch;
        this.checkpointsInterval = 5000L;
        this.recordsLimit = 10000;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.startTimeout = 60000;
        this.describeStreamBackoff = 1000;
        this.describeStreamRetries = 50;
        this.lockRenewalTimeout = 10000L;
        Assert.notNull(kinesisAsyncClient, "'amazonKinesis' must not be null.");
        Assert.notEmpty(strArr, "'streams' must not be null.");
        this.amazonKinesis = kinesisAsyncClient;
        this.streams = (String[]) Arrays.copyOf(strArr, strArr.length);
    }

    public KinesisMessageDrivenChannelAdapter(KinesisAsyncClient kinesisAsyncClient, KinesisShardOffset... kinesisShardOffsetArr) {
        this.shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory((getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-"));
        this.consumerGroup = "SpringIntegration";
        this.checkpointStore = new SimpleMetadataStore();
        this.streamInitialSequence = KinesisShardOffset.latest();
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointMode = CheckpointMode.batch;
        this.checkpointsInterval = 5000L;
        this.recordsLimit = 10000;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.startTimeout = 60000;
        this.describeStreamBackoff = 1000;
        this.describeStreamRetries = 50;
        this.lockRenewalTimeout = 10000L;
        Assert.notNull(kinesisAsyncClient, "'amazonKinesis' must not be null.");
        Assert.notEmpty(kinesisShardOffsetArr, "'shardOffsets' must not be null.");
        Assert.noNullElements(kinesisShardOffsetArr, "'shardOffsets' must not contain null elements.");
        for (KinesisShardOffset kinesisShardOffset : kinesisShardOffsetArr) {
            Assert.isTrue(StringUtils.hasText(kinesisShardOffset.getStream()) && StringUtils.hasText(kinesisShardOffset.getShard()), "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values.");
            this.shardOffsets.add(new KinesisShardOffset(kinesisShardOffset));
        }
        this.amazonKinesis = kinesisAsyncClient;
        this.streams = null;
    }

    /**
     * Sets the publisher used to emit a {@code KinesisShardEndedEvent} when a shard has ended.
     * No event is published while this is {@code null}.
     * @param applicationEventPublisher the publisher to use
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Sets the consumer group name; the constructors default it to {@code "SpringIntegration"}.
     * @param str the consumer group; must contain text
     */
    public void setConsumerGroup(String str) {
        Assert.hasText(str, "'consumerGroup' must not be empty");
        this.consumerGroup = str;
    }

    /**
     * Sets the metadata store that holds shard checkpoints; the constructors default it to a
     * {@code SimpleMetadataStore}.
     * @param concurrentMetadataStore the store to use; must not be {@code null}
     */
    public void setCheckpointStore(ConcurrentMetadataStore concurrentMetadataStore) {
        Assert.notNull(concurrentMetadataStore, "'checkpointStore' must not be null");
        this.checkpointStore = concurrentMetadataStore;
    }

    /**
     * Sets the executor for the consumers. An executor supplied here is marked as explicitly
     * set, so {@code destroy()} does not shut it down.
     * @param executor the executor to use; must not be {@code null}
     */
    public void setConsumerExecutor(Executor executor) {
        Assert.notNull(executor, "'executor' must not be null");
        this.consumerExecutor = executor;
        this.consumerExecutorExplicitlySet = true;
    }

    /**
     * Sets the executor that runs the consumer dispatcher. It is marked as explicitly set, so
     * {@code destroy()} does not shut it down.
     * <p>
     * NOTE(review): unlike {@code setConsumerExecutor}, {@code null} is not rejected. Passing
     * {@code null} before {@code onInit()} still sets the explicit flag, so the default
     * executor created in {@code onInit()} is then never shut down by {@code destroy()}.
     * @param executor the executor to use
     */
    public void setDispatcherExecutor(Executor executor) {
        this.dispatcherExecutor = executor;
        this.dispatcherExecutorExplicitlySet = true;
    }

    /**
     * Sets the initial sequence offset for streams; the constructors default it to
     * {@code KinesisShardOffset.latest()}.
     * @param kinesisShardOffset the initial offset; must not be {@code null}
     */
    public void setStreamInitialSequence(KinesisShardOffset kinesisShardOffset) {
        Assert.notNull(kinesisShardOffset, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = kinesisShardOffset;
    }

    /**
     * Sets the converter applied to record bytes before they become a message payload; the
     * constructors default it to a {@code DeserializingConverter}. With {@code null}, payloads
     * are not converted.
     * @param converter the converter to use, or {@code null} to disable conversion
     */
    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    /**
     * Sets whether records are sent one message per record ({@code ListenerMode.record}, the
     * default) or one message per fetched batch ({@code ListenerMode.batch}).
     * @param listenerMode the mode to use; must not be {@code null}
     */
    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    /**
     * Sets the checkpoint strategy; the constructors default it to {@code CheckpointMode.batch}.
     * {@code doStart()} overrides {@code CheckpointMode.record} with {@code CheckpointMode.batch}
     * when the listener mode is {@code ListenerMode.batch}.
     * @param checkpointMode the mode to use; must not be {@code null}
     */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    /**
     * Sets the interval, in milliseconds, between checkpoints in periodic checkpoint mode;
     * the constructors default it to 5000. The value is not validated.
     * @param j the interval in milliseconds
     */
    public void setCheckpointsInterval(long j) {
        this.checkpointsInterval = j;
    }

    /**
     * Sets the records limit; values above 10000 are capped at 10000, which is also the default.
     * @param i the limit; must be greater than 0
     */
    public void setRecordsLimit(int i) {
        Assert.isTrue(i > 0, "'recordsLimit' must be more than 0");
        this.recordsLimit = Math.min(10000, i);
    }

    /**
     * Sets how long, in milliseconds, a shard consumer is suspended when it goes to sleep.
     * Values below 1000 are raised to 1000.
     * @param i the back-off in milliseconds
     */
    public void setConsumerBackoff(int i) {
        this.consumerBackoff = Math.max(1000, i);
    }

    /**
     * Sets how long, in milliseconds, to wait before retrying a shard listing that failed with
     * {@code LimitExceededException}. Values below 1000 are raised to 1000.
     * @param i the back-off in milliseconds
     */
    public void setDescribeStreamBackoff(int i) {
        this.describeStreamBackoff = Math.max(1000, i);
    }

    /**
     * Sets the retry limit for listing a stream's shards; once the retry count exceeds it,
     * the listing fails with an {@code IllegalStateException}. Defaults to 50.
     * @param i the retry limit; must be greater than 0
     */
    public void setDescribeStreamRetries(int i) {
        Assert.isTrue(i > 0, "'describeStreamRetries' must be more than 0");
        this.describeStreamRetries = i;
    }

    /**
     * Sets how long, in milliseconds, start-up waits for the shards of all streams to be
     * populated before failing. Defaults to 60000.
     * @param i the timeout in milliseconds; must be greater than 0
     */
    public void setStartTimeout(int i) {
        Assert.isTrue(i > 0, "'startTimeout' must be more than 0");
        this.startTimeout = i;
    }

    /**
     * Sets the lock renewal timeout; the constructors default it to 10000.
     * NOTE(review): presumably milliseconds, like the other timeouts here; confirm at the usage site.
     * @param j the timeout; must be greater than 0
     */
    public void setLockRenewalTimeout(long j) {
        Assert.isTrue(j > 0, "'lockRenewalTimeout' must be more than 0");
        this.lockRenewalTimeout = j;
    }

    /**
     * Sets the maximum concurrency. On start the effective concurrency is the smaller of this
     * value and the number of shard offsets. The value is not validated.
     * @param i the maximum concurrency
     */
    public void setConcurrency(int i) {
        this.maxConcurrency = i;
    }

    /**
     * Sets the idle period between polls; values below 250 are raised to 250. Defaults to 1000.
     * NOTE(review): presumably milliseconds; confirm at the usage site.
     * @param i the idle period
     */
    public void setIdleBetweenPolls(int i) {
        this.idleBetweenPolls = Math.max(250, i);
    }

    /**
     * Sets the mapper that turns raw record bytes into a message, supplying its payload and
     * headers. With {@code null}, the record bytes are used as they are.
     * @param inboundMessageMapper the mapper to use, or {@code null} for none
     */
    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> inboundMessageMapper) {
        this.embeddedHeadersMapper = inboundMessageMapper;
    }

    /**
     * Sets the registry used to lock a shard before a consumer is started for it.
     * {@code onInit()} discards it, with a warning, when the adapter was created with explicit
     * shard offsets.
     * @param lockRegistry the registry to use, or {@code null} for no locking
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    /**
     * Sets whether the originating record is attached to each per-record message under the
     * {@code "sourceData"} header.
     * @param z {@code true} to attach the source record
     */
    public void setBindSourceRecord(boolean z) {
        this.bindSourceRecord = z;
    }

    /**
     * Sets a function that maps a list of shards to the list to keep.
     * NOTE(review): where the filter is applied is not visible in this part of the file.
     * @param function the shard list filter
     */
    public void setShardListFilter(Function<List<Shard>, List<Shard>> function) {
        this.shardListFilter = function;
    }

    /**
     * Completes initialization: creates a cached thread pool for the consumer and dispatcher
     * executors where none was supplied, and drops the lock registry (with a warning) when the
     * adapter was configured with explicit shard offsets rather than stream names.
     */
    @Override
    protected void onInit() {
        super.onInit();
        String componentName = getComponentName();
        String threadNamePrefix = componentName == null ? "" : componentName;
        if (this.consumerExecutor == null) {
            this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadNamePrefix + "-kinesis-consumer-"));
        }
        if (this.dispatcherExecutor == null) {
            this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadNamePrefix + "-kinesis-dispatcher-"));
        }
        if (this.streams != null) {
            return;
        }
        // Explicit shard configuration: shard locking is not applied.
        if (this.lockRegistry != null) {
            this.logger.warn("The LockRegistry is ignored when explicit shards configuration is used.");
        }
        this.lockRegistry = null;
    }

    /**
     * Shuts down the consumer and dispatcher executors, but only those that were not supplied
     * through a setter.
     * <p>
     * NOTE(review): {@code shardLocksExecutor}, created in the constructors, is not shut down
     * here; confirm it is released elsewhere.
     */
    public void destroy() {
        if (!this.consumerExecutorExplicitlySet) {
            ExecutorService ownedConsumerExecutor = (ExecutorService) this.consumerExecutor;
            ownedConsumerExecutor.shutdown();
        }
        if (!this.dispatcherExecutorExplicitlySet) {
            ExecutorService ownedDispatcherExecutor = (ExecutorService) this.dispatcherExecutor;
            ownedDispatcherExecutor.shutdown();
        }
    }

    /**
     * Removes and stops the consumer registered for the given shard; only logs at debug level
     * when there is none.
     * @param str the stream name
     * @param str2 the shard id
     */
    @ManagedOperation
    public void stopConsumer(String str, String str2) {
        KinesisShardOffset lookupKey = KinesisShardOffset.latest(str, str2);
        ShardConsumer consumer = this.shardConsumers.remove(lookupKey);
        if (consumer == null) {
            this.logger.debug(() -> "There is no ShardConsumer for shard [" + str2 + "] in stream [" + str + "] to stop.");
            return;
        }
        consumer.stop();
    }

    /**
     * Queues the given shard for consumption unless a consumer is already registered for it.
     * Only a shard that is among the adapter's known shard offsets is queued; an unknown
     * shard is silently ignored.
     * @param str the stream name
     * @param str2 the shard id
     */
    @ManagedOperation
    public void startConsumer(String str, String str2) {
        KinesisShardOffset lookupKey = KinesisShardOffset.latest(str, str2);
        ShardConsumer existing = this.shardConsumers.get(lookupKey);
        if (existing != null) {
            this.logger.debug(() -> "The [" + existing + "] has been started before.");
            return;
        }
        synchronized (this.shardOffsets) {
            for (KinesisShardOffset candidate : this.shardOffsets) {
                if (lookupKey.equals(candidate)) {
                    // Queue the stored offset, not the lookup key.
                    this.shardConsumerManager.addShardToConsume(candidate);
                    break;
                }
            }
        }
    }

    /**
     * Restarts consumption of the given shard from a {@code KinesisShardOffset.latest} offset.
     * @param str the stream name
     * @param str2 the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToLatest(String str, String str2) {
        KinesisShardOffset target = KinesisShardOffset.latest(str, str2);
        restartShardConsumerForOffset(target);
    }

    /**
     * Restarts consumption of the given shard from a {@code KinesisShardOffset.trimHorizon} offset.
     * @param str the stream name
     * @param str2 the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToTrimHorizon(String str, String str2) {
        KinesisShardOffset target = KinesisShardOffset.trimHorizon(str, str2);
        restartShardConsumerForOffset(target);
    }

    /**
     * Restarts consumption of the given shard from a {@code KinesisShardOffset.atSequenceNumber} offset.
     * @param str the stream name
     * @param str2 the shard id
     * @param str3 the sequence number to restart at
     */
    @ManagedOperation
    public void resetCheckpointForShardToSequenceNumber(String str, String str2, String str3) {
        KinesisShardOffset target = KinesisShardOffset.atSequenceNumber(str, str2, str3);
        restartShardConsumerForOffset(target);
    }

    /**
     * Restarts consumption of the given shard from a {@code KinesisShardOffset.atTimestamp} offset.
     * @param str the stream name
     * @param str2 the shard id
     * @param j the timestamp, interpreted as seconds since the epoch
     */
    @ManagedOperation
    public void resetCheckpointForShardAtTimestamp(String str, String str2, long j) {
        Instant timestamp = Instant.ofEpochSecond(j);
        KinesisShardOffset target = KinesisShardOffset.atTimestamp(str, str2, timestamp);
        restartShardConsumerForOffset(target);
    }

    /**
     * Replaces the stored offset for a shard with the given one and, when the adapter is
     * active, closes the running consumer for that shard and queues the shard for consumption
     * from the new offset.
     * @param kinesisShardOffset the new offset; its shard must already be operated by this adapter
     * @throws IllegalArgumentException if the shard is not among the adapter's shard offsets
     */
    private void restartShardConsumerForOffset(KinesisShardOffset kinesisShardOffset) {
        boolean operated = this.shardOffsets.contains(kinesisShardOffset);
        Assert.isTrue(operated, "The [" + this + "] doesn't operate shard [" + kinesisShardOffset.getShard() + "] for stream [" + kinesisShardOffset.getStream() + "]");
        this.logger.debug(() -> "Resetting consumer for [" + kinesisShardOffset + "]...");
        kinesisShardOffset.reset();
        synchronized (this.shardOffsets) {
            // Remove then add, so the stored instance is replaced by the new offset.
            this.shardOffsets.remove(kinesisShardOffset);
            this.shardOffsets.add(kinesisShardOffset);
        }
        if (!this.active) {
            return;
        }
        ShardConsumer running = this.shardConsumers.remove(kinesisShardOffset);
        if (running != null) {
            running.close();
        }
        kinesisShardOffset.setReset(true);
        this.shardConsumerManager.addShardToConsume(kinesisShardOffset);
    }

    /**
     * Sets the {@code resetCheckpoints} flag and, when the adapter is active, stops all
     * consumers and populates them again.
     */
    @ManagedOperation
    public void resetCheckpoints() {
        this.resetCheckpoints = true;
        if (!this.active) {
            return;
        }
        stopConsumers();
        populateConsumers();
    }

    /**
     * Starts the adapter: reconciles the checkpoint mode with the listener mode, resolves the
     * shards of the configured streams (if any), populates the consumers, marks the adapter
     * active, and launches the consumer dispatcher and the shard consumer manager.
     */
    @Override
    protected void doStart() {
        super.doStart();
        boolean batchListener = ListenerMode.batch.equals(this.listenerMode);
        if (batchListener && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        if (this.streams != null) {
            populateShardsForStreams();
        }
        populateConsumers();
        this.active = true;
        // Never use more dispatch slots than there are shards.
        int shardCount = this.shardOffsets.size();
        this.concurrency = Math.min(this.maxConcurrency, shardCount);
        ConsumerDispatcher dispatcher = new ConsumerDispatcher();
        this.dispatcherExecutor.execute(dispatcher);
        this.shardConsumerManagerFuture = this.shardLocksExecutor.submit((Runnable) this.shardConsumerManager);
    }

    /**
     * Returns the slice of the current shard consumers assigned to dispatch slot {@code i}.
     * <ul>
     * <li>With a concurrency of 1, every consumer is returned.</li>
     * <li>When the consumer count equals the concurrency, slot {@code i} gets exactly consumer {@code i}.</li>
     * <li>Otherwise the consumers are cut into chunks of {@code size / concurrency}, and the
     * last slot also takes the remainder.</li>
     * </ul>
     * @param i the zero-based slot index
     * @return the consumers for that slot, taken from a snapshot of the registered consumers
     */
    private Collection<ShardConsumer> shardConsumerSubset(int i) {
        List<ShardConsumer> snapshot = new ArrayList<>(this.shardConsumers.values());
        if (this.concurrency == 1) {
            return snapshot;
        }
        int total = snapshot.size();
        if (total == this.concurrency) {
            return Collections.singleton(snapshot.get(i));
        }
        int chunk = total / this.concurrency;
        int from = i * chunk;
        boolean lastSlot = i == this.concurrency - 1;
        int to = lastSlot ? total : (i + 1) * chunk;
        return snapshot.subList(from, to);
    }

    /**
     * Lists the shards of the given stream, starting with a retry count of 0.
     * @param str the stream name
     * @return the shards reported for the stream
     */
    private List<Shard> readShardList(String str) {
        return readShardList(str, 0);
    }

    /**
     * Lists all shards of the given stream, following {@code nextToken} pagination.
     * <p>
     * When a page fails with a {@code LimitExceededException}, the method sleeps for
     * {@code describeStreamBackoff} milliseconds and restarts the whole listing with the retry
     * count increased by one, returning the result of that retry. Any other failure is rethrown.
     * @param str the stream name
     * @param i the number of retries already performed
     * @return the shards reported for the stream
     * @throws IllegalStateException if {@code i} exceeds {@code describeStreamRetries}, or if
     * the thread is interrupted while backing off (the interrupt flag is restored)
     */
    private List<Shard> readShardList(String str, int i) {
        if (i > this.describeStreamRetries) {
            throw new IllegalStateException("Kinesis could not read shards from stream with name [" + str + "] ");
        }
        List<Shard> shards = new ArrayList<>();
        String nextToken = null;
        // NOTE(review): the Kinesis ListShards API documents StreamName and NextToken as
        // mutually exclusive; confirm that resending the stream name with a token is accepted.
        ListShardsRequest.Builder requestBuilder = ListShardsRequest.builder().streamName(str);
        do {
            try {
                ListShardsResponse response = (ListShardsResponse) this.amazonKinesis.listShards((ListShardsRequest) requestBuilder.nextToken(nextToken).build()).join();
                shards.addAll(response.shards());
                nextToken = response.nextToken();
            } catch (CompletionException e) {
                if (!(e.getCause() instanceof LimitExceededException)) {
                    throw e;
                }
                this.logger.info(() -> "Got LimitExceededException when listing stream [" + str + "]. Backing off for [" + this.describeStreamBackoff + "] millis.");
                try {
                    Thread.sleep(this.describeStreamBackoff);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("The [describeStream] thread for the stream [" + str + "] has been interrupted.", interrupted);
                }
                // Use the retry's result. It was previously discarded, so a throttled first
                // page produced an empty or partial shard list.
                return readShardList(str, i + 1);
            }
        } while (nextToken != null);
        return shards;
    }

    /**
     * Clears the known shard offsets and re-populates them for every configured
     * stream, waiting up to {@code startTimeout} milliseconds for all streams to
     * report back.
     *
     * @throws IllegalStateException if the streams are not all processed within the
     * timeout, or if the waiting thread is interrupted
     */
    private void populateShardsForStreams() {
        this.shardOffsets.clear();
        CountDownLatch shardsGatherLatch = new CountDownLatch(this.streams.length);
        for (String stream : this.streams) {
            populateShardsForStream(stream, shardsGatherLatch);
        }
        boolean gathered;
        try {
            gathered = shardsGatherLatch.await(this.startTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it,
            // and keep the original exception as the cause.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("The [ " + this + "] has been interrupted from start.", e);
        }
        if (!gathered) {
            throw new IllegalStateException("The [ " + this + "] could not start during timeout: " + this.startTimeout);
        }
    }

    /**
     * Determines the shards to consume for the given stream, starting with a retry
     * counter of zero.
     *
     * @param str the stream name
     * @return the shards selected for consumption
     */
    private List<Shard> detectShardsToConsume(String str) {
        final int firstAttempt = 0;
        return detectShardsToConsume(str, firstAttempt);
    }

    /**
     * Determines which shards of the given stream should be consumed.
     * <p>
     * A shard that has an ending sequence number and whose stored checkpoint is at or
     * beyond that number is skipped. All remaining shards are collected and, if a
     * {@code shardListFilter} is configured, passed through it.
     * <p>
     * If anything fails while reading or inspecting the shards, the method backs off
     * for {@code describeStreamBackoff} milliseconds and retries; the result of the
     * retry is returned. Once the retry counter is greater than 5 the failure is
     * rethrown.
     *
     * @param str the stream name
     * @param i the number of retries performed so far
     * @return the shards selected for consumption
     * @throws IllegalStateException if the retries are exhausted or the back-off sleep
     * fails
     */
    private List<Shard> detectShardsToConsume(String str, int i) {
        List<Shard> shardsToConsume = new ArrayList<>();
        try {
            for (Shard shard : readShardList(str)) {
                String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
                if (endingSequenceNumber != null) {
                    String checkpoint = this.checkpointStore.get(buildCheckpointKeyForShard(str, shard.shardId()));
                    boolean skipClosedShard = checkpoint != null
                            && new BigInteger(endingSequenceNumber).compareTo(new BigInteger(checkpoint)) <= 0;
                    this.logger.trace(() -> "The shard [" + shard + "] in stream [" + str + "] is closed CLOSED and exhausted with endingSequenceNumber [" + endingSequenceNumber + "].\nThe last processed checkpoint is [" + checkpoint + "]." + (skipClosedShard ? "\nThe shard will be skipped." : ""));
                    if (skipClosedShard) {
                        // The checkpoint already reached the end of this shard.
                        continue;
                    }
                }
                shardsToConsume.add(shard);
            }
        } catch (Exception e) {
            String exceptionMessage = "Got an exception when processing shards in stream [" + str + "]";
            this.logger.info(e, () -> exceptionMessage + ".\n Retrying... ");
            if (i > 5) {
                throw new IllegalStateException(exceptionMessage, e);
            }
            // Back off first, then retry; the retry already applies the shard list
            // filter, so its result is returned directly.
            sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false);
            return detectShardsToConsume(str, i + 1);
        }
        return this.shardListFilter != null ? this.shardListFilter.apply(shardsToConsume) : shardsToConsume;
    }

    /**
     * Sleeps for the given amount of time and converts any failure of the sleep into
     * the supplied exception.
     * <p>
     * On failure the problem is logged at error level while the adapter is active and
     * at info level otherwise, then {@code runtimeException} is thrown.
     *
     * @param j the time to sleep, in milliseconds
     * @param runtimeException the exception to throw when the sleep fails; its message
     * is used for logging
     * @param z whether to set the current thread's interrupt flag when the sleep fails
     */
    private void sleep(long j, RuntimeException runtimeException, boolean z) {
        try {
            Thread.sleep(j);
        } catch (Exception sleepFailure) {
            if (z) {
                Thread.currentThread().interrupt();
            }
            if (!this.active) {
                this.logger.info(sleepFailure, () -> runtimeException.getMessage() + " while adapter was inactive");
            } else {
                this.logger.error(sleepFailure, runtimeException.getMessage());
            }
            throw runtimeException;
        }
    }

    /**
     * Schedules, on the dispatcher executor, the discovery of shards for the given
     * stream.
     * <p>
     * Each detected shard is turned into a {@code KinesisShardOffset} based on
     * {@code streamInitialSequence} and added to {@code shardOffsets} under that
     * collection's monitor. When an offset is newly added, no latch was supplied and
     * the adapter is active, the offset is also handed to the shard consumer manager.
     * <p>
     * Whatever the outcome, the latch (if any) is counted down exactly once and the
     * stream is removed from {@code inResharding}. Exceptions are logged and not
     * propagated; errors propagate after the cleanup.
     *
     * @param str the stream name
     * @param countDownLatch latch to count down once the stream is processed; may be
     * {@code null}
     */
    private void populateShardsForStream(String str, CountDownLatch countDownLatch) {
        this.dispatcherExecutor.execute(() -> {
            try {
                for (Shard shard : detectShardsToConsume(str)) {
                    KinesisShardOffset shardOffset = new KinesisShardOffset(this.streamInitialSequence);
                    shardOffset.setShard(shard.shardId());
                    shardOffset.setStream(str);
                    boolean addedOffset;
                    synchronized (this.shardOffsets) {
                        addedOffset = this.shardOffsets.add(shardOffset);
                    }
                    if (addedOffset && countDownLatch == null && this.active) {
                        this.shardConsumerManager.addShardToConsume(shardOffset);
                    }
                }
            } catch (Exception e) {
                this.logger.error(e, () -> "Error population shards for stream: " + str);
            } finally {
                // Single cleanup point: the latch must never be counted down more
                // than once per stream, even if logging itself fails.
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
                this.inResharding.remove(str);
            }
        });
    }

    /**
     * Hands every known shard offset to the shard consumer manager while holding the
     * monitor of {@code shardOffsets}, then clears the {@code resetCheckpoints} flag.
     */
    private void populateConsumers() {
        synchronized (this.shardOffsets) {
            for (KinesisShardOffset shardOffset : this.shardOffsets) {
                this.shardConsumerManager.addShardToConsume(shardOffset);
            }
        }
        this.resetCheckpoints = false;
    }

    /**
     * Creates a {@code ShardConsumer} for the given offset and registers it.
     * <p>
     * While the adapter is active the consumer is also attached to an invoker, under
     * the monitor of {@code consumerInvokers}:
     * <ul>
     * <li>if fewer than {@code maxConcurrency} invokers exist, a new invoker is
     * created for this consumer and submitted to the consumer executor;</li>
     * <li>otherwise the consumer joins the first invoker holding fewer than
     * {@code consumerInvokerMaxCapacity} consumers;</li>
     * <li>if no invoker has room and {@code concurrency} is not zero, the consumer is
     * added to the first invoker and the capacity is raised to that invoker's new
     * size.</li>
     * </ul>
     * The consumer is recorded in {@code shardConsumers} in every case.
     *
     * @param kinesisShardOffset the shard offset to consume from; its reset flag is
     * set from {@code resetCheckpoints}
     */
    private void populateConsumer(KinesisShardOffset kinesisShardOffset) {
        kinesisShardOffset.setReset(this.resetCheckpoints);
        ShardConsumer shardConsumer = new ShardConsumer(kinesisShardOffset);
        if (this.active) {
            synchronized (this.consumerInvokers) {
                if (this.consumerInvokers.size() < this.maxConcurrency) {
                    // Declared with its concrete type so it can be stored in consumerInvokers.
                    ConsumerInvoker newInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer));
                    this.consumerInvokers.add(newInvoker);
                    this.consumerExecutor.execute(newInvoker);
                } else {
                    boolean consumerAdded = false;
                    for (ConsumerInvoker candidate : this.consumerInvokers) {
                        if (candidate.consumers.size() < this.consumerInvokerMaxCapacity) {
                            candidate.addConsumer(shardConsumer);
                            consumerAdded = true;
                            break;
                        }
                    }
                    if (this.concurrency != 0 && !consumerAdded) {
                        // Every invoker is full: grow the first one and raise the shared capacity.
                        ConsumerInvoker firstInvoker = this.consumerInvokers.get(0);
                        firstInvoker.addConsumer(shardConsumer);
                        this.consumerInvokerMaxCapacity = firstInvoker.consumers.size();
                    }
                }
            }
        }
        this.shardConsumers.put(kinesisShardOffset, shardConsumer);
    }

    /**
     * Builds the checkpoint key for a shard by joining the consumer group, the first
     * argument and the second argument with {@code ':'}.
     *
     * @param str the first key component after the consumer group (callers in this
     * class pass the stream name)
     * @param str2 the last key component (callers in this class pass the shard id)
     * @return the key in the form {@code consumerGroup:str:str2}
     */
    private String buildCheckpointKeyForShard(String str, String str2) {
        final String groupAndStream = this.consumerGroup + ":" + str;
        return groupAndStream + ":" + str2;
    }

    /**
     * Stops the adapter: notifies the barrier of every consumer invoker, delegates to
     * the superclass, stops all shard consumers, marks the adapter inactive and
     * cancels the shard consumer manager task with interruption.
     */
    @Override
    protected void doStop() {
        for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
            consumerInvoker.notifyBarrier();
        }
        super.doStop();
        stopConsumers();
        this.active = false;
        this.shardConsumerManagerFuture.cancel(true);
    }

    /**
     * Calls {@code stop()} on every registered shard consumer and then empties the
     * consumer registry.
     */
    private void stopConsumers() {
        for (ShardConsumer shardConsumer : this.shardConsumers.values()) {
            shardConsumer.stop();
        }
        this.shardConsumers.clear();
    }

    /**
     * When an error channel is configured, creates an attribute accessor for the
     * message, stores it in {@code attributesHolder} and records the given object
     * under the {@code AwsHeaders.RAW_RECORD} attribute. Does nothing otherwise.
     *
     * @param obj the value to store as the raw-record attribute
     * @param message the message the attributes are created for
     */
    private void setAttributesIfNecessary(Object obj, Message<?> message) {
        if (getErrorChannel() == null) {
            return;
        }
        AttributeAccessor attributes = ErrorMessageUtils.getAttributeAccessor(message, (Message<?>) null);
        attributesHolder.set(attributes);
        attributes.setAttribute(AwsHeaders.RAW_RECORD, obj);
    }

    /**
     * Returns the attribute accessor currently held in {@code attributesHolder}, or
     * the superclass result when none is held.
     *
     * @param message the message passed on to the superclass when no accessor is held
     * @return the held accessor, or the superclass-provided one
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor heldAttributes = attributesHolder.get();
        if (heldAttributes != null) {
            return heldAttributes;
        }
        return super.getErrorMessageAttributes(message);
    }

    /**
     * Describes this adapter by its shard offsets and consumer group.
     *
     * @return a string of the form
     * {@code KinesisMessageDrivenChannelAdapter{shardOffsets=..., consumerGroup='...'}}
     */
    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("KinesisMessageDrivenChannelAdapter{shardOffsets=");
        description.append(String.valueOf(this.shardOffsets));
        description.append(", consumerGroup='");
        description.append(String.valueOf(this.consumerGroup));
        description.append("'}");
        return description.toString();
    }
}
