package com.ribose.jenkins.plugin.awscodecommittrigger.threading;

import com.amazonaws.services.sqs.model.Message;
import com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueue;
import com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueListener;
import com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor;
import com.ribose.jenkins.plugin.awscodecommittrigger.logging.Log;
import com.ribose.jenkins.plugin.awscodecommittrigger.net.SQSChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/ribose/jenkins/plugin/awscodecommittrigger/threading/SQSQueueMonitorImpl.class */
public class SQSQueueMonitorImpl implements SQSQueueMonitor {
    private static final Log log;
    private final ExecutorService executor;
    private final SQSQueue queue;
    private final SQSChannel channel;
    private final Object listenersLock;
    private final List<SQSQueueListener> listeners;
    private final AtomicBoolean isRunning;
    private volatile boolean isShutDown;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SQSQueueMonitorImpl(ExecutorService executorService, SQSQueue sQSQueue, SQSChannel sQSChannel) {
        this.listenersLock = new Object();
        this.isRunning = new AtomicBoolean();
        this.executor = executorService;
        this.queue = sQSQueue;
        this.channel = sQSChannel;
        this.listeners = new ArrayList();
    }

    private SQSQueueMonitorImpl(ExecutorService executorService, SQSQueue sQSQueue, SQSChannel sQSChannel, List<SQSQueueListener> list) {
        this.listenersLock = new Object();
        this.isRunning = new AtomicBoolean();
        this.executor = executorService;
        this.queue = sQSQueue;
        this.channel = sQSChannel;
        this.listeners = list;
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public SQSQueueMonitor clone(SQSQueue sQSQueue, SQSChannel sQSChannel) {
        SQSQueueMonitorImpl sQSQueueMonitorImpl;
        synchronized (this.listenersLock) {
            sQSQueueMonitorImpl = new SQSQueueMonitorImpl(this.executor, sQSQueue, sQSChannel, this.listeners);
        }
        return sQSQueueMonitorImpl;
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public boolean add(SQSQueueListener sQSQueueListener) {
        if (!$assertionsDisabled && !sQSQueueListener.getQueueUuid().equals(this.channel.getQueueUuid())) {
            throw new AssertionError();
        }
        synchronized (this.listenersLock) {
            if (!this.listeners.add(sQSQueueListener) || this.listeners.size() != 1) {
                return false;
            }
            this.isShutDown = false;
            execute();
            return true;
        }
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public boolean remove(SQSQueueListener sQSQueueListener) {
        if (sQSQueueListener == null) {
            return false;
        }
        synchronized (this.listenersLock) {
            if (!this.listeners.remove(sQSQueueListener) || !this.listeners.isEmpty()) {
                return false;
            }
            shutDown();
            return true;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                if (this.isShutDown) {
                    if (!this.isRunning.compareAndSet(true, false)) {
                        log.warning("Monitor for %s already stopped", this.queue);
                    }
                    execute();
                } else {
                    if (!this.isRunning.compareAndSet(false, true)) {
                        log.warning("Monitor for %s already started", this.queue);
                        if (!this.isRunning.compareAndSet(true, false)) {
                            log.warning("Monitor for %s already stopped", this.queue);
                        }
                        execute();
                        return;
                    }
                    log.debug("Start monitor for %s", this.queue);
                    processMessages();
                    if (!this.isRunning.compareAndSet(true, false)) {
                        log.warning("Monitor for %s already stopped", this.queue);
                    }
                    execute();
                }
            } catch (Exception e) {
                log.warning("Monitor for %s stopped, error: %s", this.queue, e);
                this.isShutDown = true;
                if (!this.isRunning.compareAndSet(true, false)) {
                    log.warning("Monitor for %s already stopped", this.queue);
                }
                execute();
            }
        } catch (Throwable th) {
            if (!this.isRunning.compareAndSet(true, false)) {
                log.warning("Monitor for %s already stopped", this.queue);
            }
            execute();
            throw th;
        }
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public void shutDown() {
        log.info("Shut down monitor for %s", this.channel);
        this.isShutDown = true;
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public boolean isShutDown() {
        return this.isShutDown;
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public SQSQueue getQueue() {
        return this.queue;
    }

    @Override // com.ribose.jenkins.plugin.awscodecommittrigger.interfaces.SQSQueueMonitor
    public SQSChannel getChannel() {
        return this.channel;
    }

    private void execute() {
        if (this.isShutDown) {
            return;
        }
        this.executor.execute(this);
    }

    private void processMessages() {
        if (this.isShutDown) {
            return;
        }
        List<Message> messages = this.channel.getMessages();
        List<Message> notifyListeners = notifyListeners(messages);
        log.debug("Received %d messages, proceed %d messages", Integer.valueOf(messages.size()), Integer.valueOf(notifyListeners.size()));
        this.channel.deleteMessages(notifyListeners);
    }

    private List<Message> notifyListeners(List<Message> list) {
        ArrayList arrayList = new ArrayList();
        if (!list.isEmpty()) {
            Iterator<SQSQueueListener> it = getListeners().iterator();
            while (it.hasNext()) {
                List<Message> handleMessages = it.next().handleMessages(list);
                arrayList.addAll(handleMessages);
                list.removeAll(handleMessages);
            }
        }
        return arrayList;
    }

    private List<SQSQueueListener> getListeners() {
        ArrayList arrayList;
        synchronized (this.listenersLock) {
            arrayList = new ArrayList(this.listeners);
        }
        return arrayList;
    }

    static {
        $assertionsDisabled = !SQSQueueMonitorImpl.class.desiredAssertionStatus();
        log = Log.get(SQSQueueMonitorImpl.class);
    }
}
