package com.atlassian.business.insights.bitbucket.extract.commit.streaming;

import com.atlassian.bitbucket.commit.Commit;
import com.atlassian.bitbucket.commit.CommitCallback;
import com.atlassian.bitbucket.commit.SimpleCommit;
import com.atlassian.bitbucket.repository.Repository;
import com.atlassian.bitbucket.repository.RepositoryService;
import com.atlassian.bitbucket.user.SimplePerson;
import com.atlassian.business.insights.api.Entity;
import com.atlassian.business.insights.api.extract.EntityStreamerQuery;
import com.atlassian.business.insights.api.user.RequestContext;
import com.atlassian.business.insights.bitbucket.config.BitbucketPropertiesProvider;
import com.atlassian.business.insights.bitbucket.extract.commit.CommitDetails;
import com.atlassian.business.insights.bitbucket.extract.commit.CommitToEntityTransformer;
import com.atlassian.business.insights.bitbucket.extract.filter.BitbucketProjectOptOutFilter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/business/insights/bitbucket/extract/commit/streaming/CommitIterator.class */
public class CommitIterator implements Iterator<Entity<String, CommitDetails>> {
    private static final int MAX_COMMIT_QUEUE_SIZE = 2000;
    private static final Entity<String, CommitDetails> POISON_PILL = Entity.getInstance("POISON_PILL", Instant.EPOCH, new CommitDetails.Builder(new SimpleCommit.Builder("poison.pill.commit.id").author(new SimplePerson("Mr. Poison Pill", "ppill@atlassian.com")).authorTimestamp(new Date()).build()).build());
    private static final Logger log = LoggerFactory.getLogger(CommitIterator.class);
    private final BitbucketCommitStreamer bitbucketCommitStreamer;
    private final CommitToEntityTransformer commitToEntityTransformer;
    private final Instant commitsSince;
    private final long queuePollingTimeoutSeconds;
    private final RequestContext requestContext;
    private final SingleGitRepositoryIterator singleGitRepositoryIterator;
    private final boolean includeBuildStatuses;
    private final AtomicReference<Throwable> childThreadThrowable = new AtomicReference<>();
    private final BlockingQueue<Entity<String, CommitDetails>> commitsQueue = new LinkedBlockingQueue(2000);
    private final AtomicBoolean continueCommitStreaming = new AtomicBoolean(true);
    private final BiCommitCallback biCommitCallback = new BiCommitCallback();

    /* loaded from: input_file:com/atlassian/business/insights/bitbucket/extract/commit/streaming/CommitIterator$BiCommitCallback.class */
    private class BiCommitCallback implements CommitCallback {
        private BiCommitCallback() {
        }

        public boolean onCommit(@Nonnull Commit commit) {
            try {
                CommitIterator.this.commitsQueue.put(CommitIterator.this.commitToEntityTransformer.toEntity(commit, CommitIterator.this.includeBuildStatuses));
                CommitIterator.log.trace("Added commitsDetailsEntity to the queue with commit {}. Queue size: {}", commit.getId(), Integer.valueOf(CommitIterator.this.commitsQueue.size()));
                return CommitIterator.this.continueCommitStreaming.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CommitStreamingException("Error when adding commit details to queue", e);
            }
        }
    }

    /* loaded from: input_file:com/atlassian/business/insights/bitbucket/extract/commit/streaming/CommitIterator$ThrowablePropagatingExecutor.class */
    private class ThrowablePropagatingExecutor extends ThreadPoolExecutor {
        public ThrowablePropagatingExecutor() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("data-pipeline-commit-streamer-%d").build());
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void afterExecute(Runnable runnable, Throwable th) {
            super.afterExecute(runnable, th);
            Throwable th2 = th;
            if (th2 == null && (runnable instanceof Future)) {
                try {
                    Future future = (Future) runnable;
                    if (future.isDone()) {
                        future.get();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    th2 = e;
                } catch (CancellationException | ExecutionException e2) {
                    th2 = e2;
                }
            }
            CommitIterator.this.childThreadThrowable.set(th2);
            if (CommitIterator.this.childThreadThrowable.get() != null) {
                CommitIterator.this.continueCommitStreaming.set(false);
            }
        }
    }

