package com.agorapulse.micronaut.aws.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.MergeShardsRequest;
import com.amazonaws.services.kinesis.model.MergeShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.util.StringUtils;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.BiFunction;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/agorapulse/micronaut/aws/kinesis/DefaultKinesisService.class */
/**
 * {@link KinesisService} implementation that delegates to an {@link AmazonKinesis} client.
 *
 * <p>Events are serialized to JSON with the supplied {@link ObjectMapper} and encoded as UTF-8;
 * {@link #decodeRecordData(Record)} decodes record payloads as UTF-8 as well.
 *
 * <p>The class keeps no mutable state of its own: the only fields are the injected collaborators.
 */
public class DefaultKinesisService implements KinesisService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisService.class);

    /** Pause, in milliseconds, after a GetRecords call that returned no records. */
    private static final int DEFAULT_GET_RECORDS_WAIT = 1000;

    /** Largest number of events accepted by a single {@link #putEvents(String, List)} call. */
    private static final int MAX_PUT_RECORDS_SIZE = 500;

    /** Pause, in milliseconds, between two status checks in {@link #waitForStatus(String, StreamStatus)}. */
    private static final long STATUS_POLL_INTERVAL = 100L;

    private final AmazonKinesis client;
    private final KinesisConfiguration configuration;
    private final ObjectMapper objectMapper;

    /**
     * Creates the service.
     *
     * @param amazonKinesis        client every Kinesis call is delegated to
     * @param kinesisConfiguration source of the default stream name and the consumer filter key
     * @param objectMapper         mapper used to serialize events to JSON
     */
    public DefaultKinesisService(AmazonKinesis amazonKinesis, KinesisConfiguration kinesisConfiguration, ObjectMapper objectMapper) {
        this.client = amazonKinesis;
        this.configuration = kinesisConfiguration;
        this.objectMapper = objectMapper;
    }

    /**
     * Returns the stream name from the configuration.
     *
     * @return the configured stream name, never empty
     * @throws IllegalStateException if the configuration holds no stream name
     */
    @Override
    public String getDefaultStreamName() {
        final String stream = configuration.getStream();
        if (StringUtils.isEmpty(stream)) {
            throw new IllegalStateException("Default stream must be defined");
        }
        return stream;
    }

    /**
     * Creates a stream.
     *
     * @param str name of the stream to create
     * @param i   shard count passed to the client
     * @return the client's result, or an empty {@link CreateStreamResult} when the client
     *         rejects the call with {@link ResourceInUseException}
     */
    @Override
    public CreateStreamResult createStream(String str, int i) {
        try {
            return client.createStream(str, i);
        } catch (ResourceInUseException ignored) {
            // Deliberate best-effort: a stream that is already in use is treated as "created".
            return new CreateStreamResult();
        }
    }

    /**
     * Decodes the payload of a record as UTF-8 text.
     *
     * @param record record whose data should be decoded
     * @return the decoded text, or an empty string (after logging an error) when the payload
     *         is not valid UTF-8
     */
    @Override
    public String decodeRecordData(Record record) {
        try {
            // A CharsetDecoder is stateful and not thread-safe, so a fresh one is created per call.
            // The read-only view has its own position, so the record's buffer is left untouched
            // and the same record can be decoded again.
            final ByteBuffer payload = record.getData().asReadOnlyBuffer();
            return StandardCharsets.UTF_8.newDecoder().decode(payload).toString();
        } catch (CharacterCodingException e) {
            LOGGER.error("Malformed data for record: {}", record, e);
            return "";
        }
    }

    /**
     * Deletes a stream.
     *
     * @param str name of the stream to delete
     * @return the client's result
     */
    @Override
    public DeleteStreamResult deleteStream(String str) {
        return client.deleteStream(str);
    }

    /**
     * Describes a stream, following pagination so that the returned description lists every shard.
     *
     * @param str name of the stream to describe
     * @return the last page returned by the client, with its shard list replaced by the
     *         shards accumulated over all pages
     */
    @Override
    public DescribeStreamResult describeStream(String str) {
        final DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(str);
        final List<Shard> shards = new ArrayList<>();
        String lastShardId = null;
        DescribeStreamResult page;
        while (true) {
            request.setExclusiveStartShardId(lastShardId);
            page = client.describeStream(request);
            shards.addAll(page.getStreamDescription().getShards());
            // Boolean.TRUE.equals(..) also treats a missing flag as "no more shards" instead of failing.
            final boolean hasMore = Boolean.TRUE.equals(page.getStreamDescription().getHasMoreShards());
            if (shards.isEmpty() || !hasMore) {
                break;
            }
            lastShardId = shards.get(shards.size() - 1).getShardId();
        }
        page.getStreamDescription().setShards(shards);
        return page;
    }

    /**
     * Looks up a shard of a stream by its id.
     *
     * @param str  name of the stream
     * @param str2 id of the shard to find
     * @return the matching shard, or {@code null} when the stream has no shard with that id
     */
    @Override
    public Shard getShard(String str, String str2) {
        return describeStream(str).getStreamDescription().getShards().stream()
            .filter(shard -> shard.getShardId().equals(str2))
            .findFirst()
            .orElse(null);
    }

    /**
     * Lists the stream names returned by the client's {@code listStreams()} call.
     *
     * @return the stream names
     */
    @Override
    public List<String> listStreamNames() {
        return client.listStreams().getStreamNames();
    }

    /**
     * Reads the records of a shard as a {@link Flowable}.
     *
     * <p>The shard iterator is obtained lazily on subscription. Each generator step issues one
     * GetRecords call, sleeps {@value #DEFAULT_GET_RECORDS_WAIT} ms when the batch is empty, and
     * continues from the next shard iterator. The flowable completes when the client returns
     * no next shard iterator.
     *
     * @param str               name of the stream
     * @param shard             shard to read from
     * @param shardIteratorType where in the shard to start reading
     * @param str2              starting sequence number; required for
     *                          {@link ShardIteratorType#AFTER_SEQUENCE_NUMBER} and
     *                          {@link ShardIteratorType#AT_SEQUENCE_NUMBER}, ignored otherwise
     * @param i                 limit applied to each GetRecords call; no limit is set when
     *                          not positive
     * @return a flowable emitting the records in the order the client returns them; it signals
     *         {@link IllegalArgumentException} when a required sequence number is missing
     */
    @Override
    public Flowable<Record> getShardRecords(String str, Shard shard, ShardIteratorType shardIteratorType, String str2, final int i) {
        final BiFunction<String, Emitter<List<Record>>, String> poller =
            (shardIterator, emitter) -> pollShard(shardIterator, i, emitter);

        return Flowable.generate(
            () -> client.getShardIterator(newShardIteratorRequest(str, shard, shardIteratorType, str2)).getShardIterator(),
            poller
        ).flatMap(Flowable::fromIterable);
    }

    /** Builds the shard iterator request, validating the sequence number for the types that need one. */
    private GetShardIteratorRequest newShardIteratorRequest(String streamName, Shard shard, ShardIteratorType type, String startingSequenceNumber) {
        final GetShardIteratorRequest request = new GetShardIteratorRequest()
            .withStreamName(streamName)
            .withShardId(shard.getShardId())
            .withShardIteratorType(type.toString());

        if (type == ShardIteratorType.AFTER_SEQUENCE_NUMBER || type == ShardIteratorType.AT_SEQUENCE_NUMBER) {
            if (startingSequenceNumber == null || startingSequenceNumber.length() == 0) {
                throw new IllegalArgumentException("Starting sequence number must not be null!");
            }
            request.withStartingSequenceNumber(startingSequenceNumber);
        }
        return request;
    }

    /** Performs one GetRecords call, emits the batch and returns the iterator for the next call. */
    private String pollShard(String shardIterator, int limit, Emitter<List<Record>> emitter) throws InterruptedException {
        final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator);
        if (limit > 0) {
            request.withLimit(limit);
        }

        final GetRecordsResult result = client.getRecords(request);
        final List<Record> batch = result.getRecords();

        if (batch.isEmpty()) {
            try {
                // Back off before the next poll so an idle shard is not polled in a tight loop.
                Thread.sleep(DEFAULT_GET_RECORDS_WAIT);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the exception surfaces as onError.
                Thread.currentThread().interrupt();
                throw e;
            }
        }

        // The generator callback should signal on every invocation; an empty batch is
        // flattened to nothing downstream, so consumers never observe it.
        emitter.onNext(batch);

        // Always move on to the next iterator, even after an empty batch: re-using the
        // current one would never advance through the shard.
        final String next = result.getNextShardIterator();
        if (next == null) {
            // No next iterator: nothing more can be read from this shard.
            emitter.onComplete();
        }
        return next;
    }

    /**
     * Serializes an event to JSON and puts it to a stream as a single record.
     *
     * <p>When the configuration defines a consumer filter key it is set on the event first.
     *
     * @param str   name of the stream
     * @param event event to send; its partition key is used for the record
     * @param str2  sequence number for ordering, may be empty
     * @return the client's result
     * @throws IllegalArgumentException if the event cannot be serialized to JSON
     */
    @Override
    public PutRecordResult putEvent(String str, Event event, String str2) {
        final byte[] payload = toJsonBytes(event);
        return putRecord(str, event.getPartitionKey(), payload, str2);
    }

    /**
     * Serializes events to JSON and puts them to a stream in one batch.
     *
     * <p>When the configuration defines a consumer filter key it is set on every event first.
     *
     * @param str  name of the stream
     * @param list events to send, at most {@value #MAX_PUT_RECORDS_SIZE}
     * @return the client's result
     * @throws IllegalArgumentException if there are too many events or one of them cannot be
     *                                  serialized to JSON
     */
    @Override
    public PutRecordsResult putEvents(String str, List<Event> list) {
        if (list.size() > MAX_PUT_RECORDS_SIZE) {
            throw new IllegalArgumentException("Max put events size is 500");
        }
        final List<PutRecordsRequestEntry> entries = list.stream()
            .map(this::toRequestEntry)
            .collect(Collectors.toList());
        return putRecords(str, entries);
    }

    /** Converts an event into a batch entry carrying its JSON payload and partition key. */
    private PutRecordsRequestEntry toRequestEntry(Event event) {
        final byte[] payload = toJsonBytes(event);
        return new PutRecordsRequestEntry()
            .withData(ByteBuffer.wrap(payload))
            .withPartitionKey(event.getPartitionKey());
    }

    /** Applies the configured consumer filter key, if any, and serializes the event to UTF-8 JSON. */
    private byte[] toJsonBytes(Event event) {
        final String filterKey = configuration.getConsumerFilterKey();
        if (!StringUtils.isEmpty(filterKey)) {
            event.setConsumerFilterKey(filterKey);
        }
        try {
            // Explicit charset: the payload must not depend on the platform default encoding.
            return objectMapper.writeValueAsString(event).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot write value as JSON: " + event, e);
        }
    }

    /**
     * Puts a single record to a stream.
     *
     * @param str  name of the stream
     * @param str2 partition key
     * @param bArr record payload
     * @param str3 sequence number for ordering; not set on the request when empty
     * @return the client's result
     */
    @Override
    public PutRecordResult putRecord(String str, String str2, byte[] bArr, String str3) {
        final PutRecordRequest request = new PutRecordRequest()
            .withData(ByteBuffer.wrap(bArr))
            .withPartitionKey(str2)
            .withStreamName(str);
        if (!StringUtils.isEmpty(str3)) {
            request.withSequenceNumberForOrdering(str3);
        }
        return client.putRecord(request);
    }

    /**
     * Puts a batch of records to a stream.
     *
     * @param str  name of the stream
     * @param list entries to send
     * @return the client's result
     */
    @Override
    public PutRecordsResult putRecords(String str, List<PutRecordsRequestEntry> list) {
        final PutRecordsRequest request = new PutRecordsRequest().withRecords(list).withStreamName(str);
        return client.putRecords(request);
    }

    /**
     * Merges two shards of a stream.
     *
     * @param str  name of the stream
     * @param str2 id of the shard to merge
     * @param str3 id of the adjacent shard to merge with
     * @return the client's result
     */
    @Override
    public MergeShardsResult mergeShards(String str, String str2, String str3) {
        final MergeShardsRequest request = new MergeShardsRequest()
            .withStreamName(str)
            .withShardToMerge(str2)
            .withAdjacentShardToMerge(str3);
        return client.mergeShards(request);
    }

    /**
     * Splits a shard of a stream.
     *
     * @param str   name of the stream
     * @param shard shard to split
     * @param str2  new starting hash key; when empty, the midpoint of the shard's hash key
     *              range is used
     * @return the client's result
     */
    @Override
    public SplitShardResult splitShard(String str, Shard shard, String str2) {
        String newStartingHashKey = str2;
        if (StringUtils.isEmpty(newStartingHashKey)) {
            // Hash keys are decimal strings too large for long arithmetic, hence BigInteger.
            final BigInteger start = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
            final BigInteger end = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
            newStartingHashKey = start.add(end).divide(BigInteger.valueOf(2L)).toString();
        }
        final SplitShardRequest request = new SplitShardRequest()
            .withStreamName(str)
            .withShardToSplit(shard.getShardId())
            .withNewStartingHashKey(newStartingHashKey);
        return client.splitShard(request);
    }

    /**
     * Blocks the calling thread until the stream reports the given status, re-checking every
     * {@value #STATUS_POLL_INTERVAL} ms. There is no timeout.
     *
     * @param str          name of the stream
     * @param streamStatus status to wait for
     * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
     *                               flag is restored before the exception is thrown
     */
    @Override
    public void waitForStatus(String str, StreamStatus streamStatus) {
        final String expected = streamStatus.name();
        while (!expected.equals(describeStream(str).getStreamDescription().getStreamStatus())) {
            try {
                Thread.sleep(STATUS_POLL_INTERVAL);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Waiting for stream to become active was interrupted!", e);
            }
        }
    }
}
