package com.redhat.jenkins.plugins.ci.messaging;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.Predicate;
import com.redhat.jenkins.plugins.ci.CIEnvironmentContributingAction;
import com.redhat.jenkins.plugins.ci.messaging.checks.MsgCheck;
import com.redhat.jenkins.plugins.ci.messaging.data.FedmsgMessage;
import com.redhat.utils.MessageUtils;
import com.redhat.utils.OrderedProperties;
import com.redhat.utils.PluginUtils;
import hudson.EnvVars;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.TaskListener;
import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import net.sf.json.JSONObject;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.jms.selector.ZmqMessageSelector;
import org.zeromq.jms.selector.ZmqSimpleMessageSelector;

/* loaded from: input_file:WEB-INF/lib/jms-messaging.jar:com/redhat/jenkins/plugins/ci/messaging/FedMsgMessagingWorker.class */
public class FedMsgMessagingWorker extends JMSMessagingWorker {
    /** Class-wide logger. */
    private static final Logger log = Logger.getLogger(FedMsgMessagingWorker.class.getName());
    /** Provider configuration; supplies the hub address, the publish address and the configured topic. */
    private final FedMsgMessagingProvider provider;
    /** Optional per-job overrides; may be {@code null} (see {@code getTopic()}). */
    private final MessagingProviderOverrides overrides;
    /** Topic used when neither the overrides nor the provider configure one. */
    public static final String DEFAULT_PREFIX = "org.fedoraproject";
    /** ZeroMQ context owning the subscription socket; {@code null} while not connected. */
    private ZMQ.Context context;
    /** Poller the subscription socket is registered with; {@code null} while not connected. */
    private ZMQ.Poller poller;
    /** Subscription socket; {@code null} while not subscribed. */
    private ZMQ.Socket socket;
    /** Topic resolved by the most recent call to {@code subscribe}. */
    private String topic;
    /** Selector passed to the most recent call to {@code subscribe}; only stored, not read in this class. */
    private String selector;
    /*
     * The two flags below form the hand-shake between prepareForInterrupt(), which sets
     * `interrupt` and then waits for `pollerClosed`, and the receive loop, which observes
     * `interrupt` and sets `pollerClosed`. The waiting side cannot be the thread that runs the
     * receive loop, so both flags are volatile to make each write visible to the other thread.
     */
    /** Set while an interrupt has been requested through {@code prepareForInterrupt()}. */
    private volatile boolean interrupt = false;
    /** Set once the receive loop has stopped polling or the subscription has been torn down. */
    private volatile boolean pollerClosed = false;

    /**
     * Creates a worker bound to a provider, optional overrides and a job.
     *
     * @param fedMsgMessagingProvider provider configuration used for addresses and the topic
     * @param messagingProviderOverrides optional overrides consulted first when resolving the topic
     * @param str name of the job this worker serves; stored in the inherited {@code jobname} field
     */
    public FedMsgMessagingWorker(FedMsgMessagingProvider fedMsgMessagingProvider, MessagingProviderOverrides messagingProviderOverrides, String str) {
        super();
        this.jobname = str;
        this.overrides = messagingProviderOverrides;
        this.provider = fedMsgMessagingProvider;
    }