    public CommitIterator(BitbucketCommitStreamer bitbucketCommitStreamer, CommitToEntityTransformer commitToEntityTransformer, EntityStreamerQuery entityStreamerQuery, RepositoryService repositoryService, RequestContext requestContext, BitbucketPropertiesProvider bitbucketPropertiesProvider) {
        this.bitbucketCommitStreamer = bitbucketCommitStreamer;
        this.commitsSince = entityStreamerQuery.getFrom();
        this.requestContext = requestContext;
        this.commitToEntityTransformer = commitToEntityTransformer;
        this.singleGitRepositoryIterator = new SingleGitRepositoryIterator(repositoryService, bitbucketPropertiesProvider, new BitbucketProjectOptOutFilter(entityStreamerQuery.getOptOutEntityIdentifiers()));
        this.queuePollingTimeoutSeconds = bitbucketPropertiesProvider.getQueuePollingTimeout();
        this.includeBuildStatuses = bitbucketPropertiesProvider.isBuildStatusesExportEnabled();
        new ThrowablePropagatingExecutor().submit(this::produceCommits);
    }

    public void close() {
        rethrowExceptionInChildThreadIfExists();
        this.commitsQueue.clear();
        this.continueCommitStreaming.set(false);
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        rethrowExceptionInChildThreadIfExists();
        if (this.commitsQueue.peek() != POISON_PILL) {
            return this.continueCommitStreaming.get() || !this.commitsQueue.isEmpty();
        }
        this.continueCommitStreaming.set(false);
        return false;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public Entity<String, CommitDetails> next() {
        rethrowExceptionInChildThreadIfExists();
        try {
            Entity<String, CommitDetails> poll = this.commitsQueue.poll(this.queuePollingTimeoutSeconds, TimeUnit.SECONDS);
            if (poll == null) {
                rethrowExceptionInChildThreadIfExists();
                this.continueCommitStreaming.set(false);
                throw new CommitStreamingException(String.format("Timeout after %d seconds when waiting for the commit to be polled off queue", Long.valueOf(this.queuePollingTimeoutSeconds)));
            }
            if (poll == POISON_PILL) {
                log.debug("Retrieved poison pill from queue");
                this.continueCommitStreaming.set(false);
                poll = null;
            }
            return poll;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CommitStreamingException("Interrupted while polling commits from queue", e);
        }
    }

    private void produceCommits() {
        log.debug("Starting commit entity producing");
        this.requestContext.runInCustomContext(() -> {
            while (this.continueCommitStreaming.get() && this.singleGitRepositoryIterator.hasNext()) {
                Repository repository = (Repository) this.singleGitRepositoryIterator.next();
                if (repository != null) {
                    log.debug("Streaming commits for repository [{}]", Integer.valueOf(repository.getId()));
                    this.bitbucketCommitStreamer.streamCommits(this.biCommitCallback, repository, this.commitsSince);
                }
            }
            try {
                this.commitsQueue.put(POISON_PILL);
                log.debug("Added poison pill to queue");
                log.debug("Finished streaming commits for all repositories");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CommitStreamingException("Error when adding poison pill to queue", e);
            }
        });
    }

    private void rethrowExceptionInChildThreadIfExists() {
        if (this.childThreadThrowable.get() != null) {
            if (this.childThreadThrowable.get() instanceof CommitStreamingException) {
                throw ((CommitStreamingException) this.childThreadThrowable.get());
            }
            if (!(this.childThreadThrowable.get() instanceof ExecutionException) || !(this.childThreadThrowable.get().getCause() instanceof CommitStreamingException)) {
                throw new CommitStreamingException("Error in child thread during commit streaming : " + this.childThreadThrowable.get().getMessage(), this.childThreadThrowable.get());
            }
            throw ((CommitStreamingException) this.childThreadThrowable.get().getCause());
        }
    }
}
