package org.bitbucket.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.bitbucket.sqs.SqsMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Single;

/* loaded from: input_file:org/bitbucket/sqs/DefaultSqsQueueReader.class */
class DefaultSqsQueueReader implements SqsQueueReader {
    private static final Logger log = LoggerFactory.getLogger(DefaultSqsQueueReader.class);
    private final AmazonSQS sqsClient;
    private final String queueUrl;
    private final Duration pollTimeout;
    private final List<String> messageAttributes;
    private final Optional<SqsQueueWriter> deadLetterQueueWriter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultSqsQueueReader(@Nonnull AmazonSQS amazonSQS, @Nonnull String str, @Nonnull Duration duration, @Nonnull List<String> list, @Nonnull Optional<SqsQueueWriter> optional) {
        this.sqsClient = (AmazonSQS) Objects.requireNonNull(amazonSQS, "sqsClient");
        this.queueUrl = (String) Objects.requireNonNull(str, "queueUrl");
        this.pollTimeout = (Duration) Objects.requireNonNull(duration, "pollTimeout");
        this.messageAttributes = ImmutableList.copyOf((Collection) Objects.requireNonNull(list, "messageAttributes"));
        this.deadLetterQueueWriter = (Optional) Objects.requireNonNull(optional, "deadLetterQueueWriter");
    }

    @Override // org.bitbucket.sqs.SqsQueueReader
    @Nonnull
    public Single<Optional<SqsMessage>> dequeueMessage() {
        return Single.fromCallable(() -> {
            return receiveMessage().map(this::convert);
        });
    }

    public String toString() {
        return "DefaultSqsQueueReader{queueUrl='" + this.queueUrl + "'}";
    }

    private Completable acknowledgeMessage(String str) {
        return Completable.fromAction(() -> {
        });
    }

    private Completable changeVisibilityTimeout(String str, Duration duration) {
        ChangeMessageVisibilityRequest withVisibilityTimeout = new ChangeMessageVisibilityRequest().withQueueUrl(this.queueUrl).withReceiptHandle(str).withVisibilityTimeout(Integer.valueOf(Math.toIntExact(duration.getSeconds())));
        return Completable.fromAction(() -> {
        });
    }

    private SqsMessage convert(Message message) {
        String receiptHandle = message.getReceiptHandle();
        String body = message.getBody();
        return DefaultSqsMessage.builder().id(message.getMessageId()).receiptHandle(receiptHandle).payload(body).messageAttributes(ImmutableMap.copyOf((Map) message.getMessageAttributes().entrySet().stream().filter(entry -> {
            return "String".equals(((MessageAttributeValue) entry.getValue()).getDataType());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return ((MessageAttributeValue) entry2.getValue()).getStringValue();
        })))).onChangeVisibilityTimeout(duration -> {
            return changeVisibilityTimeout(receiptHandle, duration);
        }).onAcknowledge(() -> {
            return acknowledgeMessage(receiptHandle);
        }).onDiscard(optional -> {
            return discardMessage(body, optional);
        }).build();
    }

    private Completable discardMessage(String str, Optional<String> optional) {
        Map map = (Map) optional.map(str2 -> {
            return ImmutableMap.of(SqsMessage.Attributes.DISCARD_REASON, str2);
        }).orElse(ImmutableMap.of());
        return (Completable) this.deadLetterQueueWriter.map(sqsQueueWriter -> {
            return sqsQueueWriter.enqueueMessage(str, (Map<String, String>) map);
        }).orElseGet(() -> {
            return Completable.error(new IllegalStateException(String.format("Dead letter queue not configured for queue '%s'", this.queueUrl)));
        });
    }

    private Optional<Message> receiveMessage() {
        ReceiveMessageRequest withWaitTimeSeconds = new ReceiveMessageRequest(this.queueUrl).withMessageAttributeNames(this.messageAttributes).withMaxNumberOfMessages(1).withWaitTimeSeconds(Integer.valueOf(Math.toIntExact(this.pollTimeout.getSeconds())));
        return (Optional) SqsSdkHelpers.wrapSdkExceptions(() -> {
            List messages = this.sqsClient.receiveMessage(withWaitTimeSeconds).getMessages();
            int size = messages.size();
            if (size == 0) {
                return Optional.empty();
            }
            if (size == 1) {
                return Optional.of(messages.get(0));
            }
            rejectImmediately(messages);
            throw new IllegalStateException(String.format("Received %s SQS messages but expected none or just one, aborting receive attempt", Integer.valueOf(size)));
        });
    }

    private void rejectImmediately(Collection<Message> collection) {
        collection.stream().peek(message -> {
            log.debug("Returning message '{}' with receipt handle '{}' back to the queue", message.getMessageId(), message.getReceiptHandle());
        }).map(message2 -> {
            return changeVisibilityTimeout(message2.getReceiptHandle(), Duration.ZERO);
        }).reduce((completable, completable2) -> {
            return Completable.mergeDelayError(new Completable[]{completable, completable2});
        }).ifPresent((v0) -> {
            v0.await();
        });
    }
}
