package org.jenkinsci.plugins.ssegateway.sse;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.Extension;
import hudson.model.User;
import hudson.util.CopyOnWriteMap;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSessionEvent;
import jenkins.model.Jenkins;
import jenkins.util.HttpSessionListener;
import net.sf.json.JSONObject;
import org.acegisecurity.Authentication;
import org.apache.commons.io.IOUtils;
import org.jenkinsci.plugins.pubsub.ChannelSubscriber;
import org.jenkinsci.plugins.pubsub.EventFilter;
import org.jenkinsci.plugins.pubsub.EventProps;
import org.jenkinsci.plugins.pubsub.Message;
import org.jenkinsci.plugins.pubsub.MessageException;
import org.jenkinsci.plugins.pubsub.PubsubBus;
import org.jenkinsci.plugins.pubsub.SimpleMessage;
import org.jenkinsci.plugins.ssegateway.EventHistoryStore;
import org.jenkinsci.plugins.ssegateway.Util;
import org.jenkinsci.plugins.ssegateway.sse.SSEChannel;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Restricted({NoExternalUse.class})
/* loaded from: input_file:WEB-INF/lib/sse-gateway.jar:org/jenkinsci/plugins/ssegateway/sse/EventDispatcher.class */
public abstract class EventDispatcher implements Serializable {
    // NOTE(review): not referenced in this file; presumably a session attribute key used by callers - confirm.
    public static final String SESSION_SYNC_OBJ = "org.jenkinsci.plugins.ssegateway.sse.session.sync";
    // Authentication captured in the constructor and handed to the bus on every subscribe().
    // Transient: it is not written out when the dispatcher is serialized.
    private final transient Authentication authentication;
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class.getName());
    // Shared scheduler used to run processRetries() after a delay. Pool size comes from the system
    // property "<class name>.scheduledExecutorService.size" (default 4); every thread is named
    // "EventDispatcher.retryProcessor".
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Integer.getInteger(EventDispatcher.class.getName() + ".scheduledExecutorService.size", 4).intValue(), runnable -> {
        return new Thread(runnable, "EventDispatcher.retryProcessor");
    });

    // Maximum age, in milliseconds, of the head of the retry queue before processRetries() clears the
    // whole queue. The system property value (default 300) is multiplied by 1000.
    @SuppressFBWarnings({"MS_SHOULD_BE_FINAL"})
    public static long RETRY_QUEUE_EVENT_LIFETIME = Integer.getInteger(EventDispatcher.class.getName() + ".RETRY_QUEUE_EVENT_LIFETIME", 300).intValue() * 1000;

    // Delay, in milliseconds, passed to the scheduler between retry queue processing runs (default 250).
    @SuppressFBWarnings({"MS_SHOULD_BE_FINAL"})
    public static long RETRY_QUEUE_PROCESSING_DELAY = Integer.getInteger(EventDispatcher.class.getName() + ".RETRY_QUEUE_PROCESSING_DELAY", 250).intValue();

    // Milliseconds without a successful dispatch after which checkDispatcherFailTimeout() clears the
    // retry queue and drops all subscriptions. The system property value (default 14400) is multiplied by 1000.
    @SuppressFBWarnings({"MS_SHOULD_BE_FINAL"})
    public static long TIMEOUT_DISPATCHERFAIL = Integer.getInteger(EventDispatcher.class.getName() + ".TIMEOUT_DISPATCHERFAIL", 14400).intValue() * 1000;
    // Re-entrancy guard for processRetries(): set on entry, cleared on every exit path.
    private volatile boolean isRetryLoopActive = false;
    // Dispatcher identifier; getId() throws until setId() has supplied a non-null value.
    private String id = null;
    // Active bus subscriptions, keyed by the filter they were created for.
    private transient Map<EventFilter, ChannelSubscriber> subscribers = new CopyOnWriteMap.Hash();
    // Wall-clock time (ms) of the last dispatchEvent() call that wrote without error.
    private long timestamp_dispatchEventOK = System.currentTimeMillis();
    // Events that could not be written to the client and are waiting to be re-sent, oldest first.
    private transient Queue<Retry> retryQueue = new ConcurrentLinkedQueue();
    // Bus used for subscribing, unsubscribing and publishing state events.
    private final transient PubsubBus bus = PubsubBus.getBus();

    /**
     * Record of an event whose delivery has to be retried.
     *
     * <p>Only the channel name and event UUID of the message are retained, together with the
     * time the record was created; the event payload itself is looked up again from the
     * {@code EventHistoryStore} when the retry is processed.
     */
    public static class Retry {
        /** How long (ms) after creation a missing store entry is still considered "not yet written". */
        private static final long STORE_LANDING_GRACE_MILLIS = 10000;

        private final long timestamp;
        private final String channelName;
        private final String eventUUID;

        private Retry(@Nonnull Message message) {
            this.timestamp = System.currentTimeMillis();
            String channel = message.getChannelName();
            this.channelName = channel.intern();
            String uuid = message.getEventUUID();
            this.eventUUID = uuid.intern();
        }

        /**
         * @return {@code true} while this record is younger than the grace window, i.e. the
         *         event may simply not have reached the history store yet.
         */
        public boolean needsMoreTimeToLandInStore() {
            long ageMillis = System.currentTimeMillis() - this.timestamp;
            return ageMillis < STORE_LANDING_GRACE_MILLIS;
        }
    }

    /**
     * Bus subscriber that hands every message it receives to the enclosing dispatcher's
     * {@code doDispatch}, and counts how many subscribe calls are outstanding for it.
     */
    public final class SSEChannelSubscriber implements ChannelSubscriber {
        /** Outstanding subscribe() calls for this subscriber; starts at zero. */
        private int numSubscribers = 0;

        private SSEChannelSubscriber() {
        }

        public void onMessage(@Nonnull Message message) {
            EventDispatcher.this.doDispatch(message);
        }

        // Decompiler-emitted accessor: post-increments the counter and returns the previous value.
        static /* synthetic */ int access$108(SSEChannelSubscriber subscriber) {
            return subscriber.numSubscribers++;
        }

        // Decompiler-emitted accessor: post-decrements the counter and returns the previous value.
        static /* synthetic */ int access$110(SSEChannelSubscriber subscriber) {
            return subscriber.numSubscribers--;
        }
    }

    @Extension
    /* loaded from: input_file:WEB-INF/lib/sse-gateway.jar:org/jenkinsci/plugins/ssegateway/sse/EventDispatcher$SSEHttpSessionListener.class */
    public static final class SSEHttpSessionListener extends HttpSessionListener {
        /* JADX WARN: Finally extract failed */
        public void sessionDestroyed(HttpSessionEvent httpSessionEvent) {
            try {
                Map<String, EventDispatcher> dispatchers = EventDispatcherFactory.getDispatchers(httpSessionEvent.getSession());
                try {
                    for (EventDispatcher eventDispatcher : dispatchers.values()) {
                        try {
                            eventDispatcher.unsubscribeAll();
                        } catch (Exception e) {
                            if (EventDispatcher.LOGGER.isDebugEnabled()) {
                                EventDispatcher.LOGGER.debug("Error during unsubscribeAll() for dispatcher " + eventDispatcher.getId() + ".", e);
                            }
                        }
                    }
                    dispatchers.clear();
                } catch (Throwable th) {
                    dispatchers.clear();
                    throw th;
                }
            } catch (Exception e2) {
                EventDispatcher.LOGGER.debug("Error during session cleanup. The session has probably timed out." + this, e2);
            }
        }
    }

    /**
     * Captures the authentication this dispatcher will subscribe with: the current Jenkins
     * authentication when {@link #getUser()} yields a user, otherwise the anonymous one.
     */
    public EventDispatcher() {
        this.authentication = (getUser() == null) ? Jenkins.ANONYMOUS : Jenkins.getAuthentication();
    }

    /**
     * Implemented by subclasses.
     * NOTE(review): presumably begins the event stream on the given request/response - confirm against implementations.
     *
     * @param httpServletRequest the incoming request
     * @param httpServletResponse the response associated with that request
     * @throws IOException declared for implementations
     * @throws ServletException declared for implementations
     */
    public abstract void start(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException;

    /**
     * @return the response events are written to; {@code dispatchEvent} treats {@code null}
     *         as "no response available" and reports a failed dispatch.
     */
    public abstract HttpServletResponse getResponse();

    /**
     * @return a read-only view of the current filter-to-subscriber map
     */
    public Map<EventFilter, ChannelSubscriber> getSubscribers() {
        final Map<EventFilter, ChannelSubscriber> readOnlyView = Collections.unmodifiableMap(this.subscribers);
        return readOnlyView;
    }

    /**
     * @return the dispatcher ID
     * @throws IllegalStateException if no ID has been set yet
     */
    public final String getId() {
        if (this.id != null) {
            return this.id;
        }
        throw new IllegalStateException("Call to getId before the ID was set.");
    }

    /**
     * Sets the dispatcher ID returned by {@code getId()}.
     *
     * @param str the new ID; no validation is performed here
     */
    public void setId(String str) {
        this.id = str;
    }

    /**
     * @return the dispatcher ID followed by this instance's identity hash code in parentheses
     */
    @Override
    public String toString() {
        final int identity = System.identityHashCode(this);
        return String.format("%s (%s)", this.id, identity);
    }

    /**
     * Called after a failed dispatch. If more than {@code TIMEOUT_DISPATCHERFAIL} milliseconds
     * have passed since the last successful dispatch, the retry queue is cleared and every
     * subscription is dropped.
     *
     * @param str short label of the failure site, used only in debug logging
     */
    private void checkDispatcherFailTimeout(String str) {
        final long now = System.currentTimeMillis();
        final long sinceLastSuccess = now - this.timestamp_dispatchEventOK;
        final boolean debug = LOGGER.isDebugEnabled();
        if (debug) {
            LOGGER.debug(String.format("SSE dispatcher %s %s fail - %d - %d - %d", this, str, now, sinceLastSuccess, TIMEOUT_DISPATCHERFAIL));
        }
        if (sinceLastSuccess <= TIMEOUT_DISPATCHERFAIL) {
            return;
        }
        if (debug) {
            LOGGER.debug(String.format("SSE dispatcher %s %s fail - timediff > TIMEOUT_DISPATCHERFAIL", this, str));
        }
        this.retryQueue.clear();
        unsubscribeAll();
    }

    /**
     * Writes one server-sent event to the current response.
     *
     * <p>An {@code event:} line is written when {@code str} is non-null, a {@code data:} line
     * when {@code str2} is non-null, followed by a blank line. The writer's error state is
     * checked before and after writing; each failure path runs the dispatcher-fail timeout check.
     *
     * @param str  event name, or {@code null} to omit the {@code event:} line
     * @param str2 event payload, or {@code null} to omit the {@code data:} line
     * @return {@code true} if the event was written and the writer reported no error
     * @throws IOException if the response writer cannot be obtained
     * @throws ServletException declared for overriding implementations
     */
    public synchronized boolean dispatchEvent(String str, String str2) throws IOException, ServletException {
        final HttpServletResponse target = getResponse();
        if (target == null) {
            checkDispatcherFailTimeout("response");
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("SSE dispatcher %s sending event: %s", this, str2));
        }

        final PrintWriter out = target.getWriter();
        if (out.checkError()) {
            checkDispatcherFailTimeout("writer.checkError");
            return false;
        }

        if (str != null) {
            out.write("event: " + str + IOUtils.LINE_SEPARATOR_UNIX);
        }
        if (str2 != null) {
            out.write("data: " + str2 + IOUtils.LINE_SEPARATOR_UNIX);
        }
        out.write(IOUtils.LINE_SEPARATOR_UNIX);

        // checkError() also flushes, so this both pushes the event out and detects a dead connection.
        if (out.checkError()) {
            checkDispatcherFailTimeout("writer.write");
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("SSE dispatcher %s writer ok - %d", this, System.currentTimeMillis()));
        }
        this.timestamp_dispatchEventOK = System.currentTimeMillis();
        return true;
    }

    /**
     * Does nothing in this base class; subclasses may override.
     */
    public void stop() {
    }

    /**
     * Prepares the current response for an event stream: status 200, content type
     * {@code text/event-stream}, UTF-8 encoding, no caching, keep-alive connection.
     */
    public void setDefaultHeaders() {
        final HttpServletResponse target = getResponse();
        target.setStatus(HttpServletResponse.SC_OK);
        target.setContentType("text/event-stream");
        target.setCharacterEncoding("UTF-8");
        target.setHeader("Cache-Control", "no-cache");
        target.setHeader("Connection", "keep-alive");
    }

    /**
     * Subscribes this dispatcher to the channel named by the filter.
     *
     * <p>The first subscription for a given filter registers a new subscriber on the bus;
     * later calls with the same filter only increase that subscriber's count.
     *
     * @param eventFilter filter describing the channel and events of interest
     * @return {@code false} if the filter carries no channel name, {@code true} otherwise
     */
    public boolean subscribe(@Nonnull EventFilter eventFilter) {
        final String channel = eventFilter.getChannelName();
        if (channel == null) {
            LOGGER.error(String.format("Invalid SSE subscribe configuration. '%s' not specified.", EventProps.Jenkins.jenkins_channel));
            return false;
        }

        SSEChannelSubscriber channelSubscriber = (SSEChannelSubscriber) this.subscribers.get(eventFilter);
        if (channelSubscriber == null) {
            channelSubscriber = new SSEChannelSubscriber();
            this.bus.subscribe(channel, channelSubscriber, this.authentication, eventFilter);
            this.subscribers.put(eventFilter, channelSubscriber);
        }
        channelSubscriber.numSubscribers++;

        publishStateEvent(SSEChannel.Event.subscribe, new SimpleMessage().set(SSEChannel.EventProps.sse_subs_dispatcher, this.id).set(SSEChannel.EventProps.sse_subs_channel_name, channel).set(SSEChannel.EventProps.sse_subs_filter, eventFilter.toJSON()));
        return true;
    }

    /**
     * @return the result of {@code User.current()}; the constructor uses a {@code null}
     *         result to select the anonymous authentication. Subclasses may override.
     */
    protected User getUser() {
        return User.current();
    }

    /**
     * Releases one subscription for the given filter.
     *
     * <p>When the subscriber's count reaches zero it is unsubscribed from the bus and removed
     * from the subscriber map; the map entry is removed even if the bus call throws.
     *
     * @param eventFilter filter that was previously passed to {@code subscribe}
     * @return {@code false} if the filter has no channel name or no active subscription,
     *         {@code true} otherwise
     */
    public boolean unsubscribe(@Nonnull EventFilter eventFilter) {
        final String channel = eventFilter.getChannelName();
        if (channel == null) {
            LOGGER.error(String.format("Invalid SSE unsubscribe configuration. '%s' not specified.", EventProps.Jenkins.jenkins_channel));
            return false;
        }

        final SSEChannelSubscriber channelSubscriber = (SSEChannelSubscriber) this.subscribers.get(eventFilter);
        if (channelSubscriber == null) {
            LOGGER.info("Invalid SSE unsubscribe configuration. No active subscription for channel: {}", channel);
            return false;
        }

        channelSubscriber.numSubscribers--;
        if (channelSubscriber.numSubscribers == 0) {
            try {
                this.bus.unsubscribe(channel, channelSubscriber);
            } finally {
                this.subscribers.remove(eventFilter);
            }
        }

        publishStateEvent(SSEChannel.Event.unsubscribe, new SimpleMessage().set(SSEChannel.EventProps.sse_subs_dispatcher, this.id).set(SSEChannel.EventProps.sse_subs_channel_name, channel).set(SSEChannel.EventProps.sse_subs_filter, eventFilter.toJSON()));
        return true;
    }

    /**
     * Unsubscribes every registered subscriber from the bus, then empties the subscriber map.
     */
    public void unsubscribeAll() {
        for (Map.Entry<EventFilter, ChannelSubscriber> subscription : this.subscribers.entrySet()) {
            final String channel = subscription.getKey().getChannelName();
            final SSEChannelSubscriber channelSubscriber = (SSEChannelSubscriber) subscription.getValue();
            this.bus.unsubscribe(channel, channelSubscriber);
        }
        this.subscribers.clear();
    }

    /**
     * Arranges for the retry queue to be processed.
     *
     * @param j delay in milliseconds; a value of zero or less processes the queue immediately
     *          on the calling thread, otherwise the work is handed to the shared scheduler.
     *          A scheduling failure is logged and not propagated.
     */
    private void scheduleRetryQueueProcessing(long j) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("EventDispatcher (%s) - scheduleRetryQueueProcessing(%d)", this, j));
        }
        if (j > 0) {
            try {
                scheduledExecutorService.schedule(this::processRetries, j, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                LOGGER.info(String.format("EventDispatcher (%s) - scheduleRetryQueueProcessing - Error scheduling retry.", this), e);
            }
        } else {
            processRetries();
        }
    }

    /**
     * Publishes a dispatcher state event on the "sse" channel. Only active when
     * {@code Util.isTestEnv()} is true; otherwise this is a no-op.
     *
     * @param event   the state event name
     * @param message extra properties merged into the published message; may be {@code null}
     */
    private void publishStateEvent(SSEChannel.Event event, Message message) {
        if (!Util.isTestEnv()) {
            return;
        }
        try {
            SimpleMessage stateMessage = new SimpleMessage().setChannelName("sse").setEventName(event).set("sse_numsubs", Integer.toString(this.subscribers.size()));
            if (message != null) {
                stateMessage.putAll(message);
            }
            this.bus.publish(stateMessage);
        } catch (MessageException e) {
            LOGGER.warn("Failed to publish SSE Dispatcher state event.", e);
        }
    }

    /**
     * Discards all pending retries and sends a "reload" event (with no data) to the client.
     * A failure to send is logged at error level and not propagated.
     */
    private void dispatchReload() {
        this.retryQueue.clear();
        try {
            final String noPayload = null;
            dispatchEvent("reload", noPayload);
        } catch (Exception e) {
            LOGGER.error("Unable to send reload event to client.", e);
        }
    }

    /**
     * Queues a message for later redelivery.
     *
     * <p>If the queue refuses the entry a reload is dispatched instead. Retry processing is
     * scheduled only when the queue was empty before this entry was added.
     *
     * @param message the message that could not be delivered
     */
    private void addToRetryQueue(@Nonnull Message message) {
        final boolean queueWasEmpty = this.retryQueue.isEmpty();
        final boolean accepted = this.retryQueue.add(new Retry(message));
        if (!accepted) {
            dispatchReload();
            return;
        }
        if (queueWasEmpty) {
            scheduleRetryQueueProcessing(RETRY_QUEUE_PROCESSING_DELAY);
        }
    }

    /**
     * Attempts to re-deliver the events waiting in the retry queue, oldest first.
     *
     * <p>Behaviour:
     * <ul>
     *   <li>If a run is already in progress ({@code isRetryLoopActive}), returns immediately.</li>
     *   <li>If the head of the queue is older than {@code RETRY_QUEUE_EVENT_LIFETIME}, the whole
     *       queue is cleared and nothing is sent.</li>
     *   <li>Otherwise each head entry is looked up in the {@code EventHistoryStore} and written
     *       to the client; on success it is removed and the next entry is tried.</li>
     *   <li>If the store has no event for the entry, the run stops: while the entry is still
     *       within its grace window it is simply retried later, otherwise a reload is sent
     *       to the client (which also clears the queue).</li>
     *   <li>If the write fails or throws, the run stops and the entry stays queued.</li>
     * </ul>
     *
     * <p>On every exit path, another run is scheduled after {@code RETRY_QUEUE_PROCESSING_DELAY}
     * if the queue is not empty, and the in-progress flag is cleared. This is done in a single
     * {@code finally} so the flag can never be left set by an unexpected exception.
     */
    public synchronized void processRetries() {
        if (this.isRetryLoopActive) {
            return;
        }
        this.isRetryLoopActive = true;
        try {
            Retry retry = this.retryQueue.peek();
            if (retry != null) {
                long currentTimeMillis = System.currentTimeMillis();
                long age = currentTimeMillis - retry.timestamp;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("EventDispatcher (%s) - timestamp: %d - current: %d - age: %d", this, retry.timestamp, currentTimeMillis, age));
                }
                if (age > RETRY_QUEUE_EVENT_LIFETIME) {
                    // The oldest pending event has expired: give up on everything queued.
                    LOGGER.debug("EventDispatcher {} processRetries - clear retryQueue", this);
                    this.retryQueue.clear();
                    retry = null;
                }
            }

            try {
                while (retry != null) {
                    String channelEvent = EventHistoryStore.getChannelEvent(retry.channelName, retry.eventUUID);
                    if (channelEvent == null) {
                        // Either the event has not reached the store yet (try again later),
                        // or it never will (tell the client to reload).
                        if (!retry.needsMoreTimeToLandInStore()) {
                            dispatchReload();
                        }
                        return;
                    }
                    if (Util.isTestEnv()) {
                        // Tag re-sent events so tests can tell them apart from first deliveries.
                        JSONObject eventJson = JSONObject.fromObject(channelEvent);
                        eventJson.put(SSEChannel.EventProps.sse_dispatch_retry.name(), "true");
                        channelEvent = eventJson.toString();
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(String.format("EventDispatcher (%s) - retry event: %s", this, channelEvent));
                    }
                    if (!dispatchEvent(retry.channelName, channelEvent)) {
                        LOGGER.debug(String.format("EventDispatcher (%s) - Error dispatching retry event to SSE channel. dispatchEvent failed.", this));
                        return;
                    }
                    // SLF4J substitutes "{}" placeholders; the previous "{0}"/"{1}" were logged literally.
                    LOGGER.debug("EventDispatcher ({}) - Dispatched retry event to SSE channel. Event {}.", this, channelEvent);
                    this.retryQueue.remove();
                    retry = this.retryQueue.peek();
                }
            } catch (Exception e) {
                // Leave the entry queued; the finally block schedules another attempt.
                LOGGER.warn(String.format("EventDispatcher (%s) - Error dispatching retry event to SSE channel. Write failed.", this), e);
            }
        } finally {
            if (!this.retryQueue.isEmpty()) {
                scheduleRetryQueueProcessing(RETRY_QUEUE_PROCESSING_DELAY);
            }
            this.isRetryLoopActive = false;
        }
    }

    /**
     * Delivers a bus message to the client, or queues it for retry.
     *
     * <p>If retries are already pending the message is appended to the retry queue so that
     * ordering is kept. Otherwise the message is stamped with this dispatcher's ID and
     * identity hash and written out; a failed or throwing write sends it to the retry queue.
     *
     * @param message the message received from the bus
     */
    public void doDispatch(@Nonnull Message message) {
        final boolean retriesPending = !this.retryQueue.isEmpty();
        if (retriesPending) {
            addToRetryQueue(message);
            return;
        }
        try {
            message.set(SSEChannel.EventProps.sse_subs_dispatcher, this.id);
            message.set(SSEChannel.EventProps.sse_subs_dispatcher_inst, Integer.toString(System.identityHashCode(this)));
            if (dispatchEvent(message.getChannelName(), message.toJSON())) {
                return;
            }
            LOGGER.debug("Error dispatching event to SSE channel. dispatchEvent failed.");
            addToRetryQueue(message);
        } catch (Exception e) {
            LOGGER.debug("Error dispatching event to SSE channel.", e);
            addToRetryQueue(message);
        }
    }
}
