package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.log.LogMessage;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.class */
public class KclMessageDrivenChannelAdapter extends MessageProducerSupport implements ApplicationEventPublisherAware {
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    private final ShardRecordProcessorFactory recordProcessorFactory;
    private final String[] streams;
    private final KinesisAsyncClient kinesisClient;
    private final CloudWatchAsyncClient cloudWatchClient;
    private final DynamoDbAsyncClient dynamoDBClient;
    private TaskExecutor executor;
    private String consumerGroup;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    private ConfigsBuilder config;
    private InitialPositionInStreamExtended streamInitialSequence;
    private int consumerBackoff;
    private Converter<byte[], Object> converter;
    private ListenerMode listenerMode;
    private long checkpointsInterval;
    private CheckpointMode checkpointMode;
    private String workerId;
    private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;
    private boolean bindSourceRecord;
    private boolean fanOut;
    private ApplicationEventPublisher applicationEventPublisher;
    private volatile Scheduler scheduler;

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter$RecordProcessor.class */
    private final class RecordProcessor implements ShardRecordProcessor {
        private final String stream;
        private String shardId;
        private long nextCheckpointTimeInMillis;

        RecordProcessor(String str) {
            this.stream = str;
        }

        public void initialize(InitializationInput initializationInput) {
            this.shardId = initializationInput.shardId();
            KclMessageDrivenChannelAdapter.this.logger.info(() -> {
                return "Initializing record processor for shard: " + this.shardId;
            });
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            KclMessageDrivenChannelAdapter.this.logger.info(LogMessage.format("Shard [%s] ended; checkpointing...", this.shardId));
            try {
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                KclMessageDrivenChannelAdapter.this.logger.error(e, "Exception while checkpointing at requested shutdown. Giving up");
            }
            if (KclMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
                KclMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(new KinesisShardEndedEvent(KclMessageDrivenChannelAdapter.this, this.shardId));
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            KclMessageDrivenChannelAdapter.this.logger.info("Scheduler is shutting down; checkpointing...");
            try {
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                KclMessageDrivenChannelAdapter.this.logger.error(e, "Exception while checkpointing at requested shutdown. Giving up");
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            List<KinesisClientRecord> records = processRecordsInput.records();
            RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
            KclMessageDrivenChannelAdapter.this.logger.debug(() -> {
                return "Processing " + records.size() + " records from " + this.shardId;
            });
            try {
                if (ListenerMode.record.equals(KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    for (KinesisClientRecord kinesisClientRecord : records) {
                        processSingleRecord(kinesisClientRecord, checkpointer);
                        checkpointIfRecordMode(checkpointer, kinesisClientRecord);
                        checkpointIfPeriodicMode(checkpointer, kinesisClientRecord);
                    }
                } else if (ListenerMode.batch.equals(KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    processMultipleRecords(records, checkpointer);
                    checkpointIfPeriodicMode(checkpointer, null);
                }
                checkpointIfBatchMode(checkpointer);
                KclMessageDrivenChannelAdapter.attributesHolder.remove();
            } catch (Throwable th) {
                KclMessageDrivenChannelAdapter.attributesHolder.remove();
                throw th;
            }
        }

        private void processSingleRecord(KinesisClientRecord kinesisClientRecord, RecordProcessorCheckpointer recordProcessorCheckpointer) {
            performSend(prepareMessageForRecord(kinesisClientRecord), kinesisClientRecord, recordProcessorCheckpointer);
        }

        private void processMultipleRecords(List<KinesisClientRecord> list, RecordProcessorCheckpointer recordProcessorCheckpointer) {
            AbstractIntegrationMessageBuilder<?> withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(list);
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload((List) list.stream().map(this::prepareMessageForRecord).map((v0) -> {
                    return v0.build();
                }).collect(Collectors.toList()));
            } else if (KclMessageDrivenChannelAdapter.this.converter != null) {
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload((List) list.stream().map(kinesisClientRecord -> {
                    arrayList.add(kinesisClientRecord.partitionKey());
                    arrayList2.add(kinesisClientRecord.sequenceNumber());
                    return KclMessageDrivenChannelAdapter.this.converter.convert(kinesisClientRecord.data().array());
                }).collect(Collectors.toList())).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, arrayList).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, arrayList2);
            }
            performSend(withPayload, list, recordProcessorCheckpointer);
        }

        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(KinesisClientRecord kinesisClientRecord) {
            Object copyAllBytesFrom = BinaryUtils.copyAllBytesFrom(kinesisClientRecord.data());
            Message message = null;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    message = KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage((byte[]) copyAllBytesFrom);
                    if (message == null) {
                        throw new IllegalStateException("The 'embeddedHeadersMapper' returned null for payload: " + Arrays.toString((byte[]) copyAllBytesFrom));
                    }
                    copyAllBytesFrom = message.getPayload();
                } catch (Exception e) {
                    KclMessageDrivenChannelAdapter.this.logger.warn(e, "Could not parse embedded headers. Remain payload untouched.");
                }
            }
            if ((copyAllBytesFrom instanceof byte[]) && KclMessageDrivenChannelAdapter.this.converter != null) {
                copyAllBytesFrom = KclMessageDrivenChannelAdapter.this.converter.convert((byte[]) copyAllBytesFrom);
            }
            AbstractIntegrationMessageBuilder<Object> header = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(copyAllBytesFrom).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, kinesisClientRecord.partitionKey()).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, kinesisClientRecord.sequenceNumber());
            if (KclMessageDrivenChannelAdapter.this.bindSourceRecord) {
                header.setHeader("sourceData", kinesisClientRecord);
            }
            if (message != null) {
                header.copyHeadersIfAbsent(message.getHeaders());
            }
            return header;
        }

        private void performSend(AbstractIntegrationMessageBuilder<?> abstractIntegrationMessageBuilder, Object obj, RecordProcessorCheckpointer recordProcessorCheckpointer) {
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.stream).setHeader(AwsHeaders.SHARD, this.shardId);
            if (CheckpointMode.manual.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                abstractIntegrationMessageBuilder.setHeader(AwsHeaders.CHECKPOINTER, recordProcessorCheckpointer);
            }
            Message<?> build = abstractIntegrationMessageBuilder.build();
            setAttributesIfNecessary(obj, build);
            try {
                KclMessageDrivenChannelAdapter.this.sendMessage(build);
            } catch (Exception e) {
                KclMessageDrivenChannelAdapter.this.logger.error(e, () -> {
                    return "Got an exception during sending a '" + build + "'\nfor the '" + obj + "'.\nConsider to use 'errorChannel' flow for the compensation logic.";
                });
            }
        }

        private void setAttributesIfNecessary(Object obj, Message<?> message) {
            if (KclMessageDrivenChannelAdapter.this.getErrorChannel() != null) {
                AttributeAccessor attributeAccessor = ErrorMessageUtils.getAttributeAccessor(message, (Message) null);
                KclMessageDrivenChannelAdapter.attributesHolder.set(attributeAccessor);
                attributeAccessor.setAttribute(AwsHeaders.RAW_RECORD, obj);
            }
        }

        private void checkpoint(RecordProcessorCheckpointer recordProcessorCheckpointer, @Nullable KinesisClientRecord kinesisClientRecord) {
            KclMessageDrivenChannelAdapter.this.logger.info(() -> {
                return "Checkpointing shard " + this.shardId;
            });
            try {
                if (kinesisClientRecord == null) {
                    recordProcessorCheckpointer.checkpoint();
                } else {
                    recordProcessorCheckpointer.checkpoint(kinesisClientRecord.sequenceNumber());
                }
            } catch (ThrottlingException e) {
                KclMessageDrivenChannelAdapter.this.logger.info(e, "Transient issue when checkpointing");
            } catch (ShutdownException e2) {
                KclMessageDrivenChannelAdapter.this.logger.info(e2, "Caught shutdown exception, skipping checkpoint.");
            } catch (InvalidStateException e3) {
                KclMessageDrivenChannelAdapter.this.logger.error(e3, "Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client.");
            }
        }

        private void checkpointIfBatchMode(RecordProcessorCheckpointer recordProcessorCheckpointer) {
            if (CheckpointMode.batch.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                checkpoint(recordProcessorCheckpointer, null);
            }
        }

        private void checkpointIfRecordMode(RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisClientRecord kinesisClientRecord) {
            if (CheckpointMode.record.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                checkpoint(recordProcessorCheckpointer, kinesisClientRecord);
            }
        }

        private void checkpointIfPeriodicMode(RecordProcessorCheckpointer recordProcessorCheckpointer, @Nullable KinesisClientRecord kinesisClientRecord) {
            if (!CheckpointMode.periodic.equals(KclMessageDrivenChannelAdapter.this.checkpointMode) || System.currentTimeMillis() <= this.nextCheckpointTimeInMillis) {
                return;
            }
            checkpoint(recordProcessorCheckpointer, kinesisClientRecord);
            this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
        }
    }

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter$RecordProcessorFactory.class */
    private final class RecordProcessorFactory implements ShardRecordProcessorFactory {
        RecordProcessorFactory() {
        }

        public ShardRecordProcessor shardRecordProcessor() {
            throw new UnsupportedOperationException();
        }

        public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
            return new RecordProcessor(streamIdentifier.streamName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter$StreamsTracker.class */
    public final class StreamsTracker implements MultiStreamTracker {
        private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy = new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() { // from class: org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.StreamsTracker.1
            public Duration waitPeriodToDeleteFormerStreams() {
                return Duration.ZERO;
            }
        };
        private final Flux<StreamConfig> streamConfigs;

        StreamsTracker() {
            this.streamConfigs = Flux.fromArray(KclMessageDrivenChannelAdapter.this.streams).flatMap(str -> {
                return Mono.fromFuture(KclMessageDrivenChannelAdapter.this.kinesisClient.describeStreamSummary(builder -> {
                    builder.streamName(str);
                }));
            }).map((v0) -> {
                return v0.streamDescriptionSummary();
            }).map(streamDescriptionSummary -> {
                return StreamIdentifier.multiStreamInstance(Arn.fromString(streamDescriptionSummary.streamARN()), streamDescriptionSummary.streamCreationTimestamp().getEpochSecond());
            }).map(streamIdentifier -> {
                return new StreamConfig(streamIdentifier, KclMessageDrivenChannelAdapter.this.streamInitialSequence);
            }).cache();
        }

        public List<StreamConfig> streamConfigList() {
            return (List) this.streamConfigs.collectList().block();
        }

        public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
            return this.formerStreamsLeasesDeletionStrategy;
        }
    }

    public KclMessageDrivenChannelAdapter(String... strArr) {
        this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), strArr);
    }

    public KclMessageDrivenChannelAdapter(Region region, String... strArr) {
        this((KinesisAsyncClient) KinesisAsyncClient.builder().region(region).build(), (CloudWatchAsyncClient) CloudWatchAsyncClient.builder().region(region).build(), (DynamoDbAsyncClient) DynamoDbAsyncClient.builder().region(region).build(), strArr);
    }

    public KclMessageDrivenChannelAdapter(KinesisAsyncClient kinesisAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, String... strArr) {
        this.recordProcessorFactory = new RecordProcessorFactory();
        this.executor = new SimpleAsyncTaskExecutor();
        this.consumerGroup = "SpringIntegration";
        this.streamInitialSequence = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
        this.consumerBackoff = 1000;
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointsInterval = 5000L;
        this.checkpointMode = CheckpointMode.batch;
        this.workerId = UUID.randomUUID().toString();
        this.fanOut = true;
        Assert.notNull(kinesisAsyncClient, "'kinesisClient' must not be null.");
        Assert.notNull(cloudWatchAsyncClient, "'cloudWatchClient' must not be null.");
        Assert.notNull(dynamoDbAsyncClient, "'dynamoDBClient' must not be null.");
        Assert.notEmpty(strArr, "'streams' must not be empty.");
        this.streams = strArr;
        this.kinesisClient = kinesisAsyncClient;
        this.cloudWatchClient = cloudWatchAsyncClient;
        this.dynamoDBClient = dynamoDbAsyncClient;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void setExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "'executor' must not be null.");
        this.executor = taskExecutor;
    }

    public void setConsumerGroup(String str) {
        Assert.hasText(str, "'consumerGroup' must not be empty");
        this.consumerGroup = str;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> inboundMessageMapper) {
        this.embeddedHeadersMapper = inboundMessageMapper;
    }

    public void setStreamInitialSequence(InitialPositionInStreamExtended initialPositionInStreamExtended) {
        Assert.notNull(initialPositionInStreamExtended, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = initialPositionInStreamExtended;
    }

    public void setConsumerBackoff(int i) {
        this.consumerBackoff = Math.max(1000, i);
    }

    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    public void setCheckpointsInterval(long j) {
        this.checkpointsInterval = j;
    }

    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    public void setWorkerId(String str) {
        Assert.hasText(str, "'workerId' must not be null or empty");
        this.workerId = str;
    }

    public void setGlueSchemaRegistryDeserializer(GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) {
        this.glueSchemaRegistryDeserializer = glueSchemaRegistryDeserializer;
    }

    public void setBindSourceRecord(boolean z) {
        this.bindSourceRecord = z;
    }

    public void setFanOut(boolean z) {
        this.fanOut = z;
    }

    protected void onInit() {
        super.onInit();
        this.config = new ConfigsBuilder(buildStreamTracker(), this.consumerGroup, this.kinesisClient, this.dynamoDBClient, this.cloudWatchClient, this.workerId, this.recordProcessorFactory);
    }

    private StreamTracker buildStreamTracker() {
        return this.streams.length == 1 ? new SingleStreamTracker(StreamIdentifier.singleStreamInstance(this.streams[0]), this.streamInitialSequence) : new StreamsTracker();
    }

    protected void doStart() {
        super.doStart();
        if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        LifecycleConfig lifecycleConfig = this.config.lifecycleConfig();
        lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff);
        String str = this.streams.length == 1 ? this.streams[0] : null;
        this.scheduler = new Scheduler(this.config.checkpointConfig(), this.config.coordinatorConfig(), this.config.leaseManagementConfig(), lifecycleConfig, this.config.metricsConfig(), this.config.processorConfig(), this.config.retrievalConfig().glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer).retrievalSpecificConfig(this.fanOut ? new FanOutConfig(this.kinesisClient).applicationName(this.consumerGroup).streamName(str) : new PollingConfig(this.kinesisClient).streamName(str)));
        this.executor.execute(this.scheduler);
    }

    protected void doStop() {
        super.doStop();
        this.scheduler.shutdown();
    }

    public void destroy() {
        super.destroy();
        if (isRunning()) {
            this.scheduler.shutdown();
        }
    }

    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributeAccessor = attributesHolder.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }

    public String toString() {
        return "KclMessageDrivenChannelAdapter{consumerGroup='" + this.consumerGroup + "', stream(s)='" + Arrays.toString(this.streams) + "'}";
    }
}