    /**
     * Subscribes this worker to the resolved topic on the provider's hub address.
     *
     * <p>On failure the partially created subscription is torn down via {@link #unsubscribe(String)}
     * and the attempt is repeated after {@code RETRY_MINUTES} minutes, until it succeeds or the
     * current thread is interrupted.
     *
     * @param str name of the job being subscribed (used for logging and passed to unsubscribe)
     * @param str2 message selector; recorded in the {@code selector} field and logged
     * @return {@code true} when subscribed (or already subscribed, or an interrupt is in progress);
     *         {@code false} when no topic is available, connecting failed, or the thread was interrupted
     */
    @Override
    public boolean subscribe(String str, String str2) {
        if (this.interrupt) {
            // An interrupt is in progress: report success without touching any ZeroMQ state.
            return true;
        }
        this.topic = getTopic();
        this.selector = str2;
        if (this.topic == null) {
            return false;
        }
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (!isConnected() && !connect()) {
                    return false;
                }
                if (this.socket != null) {
                    log.info("Already subscribed to " + this.topic + " topic with selector: " + str2 + " for job '" + str + "'");
                    return true;
                }
                this.socket = this.context.socket(ZMQ.SUB);
                log.info("Subscribing job '" + str + "' to " + this.topic + " topic.");
                this.socket.subscribe(this.topic.getBytes());
                this.socket.setLinger(0L);
                this.socket.connect(this.provider.getHubAddr());
                this.poller.register(this.socket, ZMQ.Poller.POLLIN);
                log.info("Successfully subscribed job '" + str + "' to " + this.topic + " topic with selector: " + str2);
                return true;
            } catch (Exception e) {
                log.log(Level.SEVERE, "Exception raised while subscribing job '" + str + "', retrying in " + RETRY_MINUTES + " minutes.", e);
                if (!Thread.currentThread().isInterrupted()) {
                    unsubscribe(str);
                    try {
                        // long arithmetic so a large retry interval cannot overflow int
                        Thread.sleep(RETRY_MINUTES.intValue() * 60L * 1000L);
                    } catch (InterruptedException interruptedDuringBackoff) {
                        // restore the flag so the loop condition ends the retries
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        return false;
    }

    /**
     * Tears down the current subscription: unregisters and disconnects every socket known to the
     * poller, removes the topic subscription, closes the socket and terminates the context.
     *
     * <p>Does nothing except log while an interrupt is in progress. Failures are logged and never
     * propagated; the worker's ZeroMQ fields are always reset and {@code pollerClosed} is set.
     *
     * @param str name of the job being unsubscribed (used for logging)
     */
    @Override
    public void unsubscribe(String str) {
        if (this.interrupt) {
            log.info("We are being interrupted. Skipping unsubscribe...");
            return;
        }
        try {
            if (this.poller != null) {
                for (int i = 0; i < this.poller.getSize(); i++) {
                    ZMQ.Socket registered = this.poller.getSocket(i);
                    if (registered == null) {
                        // Empty poller slot (e.g. subscribe failed before registering the socket).
                        continue;
                    }
                    this.poller.unregister(registered);
                    registered.disconnect(this.provider.getHubAddr());
                    log.info("Un-subscribing job '" + str + "' from " + this.topic + " topic.");
                    if (this.socket != null && this.topic != null) {
                        this.socket.unsubscribe(this.topic.getBytes());
                    }
                }
            }
        } catch (Exception e) {
            log.log(Level.WARNING, "Failed to cleanly un-subscribe job '" + str + "'.", e);
        }
        // Release the native resources even when the detach steps above failed; otherwise the
        // socket and context would leak once the fields are cleared below.
        try {
            if (this.socket != null) {
                this.socket.close();
            }
            // Only reached when the socket closed without error.
            if (this.context != null) {
                this.context.term();
            }
        } catch (Exception e) {
            log.log(Level.WARNING, "Failed to release ZeroMQ resources for job '" + str + "'.", e);
        }
        this.poller = null;
        this.context = null;
        this.socket = null;
        this.pollerClosed = true;
    }

    /**
     * Renders a message for log output and for the trigger summary.
     *
     * @param fedmsgMessage message whose {@code msg} map is rendered
     * @return the {@code toString()} form of the message's {@code msg} map
     */
    private String formatMessage(FedmsgMessage fedmsgMessage) {
        final Map<String, Object> payload = fedmsgMessage.getMsg();
        return payload.toString();
    }

    /**
     * Builds the parameter map for a received message and triggers the job.
     *
     * <p>The map always carries {@code CI_MESSAGE} (JSON body) and {@code MESSAGE_HEADERS}; in
     * addition every top-level {@code msg} entry whose value is a {@code String} or an
     * {@code Integer} is copied under its own key.
     *
     * @param fedmsgMessage the message to hand to the job
     */
    private void process(FedmsgMessage fedmsgMessage) {
        final HashMap<String, String> params = new HashMap<String, String>();
        params.put("CI_MESSAGE", getMessageBody(fedmsgMessage));
        params.put("MESSAGE_HEADERS", getMessageHeaders(fedmsgMessage));
        for (Map.Entry<String, Object> entry : fedmsgMessage.getMsg().entrySet()) {
            final Object value = entry.getValue();
            if (value instanceof String) {
                params.put(entry.getKey(), (String) value);
            } else if (value instanceof Integer) {
                params.put(entry.getKey(), ((Integer) value).toString());
            }
            // values of any other type are intentionally not exported
        }
        trigger(this.jobname, formatMessage(fedmsgMessage), params);
    }

    /**
     * Serializes the message's {@code msg} map to a JSON string.
     *
     * @param fedmsgMessage message whose {@code msg} map is serialized
     * @return JSON text produced by {@code JSONObject.fromObject}
     */
    private String getMessageBody(FedmsgMessage fedmsgMessage) {
        final JSONObject body = JSONObject.fromObject(fedmsgMessage.getMsg());
        return body.toString();
    }

    /**
     * Builds the JSON "headers" document for a message; it contains a single {@code topic} entry.
     *
     * @param fedmsgMessage message whose topic is exported
     * @return the headers as JSON text, or an empty string if serialization fails (the failure is logged)
     */
    public String getMessageHeaders(FedmsgMessage fedmsgMessage) {
        try {
            final ObjectMapper mapper = new ObjectMapper();
            final ObjectNode headers = mapper.createObjectNode();
            final JsonNode topicNode = mapper.convertValue(fedmsgMessage.getTopic(), JsonNode.class);
            headers.set("topic", topicNode);
            return mapper.writer().writeValueAsString(headers);
        } catch (Exception failure) {
            final String description = "Unhandled exception retrieving message headers:\n" + getMessageBody(fedmsgMessage);
            log.log(Level.SEVERE, description, (Throwable) failure);
            return "";
        }
    }

    /**
     * Subscribes (retrying every two seconds) and then polls for messages until no message has
     * arrived for {@code j} milliseconds, an interrupt is requested, or an error occurs.
     *
     * <p>Each received message is parsed as a {@code FedmsgMessage}, its topic is copied into the
     * {@code msg} map under {@code "topic"}, and it is handed to {@code process} when it matches
     * the selector and passes every check. On idle timeout or on error the subscription is torn
     * down via {@link #unsubscribe(String)}.
     *
     * @param str name of the job being served (logging, subscribe/unsubscribe)
     * @param str2 selector expression evaluated against each message's {@code msg} map
     * @param list checks every accepted message must pass
     * @param j idle timeout in milliseconds, measured from the last received message
     */
    @Override
    public void receive(String str, String str2, List<MsgCheck> list, long j) {
        if (this.interrupt) {
            log.info("we have been interrupted at start of receive");
            return;
        }
        while (!subscribe(str, str2)) {
            if (Thread.currentThread().isInterrupted()) {
                // subscribe() returns false immediately for an interrupted thread, so looping
                // here without sleeping would spin forever; give up instead.
                log.info("Interrupted while waiting to subscribe job '" + str + "'.");
                return;
            }
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        ObjectMapper objectMapper = new ObjectMapper();
        long lastActivity = System.currentTimeMillis();
        try {
            while (System.currentTimeMillis() - lastActivity < j) {
                if (this.poller.poll(1000L) > 0) {
                    this.pollerClosed = false;
                    if (this.poller.pollin(0)) {
                        ZMsg received = ZMsg.recvMsg(this.poller.getSocket(0));
                        // any received message restarts the idle timer
                        lastActivity = System.currentTimeMillis();
                        FedmsgMessage message = objectMapper.readValue(received.getLast().toString(), FedmsgMessage.class);
                        message.getMsg().put("topic", message.getTopic());
                        ZmqMessageSelector parsedSelector = ZmqSimpleMessageSelector.parse(str2);
                        log.fine("Evaluating selector: " + parsedSelector.toString());
                        if (parsedSelector.evaluate(message.getMsg())) {
                            boolean allPassed = true;
                            for (MsgCheck check : list) {
                                if (!verify(message, check)) {
                                    allPassed = false;
                                    log.fine("msg check: " + check.toString() + " failed against: " + formatMessage(message));
                                    break;
                                }
                            }
                            if (allPassed) {
                                if (list.size() > 0) {
                                    log.fine("All msg checks have passed.");
                                }
                                process(message);
                            } else {
                                log.fine("Some msg checks did not pass.");
                            }
                        } else {
                            log.fine("false");
                        }
                    }
                } else if (this.interrupt) {
                    log.info("We have been interrupted...");
                    // tell prepareForInterrupt() that polling has stopped
                    this.pollerClosed = true;
                    break;
                }
            }
        } catch (Exception e2) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            log.log(Level.WARNING, "JMS exception raised, going to re-subscribe for job '" + str + "'.", (Throwable) e2);
            log.log(Level.SEVERE, ExceptionUtils.getStackTrace(e2));
            unsubscribe(str);
            return;
        }
        if (!this.interrupt) {
            log.info("No message received for the past " + j + " ms, re-subscribing for job '" + str + "'.");
            unsubscribe(str);
        }
    }

    /**
     * Checks one message field against the regular expression of a message check.
     *
     * <p>A field starting with {@code $} is evaluated as a JsonPath expression against the
     * message's JSON; any other field is looked up as a key of the {@code msg} map. The resolved
     * value is converted with {@code toString()} (an absent or {@code null} value becomes the
     * empty string) and the check passes when the expected pattern is found anywhere in it.
     *
     * @param fedmsgMessage the message under test
     * @param msgCheck field name / JsonPath plus expected-value regular expression
     * @return {@code true} when the pattern is found; {@code false} when the message has no
     *         {@code msg} map, the JsonPath does not exist, or the pattern is not found
     */
    private boolean verify(FedmsgMessage fedmsgMessage, MsgCheck msgCheck) {
        Map<String, Object> msg = fedmsgMessage.getMsg();
        if (msg == null) {
            return false;
        }
        String field = msgCheck.getField();
        Object actual;
        if (field != null && field.startsWith("$")) {
            log.info("field " + field + " contains $, therefore using jsonPath");
            try {
                // Read as Object: a path may resolve to a number, boolean, list or null, and a
                // direct cast to String would throw ClassCastException for those.
                actual = JsonPath.parse(fedmsgMessage.getMsgJson()).read(field, new Predicate[0]);
            } catch (PathNotFoundException e) {
                log.fine(e.getMessage());
                return false;
            }
        } else {
            actual = field == null ? null : msg.get(field);
        }
        String value = actual == null ? "" : actual.toString();
        String expected = msgCheck.getExpectedValue();
        return Pattern.compile(expected != null ? expected : "").matcher(value).find();
    }

    /**
     * Creates a fresh ZeroMQ context (argument {@code 1}) and a poller from it (argument {@code 1}),
     * replacing whatever the {@code context} and {@code poller} fields held before.
     *
     * @return always {@code true}
     */
    @Override
    public boolean connect() {
        final ZMQ.Context freshContext = ZMQ.context(1);
        this.context = freshContext;
        this.poller = freshContext.poller(1);
        return true;
    }

    /**
     * Reports whether a poller currently exists.
     *
     * @return {@code true} when the {@code poller} field is non-null
     */
    @Override
    public boolean isConnected() {
        final boolean hasPoller = this.poller != null;
        return hasPoller;
    }

    /**
     * Reports the same state as {@link #isConnected()}; the subscription socket is not inspected.
     *
     * @return the result of {@link #isConnected()}
     */
    @Override
    public boolean isConnectedAndSubscribed() {
        final boolean connected = isConnected();
        return connected;
    }

    /**
     * Intentionally a no-op in this worker: the ZeroMQ socket and context are released by
     * {@code unsubscribe} and {@code prepareForInterrupt} instead.
     */
    @Override // com.redhat.jenkins.plugins.ci.messaging.JMSMessagingWorker
    public void disconnect() {
        // nothing to do here
    }

    /**
     * Publishes a fedmsg message for the given run on the provider's publish address.
     *
     * <p>The message body carries {@code CI_NAME}, {@code CI_TYPE}, {@code CI_STATUS} (finished
     * runs only), every property parsed from {@code str}, and the substituted message content.
     * Property values and the content are expanded against the run's environment overlaid with the
     * values collected so far, so later properties can reference earlier ones.
     *
     * @param run the run the message is sent for
     * @param taskListener receives the serialized message and any error output
     * @param message_type message type; its text becomes {@code CI_TYPE}
     * @param str message properties in {@code java.util.Properties} text format; may be {@code null} or blank
     * @param str2 message content template
     * @param z whether a failure to send is fatal
     * @return {@code false} only when {@code z} is set and sending failed; {@code true} otherwise
     */
    @Override
    public boolean sendMessage(Run<?, ?> run, TaskListener taskListener, MessageUtils.MESSAGE_TYPE message_type, String str, String str2, boolean z) {
        ZMQ.Context pubContext = ZMQ.context(1);
        ZMQ.Socket pubSocket = null;
        // Remember an interrupt instead of re-asserting it immediately, so the send still runs
        // as before; the flag is restored once the socket and context are released.
        boolean interrupted = false;
        try {
            pubSocket = pubContext.socket(ZMQ.PUB);
            pubSocket.setLinger(0L);
            log.fine("pub address: " + this.provider.getPubAddr());
            pubSocket.connect(this.provider.getPubAddr());
            try {
                // NOTE(review): presumably gives the PUB socket time to finish connecting before
                // the first message is published - confirm before shortening.
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                interrupted = true;
                log.log(Level.FINE, "Interrupted while waiting before publishing.", e);
            }
            // values layered on top of the build environment for variable substitution
            TreeMap<String, String> envOverrides = new TreeMap<String, String>();
            // body of the outgoing message
            HashMap<String, Object> body = new HashMap<String, Object>();
            String ciName = run.getParent().getName();
            body.put("CI_NAME", ciName);
            envOverrides.put("CI_NAME", ciName);
            String ciType = message_type.getMessage();
            body.put("CI_TYPE", ciType);
            envOverrides.put("CI_TYPE", ciType);
            if (!run.isBuilding()) {
                Result result = run.getResult();
                String status = result == Result.SUCCESS ? "passed" : "failed";
                body.put("CI_STATUS", status);
                envOverrides.put("CI_STATUS", status);
                if (result != null) {
                    envOverrides.put("BUILD_STATUS", result.toString());
                }
            }
            try {
                EnvVars environment = run.getEnvironment(taskListener);
                if (str != null && !str.trim().equals("")) {
                    OrderedProperties properties = new OrderedProperties();
                    properties.load(new StringReader(str));
                    Enumeration<?> names = properties.propertyNames();
                    while (names.hasMoreElements()) {
                        String key = (String) names.nextElement();
                        // rebuilt per property so earlier properties are visible to later ones
                        EnvVars scope = new EnvVars();
                        scope.putAll(environment);
                        scope.putAll(envOverrides);
                        String value = PluginUtils.getSubstitutedValue(properties.getProperty(key), scope);
                        body.put(key, value);
                        envOverrides.put(key, value);
                    }
                }
                EnvVars contentScope = new EnvVars();
                contentScope.putAll(environment);
                contentScope.putAll(envOverrides);
                body.put(JMSMessagingWorker.MESSAGECONTENTFIELD, PluginUtils.getSubstitutedValue(str2, contentScope));
                FedmsgMessage message = new FedmsgMessage();
                message.setMsg(body);
                message.setTopic(PluginUtils.getSubstitutedValue(getTopic(), environment));
                message.setTimestamp(new Date().getTime() / 1000);
                boolean topicSent = pubSocket.sendMore(message.getTopic());
                if (z && !topicSent) {
                    log.severe("Unhandled exception in perform: Failed to send message (topic)!");
                    return false;
                }
                String json = message.toJson().toString();
                boolean bodySent = pubSocket.send(json);
                if (z && !bodySent) {
                    log.severe("Unhandled exception in perform: Failed to send message (body)!");
                    return false;
                }
                log.fine(json);
                taskListener.getLogger().println(json);
                return true;
            } catch (Exception e2) {
                String trace = ExceptionUtils.getStackTrace(e2);
                if (z) {
                    log.severe("Unhandled exception in perform: ");
                    log.severe(trace);
                    taskListener.fatalError("Unhandled exception in perform: ");
                    taskListener.fatalError(trace);
                    return false;
                }
                log.warning("Unhandled exception in perform: ");
                log.warning(trace);
                taskListener.error("Unhandled exception in perform: ");
                taskListener.error(trace);
                return true;
            }
        } finally {
            // Single release point for every exit path, including failures before the send.
            try {
                if (pubSocket != null) {
                    pubSocket.close();
                }
                pubContext.term();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Blocks until a message matching the selector and all checks arrives, or the timeout expires.
     *
     * <p>A private context, poller and subscription socket are created for the wait and released
     * on every exit path. A matching message is handed to {@code process}; its JSON body is
     * returned and, when {@code run} and {@code str2} are both set, exported to the run through a
     * {@code CIEnvironmentContributingAction}.
     *
     * @param run the waiting run; used to expand variables in the topic and to receive the result
     * @param taskListener receives progress output
     * @param str selector expression evaluated against each message's {@code msg} map
     * @param str2 name of the environment variable that receives the message body; may be empty
     * @param list checks a message must pass
     * @param num timeout in minutes, measured from the start of the wait
     * @return the JSON body of the matching message, or {@code null} on timeout or error
     */
    @Override
    public String waitForMessage(Run<?, ?> run, TaskListener taskListener, String str, String str2, List<MsgCheck> list, Integer num) {
        log.info("Waiting for message with selector: " + str);
        for (MsgCheck check : list) {
            log.info(" with check: " + check.toString());
        }
        taskListener.getLogger().println("Waiting for message with selector: " + str);
        for (MsgCheck check : list) {
            taskListener.getLogger().println(" with check: " + check.toString());
        }
        log.info(" with timeout: " + num);
        taskListener.getLogger().println(" with timeout: " + num);

        ZMQ.Context waitContext = ZMQ.context(1);
        ZMQ.Poller waitPoller = waitContext.poller(1);
        ZMQ.Socket waitSocket = waitContext.socket(ZMQ.SUB);
        // Remember an interrupt rather than re-asserting it before polling; the flag is restored
        // after the resources are released so the wait itself behaves as before.
        boolean interrupted = false;
        String topic = getTopic();
        try {
            try {
                if (run != null) {
                    topic = PluginUtils.getSubstitutedValue(getTopic(), run.getEnvironment(taskListener));
                }
            } catch (IOException e) {
                log.warning(e.getMessage());
            } catch (InterruptedException e2) {
                interrupted = true;
                log.warning(e2.getMessage());
            }
            waitSocket.subscribe(topic.getBytes());
            waitSocket.setLinger(0L);
            waitSocket.connect(this.provider.getHubAddr());
            waitPoller.register(waitSocket, ZMQ.Poller.POLLIN);
            ObjectMapper objectMapper = new ObjectMapper();
            long started = System.currentTimeMillis();
            // long arithmetic: an int product overflows for timeouts beyond roughly 24 days
            long timeoutMillis = num.longValue() * 60L * 1000L;
            try {
                while (System.currentTimeMillis() - started < timeoutMillis) {
                    if (waitPoller.poll(1000L) > 0 && waitPoller.pollin(0)) {
                        FedmsgMessage message = objectMapper.readValue(ZMsg.recvMsg(waitPoller.getSocket(0)).getLast().toString(), FedmsgMessage.class);
                        message.getMsg().put("topic", message.getTopic());
                        ZmqMessageSelector parsedSelector = ZmqSimpleMessageSelector.parse(str);
                        log.fine("Evaluating selector: " + parsedSelector.toString());
                        if (parsedSelector.evaluate(message.getMsg())) {
                            boolean allPassed = true;
                            // no early exit: every failing check is logged
                            for (MsgCheck msgCheck : list) {
                                if (!verify(message, msgCheck)) {
                                    allPassed = false;
                                    log.fine("msg check: " + msgCheck.toString() + " failed against: " + formatMessage(message));
                                }
                            }
                            if (allPassed) {
                                if (list.size() > 0) {
                                    log.fine("All msg checks have passed.");
                                }
                                process(message);
                                String messageBody = getMessageBody(message);
                                if (run != null && StringUtils.isNotEmpty(str2)) {
                                    EnvVars envVars = new EnvVars();
                                    envVars.put(str2, messageBody);
                                    run.addAction(new CIEnvironmentContributingAction(envVars));
                                }
                                return messageBody;
                            }
                            log.fine("Some msg checks did not pass.");
                        } else {
                            log.fine("false");
                        }
                    }
                }
            } catch (Exception e3) {
                log.log(Level.SEVERE, "Unhandled exception waiting for message.", (Throwable) e3);
                return null;
            }
            log.severe("Timed out waiting for message!");
            taskListener.getLogger().println("Timed out waiting for message!");
            return null;
        } finally {
            // Single release point; a failure while detaching must not prevent close/term or
            // replace the value being returned.
            try {
                waitPoller.unregister(waitSocket);
                waitSocket.disconnect(this.provider.getHubAddr());
                waitSocket.unsubscribe(topic.getBytes());
            } catch (Exception e) {
                log.log(Level.WARNING, "Failed to cleanly tear down the wait subscription.", e);
            } finally {
                try {
                    waitSocket.close();
                    waitContext.term();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Requests the receive loop to stop, waits until {@code pollerClosed} is set, and then
     * releases the subscription socket and the context.
     *
     * <p>The worker state ({@code poller}, {@code socket}, {@code context}, {@code interrupt}) is
     * reset even when releasing the resources fails, so a failed teardown cannot leave the worker
     * permanently flagged as being interrupted. If the calling thread is interrupted while
     * waiting, the wait continues and the interrupt status is restored before returning.
     */
    @Override
    public void prepareForInterrupt() {
        this.interrupt = true;
        boolean interrupted = false;
        try {
            // The former "thread is dead" escape tested Thread.currentThread().isAlive(), which
            // is always true for the running thread, so the wait ends only on pollerClosed.
            while (!this.pollerClosed) {
                try {
                    log.info("poller not closed yet. Sleeping for 1 sec...");
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            try {
                if (this.poller != null) {
                    ZMQ.Socket registered = this.poller.getSocket(0);
                    if (registered != null) {
                        this.poller.unregister(registered);
                        registered.disconnect(this.provider.getHubAddr());
                    }
                    log.info("Un-subscribing job '" + this.jobname + "' from " + this.topic + " topic.");
                    if (this.socket != null && this.topic != null) {
                        this.socket.unsubscribe(this.topic.getBytes());
                    }
                }
            } catch (Exception e) {
                log.log(Level.WARNING, "Failed to cleanly un-subscribe job '" + this.jobname + "'.", e);
            }
            if (this.poller != null && this.socket != null) {
                this.socket.close();
            }
            if (this.context != null) {
                this.context.term();
            }
        } finally {
            this.poller = null;
            this.socket = null;
            // cleared like in unsubscribe() so a terminated context is never terminated twice
            this.context = null;
            this.interrupt = false;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reports whether an interrupt requested through {@code prepareForInterrupt()} is in progress.
     *
     * @return the current value of the {@code interrupt} flag
     */
    @Override
    public boolean isBeingInterrupted() {
        final boolean interruptRequested = this.interrupt;
        return interruptRequested;
    }

    /**
     * Resolves the topic to use, in order of precedence: a non-empty override topic, a non-empty
     * provider topic, and finally {@link #DEFAULT_PREFIX}.
     *
     * @return the resolved topic; never {@code null}
     */
    private String getTopic() {
        if (this.overrides != null) {
            final String overridden = this.overrides.getTopic();
            if (overridden != null && !overridden.isEmpty()) {
                return overridden;
            }
        }
        final String configured = this.provider.getTopic();
        if (configured == null || configured.isEmpty()) {
            return DEFAULT_PREFIX;
        }
        return configured;
    }
}
