package org.bitbucket.sqs;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.bitbucket.sqs.SqsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;

/* loaded from: input_file:org/bitbucket/sqs/SqsListeningTask.class */
/**
 * Runnable that performs one dequeue-and-process cycle against an SQS queue
 * and resubmits itself to an {@link Executor} on every run.
 *
 * <p>The queue reader and message handler are taken from the supplied queue
 * configuration; the actual processing is delegated to a {@link MessageProcessor}.
 */
class SqsListeningTask implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(SqsListeningTask.class);
    private final SqsQueueReader queueReader;
    private final SqsInstructingMessageHandler messageHandler;
    private final Executor resubmitExecutor;
    private final MessageProcessor messageProcessor;

    /**
     * Creates a listening task for the given queue configuration.
     *
     * @param sqsQueueConfiguration source of the queue reader and the message handler; must not be null
     * @param executor executor this task resubmits itself to on each run; must not be null
     * @param messageProcessor processor invoked for each dequeued message; must not be null
     * @throws NullPointerException if any argument is null
     */
    public SqsListeningTask(@Nonnull SqsConfiguration.SqsQueueConfiguration sqsQueueConfiguration, @Nonnull Executor executor, @Nonnull MessageProcessor messageProcessor) {
        Objects.requireNonNull(sqsQueueConfiguration, "queueConfiguration");
        this.queueReader = sqsQueueConfiguration.getQueue().getReader();
        this.messageHandler = sqsQueueConfiguration.getMessageHandler();
        this.resubmitExecutor = Objects.requireNonNull(executor, "resubmitExecutor");
        this.messageProcessor = Objects.requireNonNull(messageProcessor, "messageProcessor");
    }

    /**
     * Dequeues at most one message, resubmits this task to the executor, processes
     * the message if one was received, and blocks until that processing completes.
     *
     * <p>Failures while dequeuing, resubmitting or processing are logged and then
     * suppressed inside the reactive chain rather than propagated as errors.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.queueReader.dequeueMessage().onErrorReturn(dequeueError -> {
            // A failed dequeue is treated the same as "no message available".
            log.error("Exception on listening to the SQS queue", dequeueError);
            return Optional.empty();
        }).doOnEach(notification -> {
            // Schedule the next run before the current message is processed.
            this.resubmitExecutor.execute(this);
        }).onErrorReturn(resubmitError -> {
            // Upstream errors were already replaced above, so an error arriving here
            // presumably comes from the resubmit action (e.g. the executor rejecting
            // the task). Previously this was swallowed silently; log it so a stalled
            // listener can be diagnosed. The fallback value is unchanged: the cycle
            // continues as if no message had been received.
            log.warn("Exception on resubmitting the SQS listening task", resubmitError);
            return Optional.empty();
        }).flatMapCompletable(optional -> {
            // Nothing to do when no message was received.
            return (Completable) optional.map(sqsMessage -> {
                return this.messageProcessor.processMessage(sqsMessage, this.messageHandler);
            }).orElse(Completable.complete());
        }).doOnError(processingError -> {
            log.error("Exception on processing the message", processingError);
        }).onErrorComplete().await();
    }
}
