package org.springframework.integration.aws.outbound;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.Lifecycle;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.SerializingConverter;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

/* loaded from: input_file:org/springframework/integration/aws/outbound/KplMessageHandler.class */
public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implements Lifecycle {
    private final KinesisProducer kinesisProducer;
    private Expression streamExpression;
    private Expression partitionKeyExpression;
    private Expression explicitHashKeyExpression;
    private Expression sequenceNumberExpression;
    private Expression glueSchemaExpression;
    private OutboundMessageMapper<byte[]> embeddedHeadersMapper;
    private volatile boolean running;
    private volatile ScheduledFuture<?> flushFuture;
    private MessageConverter messageConverter = new ConvertingFromMessageConverter(new SerializingConverter());
    private Duration flushDuration = Duration.ofMillis(0);

    public KplMessageHandler(KinesisProducer kinesisProducer) {
        Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
        this.kinesisProducer = kinesisProducer;
    }

    @Deprecated
    public void setConverter(Converter<Object, byte[]> converter) {
        setMessageConverter(new ConvertingFromMessageConverter(converter));
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' must not be null.");
        this.messageConverter = messageConverter;
    }

    public void setStream(String str) {
        setStreamExpression(new LiteralExpression(str));
    }

    public void setStreamExpressionString(String str) {
        setStreamExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setStreamExpression(Expression expression) {
        this.streamExpression = expression;
    }

    public void setPartitionKey(String str) {
        setPartitionKeyExpression(new LiteralExpression(str));
    }

    public void setPartitionKeyExpressionString(String str) {
        setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setPartitionKeyExpression(Expression expression) {
        this.partitionKeyExpression = expression;
    }

    public void setExplicitHashKey(String str) {
        setExplicitHashKeyExpression(new LiteralExpression(str));
    }

    public void setExplicitHashKeyExpressionString(String str) {
        setExplicitHashKeyExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setExplicitHashKeyExpression(Expression expression) {
        this.explicitHashKeyExpression = expression;
    }

    public void setSequenceNumberExpressionString(String str) {
        setSequenceNumberExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setSequenceNumberExpression(Expression expression) {
        this.sequenceNumberExpression = expression;
    }

    public void setEmbeddedHeadersMapper(OutboundMessageMapper<byte[]> outboundMessageMapper) {
        this.embeddedHeadersMapper = outboundMessageMapper;
    }

    public void setFlushDuration(Duration duration) {
        Assert.notNull(duration, "'flushDuration' must not be null.");
        this.flushDuration = duration;
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    public void setHeaderMapper(HeaderMapper<Void> headerMapper) {
        throw new UnsupportedOperationException("Kinesis doesn't support headers.\nConsider to use 'OutboundMessageMapper<byte[]>' for embedding headers into the record data.");
    }

    public void setGlueSchema(Schema schema) {
        setPartitionKeyExpression(new ValueExpression(schema));
    }

    public void setGlueSchemaExpressionString(String str) {
        setGlueSchemaExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    public void setGlueSchemaExpression(Expression expression) {
        this.glueSchemaExpression = expression;
    }

    public synchronized void start() {
        if (this.running) {
            return;
        }
        if (this.flushDuration.toMillis() > 0) {
            TaskScheduler taskScheduler = getTaskScheduler();
            KinesisProducer kinesisProducer = this.kinesisProducer;
            Objects.requireNonNull(kinesisProducer);
            this.flushFuture = taskScheduler.scheduleAtFixedRate(kinesisProducer::flush, this.flushDuration);
        }
        this.running = true;
    }

    public synchronized void stop() {
        if (this.running) {
            this.running = false;
            if (this.flushFuture != null) {
                this.flushFuture.cancel(true);
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    protected AwsRequest messageToAwsRequest(Message<?> message) {
        Object payload = message.getPayload();
        return payload instanceof PutRecordsRequest ? (PutRecordsRequest) payload : payload instanceof PutRecordRequest ? (PutRecordRequest) payload : payload instanceof UserRecord ? buildPutRecordRequest(message) : buildPutRecordRequest(message);
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    protected CompletableFuture<? extends AwsResponse> handleMessageToAws(Message<?> message, AwsRequest awsRequest) {
        try {
            if (awsRequest instanceof PutRecordsRequest) {
                CompletableFuture<PutRecordsResponse> handlePutRecordsRequest = handlePutRecordsRequest(message, (PutRecordsRequest) awsRequest);
                if (this.flushDuration.toMillis() <= 0) {
                    this.kinesisProducer.flush();
                }
                return handlePutRecordsRequest;
            }
            Object payload = message.getPayload();
            if (payload instanceof UserRecord) {
                CompletableFuture<PutRecordResponse> handleUserRecord = handleUserRecord((UserRecord) payload);
                if (this.flushDuration.toMillis() <= 0) {
                    this.kinesisProducer.flush();
                }
                return handleUserRecord;
            }
            PutRecordRequest putRecordRequest = (PutRecordRequest) awsRequest;
            UserRecord userRecord = new UserRecord();
            userRecord.setExplicitHashKey(putRecordRequest.explicitHashKey());
            userRecord.setData(putRecordRequest.data().asByteBuffer());
            userRecord.setPartitionKey(putRecordRequest.partitionKey());
            userRecord.setStreamName(putRecordRequest.streamName());
            setGlueSchemaIntoUserRecordIfAny(userRecord, message);
            CompletableFuture<PutRecordResponse> handleUserRecord2 = handleUserRecord(userRecord);
            if (this.flushDuration.toMillis() <= 0) {
                this.kinesisProducer.flush();
            }
            return handleUserRecord2;
        } catch (Throwable th) {
            if (this.flushDuration.toMillis() <= 0) {
                this.kinesisProducer.flush();
            }
            throw th;
        }
    }

    @Override // org.springframework.integration.aws.outbound.AbstractAwsMessageHandler
    protected Map<String, ?> additionalOnSuccessHeaders(AwsRequest awsRequest, AwsResponse awsResponse) {
        if (!(awsResponse instanceof PutRecordResponse)) {
            return null;
        }
        PutRecordResponse putRecordResponse = (PutRecordResponse) awsResponse;
        return Map.of(AwsHeaders.SHARD, putRecordResponse.shardId(), AwsHeaders.SEQUENCE_NUMBER, putRecordResponse.sequenceNumber());
    }

    private CompletableFuture<PutRecordsResponse> handlePutRecordsRequest(Message<?> message, PutRecordsRequest putRecordsRequest) {
        AtomicInteger atomicInteger = new AtomicInteger();
        return Flux.fromIterable(putRecordsRequest.records()).map(putRecordsRequestEntry -> {
            UserRecord userRecord = new UserRecord();
            userRecord.setExplicitHashKey(putRecordsRequestEntry.explicitHashKey());
            userRecord.setData(putRecordsRequestEntry.data().asByteBuffer());
            userRecord.setPartitionKey(putRecordsRequestEntry.partitionKey());
            userRecord.setStreamName(putRecordsRequest.streamName());
            setGlueSchemaIntoUserRecordIfAny(userRecord, message);
            return userRecord;
        }).concatMap(userRecord -> {
            return Mono.fromFuture(handleUserRecord(userRecord)).map(putRecordResponse -> {
                return (PutRecordsResultEntry) PutRecordsResultEntry.builder().sequenceNumber(putRecordResponse.sequenceNumber()).shardId(putRecordResponse.shardId()).build();
            }).onErrorResume(UserRecordFailedException.class, userRecordFailedException -> {
                return Mono.just(userRecordFailedException.getResult()).map(userRecordResult -> {
                    PutRecordsResultEntry.Builder shardId = PutRecordsResultEntry.builder().sequenceNumber(userRecordResult.getSequenceNumber()).shardId(userRecordResult.getShardId());
                    atomicInteger.incrementAndGet();
                    userRecordResult.getAttempts().stream().reduce((attempt, attempt2) -> {
                        return attempt2;
                    }).ifPresent(attempt3 -> {
                        shardId.errorMessage(attempt3.getErrorMessage()).errorCode(attempt3.getErrorCode());
                    });
                    return (PutRecordsResultEntry) shardId.build();
                });
            });
        }).collectList().map(list -> {
            return (PutRecordsResponse) PutRecordsResponse.builder().records(list).failedRecordCount(Integer.valueOf(atomicInteger.get())).build();
        }).toFuture();
    }

    private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?> message) {
        if (this.glueSchemaExpression != null) {
            userRecord.setSchema((Schema) this.glueSchemaExpression.getValue(getEvaluationContext(), message, Schema.class));
        }
    }

    private CompletableFuture<PutRecordResponse> handleUserRecord(UserRecord userRecord) {
        return listenableFutureToCompletableFuture(this.kinesisProducer.addUserRecord(userRecord)).thenApply(userRecordResult -> {
            return (PutRecordResponse) PutRecordResponse.builder().shardId(userRecordResult.getShardId()).sequenceNumber(userRecordResult.getSequenceNumber()).build();
        });
    }

    private PutRecordRequest buildPutRecordRequest(Message<?> message) {
        String str;
        String str2;
        String str3;
        Object payload = message.getPayload();
        ByteBuffer byteBuffer = null;
        String str4 = null;
        if (payload instanceof UserRecord) {
            UserRecord userRecord = (UserRecord) payload;
            byteBuffer = userRecord.getData();
            str = userRecord.getStreamName();
            str2 = userRecord.getPartitionKey();
            str3 = userRecord.getExplicitHashKey();
        } else {
            MessageHeaders headers = message.getHeaders();
            str = (String) headers.get(AwsHeaders.STREAM, String.class);
            if (!StringUtils.hasText(str) && this.streamExpression != null) {
                str = (String) this.streamExpression.getValue(getEvaluationContext(), message, String.class);
            }
            Assert.state(str != null, "'stream' must not be null for sending a Kinesis record. Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an 'aws_stream' message header.");
            str2 = (String) headers.get(AwsHeaders.PARTITION_KEY, String.class);
            if (!StringUtils.hasText(str2) && this.partitionKeyExpression != null) {
                str2 = (String) this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
            }
            Assert.state(str2 != null, "'partitionKey' must not be null for sending a Kinesis record. Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an 'aws_partitionKey' message header.");
            str3 = this.explicitHashKeyExpression != null ? (String) this.explicitHashKeyExpression.getValue(getEvaluationContext(), message, String.class) : null;
            str4 = (String) headers.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
            if (!StringUtils.hasText(str4) && this.sequenceNumberExpression != null) {
                str4 = (String) this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
            }
            MutableMessage mutableMessage = null;
            if (payload instanceof ByteBuffer) {
                byteBuffer = (ByteBuffer) payload;
                if (this.embeddedHeadersMapper != null) {
                    mutableMessage = new MutableMessage(byteBuffer.array(), headers);
                }
            } else {
                byte[] bArr = (byte[]) (payload instanceof byte[] ? payload : this.messageConverter.fromMessage(message, byte[].class));
                Assert.notNull(bArr, "payload cannot be null");
                if (this.embeddedHeadersMapper != null) {
                    mutableMessage = new MutableMessage(bArr, headers);
                } else {
                    byteBuffer = ByteBuffer.wrap(bArr);
                }
            }
            if (mutableMessage != null) {
                try {
                    byte[] bArr2 = (byte[]) this.embeddedHeadersMapper.fromMessage(mutableMessage);
                    Assert.notNull(bArr2, "payload cannot be null");
                    byteBuffer = ByteBuffer.wrap(bArr2);
                } catch (Exception e) {
                    throw new MessageConversionException(message, "Cannot embedded headers to payload", e);
                }
            }
        }
        return (PutRecordRequest) PutRecordRequest.builder().streamName(str).partitionKey(str2).explicitHashKey(str3).sequenceNumberForOrdering(str4).data(SdkBytes.fromByteBuffer(byteBuffer)).build();
    }

    private static <T> CompletableFuture<T> listenableFutureToCompletableFuture(final ListenableFuture<T> listenableFuture) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<T>() { // from class: org.springframework.integration.aws.outbound.KplMessageHandler.1
            @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
            public boolean cancel(boolean z) {
                boolean cancel = listenableFuture.cancel(z);
                super.cancel(z);
                return cancel;
            }
        };
        Futures.addCallback(listenableFuture, new FutureCallback<T>() { // from class: org.springframework.integration.aws.outbound.KplMessageHandler.2
            public void onSuccess(T t) {
                completableFuture.complete(t);
            }

            public void onFailure(Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }
}
