package org.owasp.appsensor.kafka;

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Named;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.lang3.StringUtils;
import org.owasp.appsensor.core.AppSensorServer;
import org.owasp.appsensor.core.Attack;
import org.owasp.appsensor.core.Event;
import org.owasp.appsensor.core.KeyValuePair;
import org.owasp.appsensor.core.RequestHandler;
import org.owasp.appsensor.core.Response;
import org.owasp.appsensor.core.exceptions.NotAuthorizedException;
import org.owasp.appsensor.core.listener.ResponseListener;
import org.owasp.appsensor.core.storage.ResponseStoreListener;
import org.owasp.appsensor.kafka.KafkaConfig;
import org.owasp.appsensor.kafka.util.KafkaSender;
import org.owasp.appsensor.kafka.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;

@ResponseStoreListener
@Named
/* loaded from: input_file:org/owasp/appsensor/kafka/KafkaRequestHandler.class */
public class KafkaRequestHandler implements RequestHandler, KafkaConstants, ResponseListener {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private boolean initializedProperly = true;
    private final Gson gson = new Gson();

    @Inject
    private AppSensorServer appSensorServer;

    @Inject
    private Environment environment;
    private KafkaConfig config;
    private KafkaSender kafkaSender;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/owasp/appsensor/kafka/KafkaRequestHandler$ListenerThread.class */
    public class ListenerThread extends Thread {
        final String topic;
        ConsumerConnector consumerConnector;

        public ListenerThread(String str) {
            this.topic = str;
            KafkaRequestHandler.this.logger.info("Starting EventManager kafka consumer .. ");
            KafkaRequestHandler.this.logger.info("Connecting to zookeeper: " + KafkaRequestHandler.this.config.getConsumerZookeeperConnect());
            KafkaRequestHandler.this.logger.info("Connecting with group id: " + KafkaRequestHandler.this.config.getConsumerGroupId());
            KafkaRequestHandler.this.logger.info("Connecting with client id: " + KafkaRequestHandler.this.config.getClientApplicationName());
            KafkaRequestHandler.this.logger.info("Connecting to topic: " + this.topic);
            Properties properties = new Properties();
            properties.put("zookeeper.connect", KafkaRequestHandler.this.config.getConsumerZookeeperConnect());
            properties.put("group.id", KafkaRequestHandler.this.config.getConsumerGroupId());
            properties.put("client.id", KafkaRequestHandler.this.config.getClientApplicationName());
            properties.put("zookeeper.session.timeout.ms", "400");
            properties.put("zookeeper.sync.time.ms", "200");
            properties.put("auto.commit.interval.ms", "1000");
            this.consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
            KafkaRequestHandler.this.logger.info("Created event manager kafka consumer");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            HashMap hashMap = new HashMap();
            hashMap.put(this.topic, new Integer(1));
            ConsumerIterator it = ((KafkaStream) ((List) this.consumerConnector.createMessageStreams(hashMap).get(this.topic)).get(0)).iterator();
            while (it.hasNext()) {
                KeyValuePair keyValuePair = (KeyValuePair) KafkaRequestHandler.this.gson.fromJson(new String((byte[]) it.next().message()), KeyValuePair.class);
                String key = keyValuePair.getKey();
                String value = keyValuePair.getValue();
                if ("EVENT".equals(key)) {
                    KafkaRequestHandler.this.logger.trace("Received event on queue: " + this.topic);
                    KafkaRequestHandler.this.addEvent((Event) KafkaRequestHandler.this.gson.fromJson(value, Event.class));
                } else if ("ATTACK".equals(key)) {
                    KafkaRequestHandler.this.logger.trace("Received attack on queue: " + this.topic);
                    KafkaRequestHandler.this.addAttack((Attack) KafkaRequestHandler.this.gson.fromJson(value, Attack.class));
                } else {
                    KafkaRequestHandler.this.logger.trace("Received message for UNKNOWN topic: " + this.topic);
                }
            }
        }
    }

    public void addEvent(Event event) throws NotAuthorizedException {
        ensureInitialized();
        this.appSensorServer.getEventStore().addEvent(event);
    }

    public void addAttack(Attack attack) throws NotAuthorizedException {
        ensureInitialized();
        this.appSensorServer.getAttackStore().addAttack(attack);
    }

    public Collection<Response> getResponses(String str) throws NotAuthorizedException {
        throw new UnsupportedOperationException("Not used in the kafka implementation. Client applications receive responses from the client-specific topic in kafka.");
    }

    public void onAdd(Response response) {
        ensureInitialized();
        try {
            this.kafkaSender.send(buildTopicNames(response), this.gson.toJson(new KeyValuePair("RESPONSE", this.gson.toJson(response))));
        } catch (InterruptedException | ExecutionException e) {
            this.logger.error("Failed to send add response message to output topic.", e);
        }
    }

    public Environment getEnvironment() {
        return this.environment;
    }

    @PostConstruct
    public void ensureEnvironmentVariablesSet() {
        this.initializedProperly = isInitializedProperly();
        if (!this.initializedProperly) {
            this.logger.error(getUninitializedMessage());
            return;
        }
        initializeConfig();
        startKafkaListeners();
        initializeSender();
    }

    private void ensureInitialized() {
        if (!this.initializedProperly) {
            throw new IllegalStateException(getUninitializedMessage());
        }
    }

    private void initializeConfig() {
        Integer num = null;
        if (StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION"))) {
            num = Integer.valueOf(Integer.parseInt(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION")));
        }
        this.config = new KafkaConfig.KafkaConfigBuilder().setClientApplicationName(this.environment.getProperty("APPSENSOR_CLIENT_APPLICATION_NAME")).setConsumerGroupId(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_GROUP_ID")).setConsumerZookeeperConnect(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_ZOOKEEPER_CONNECT")).setProducerBootstrapServers(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_BOOTSTRAP_SERVERS")).setProducerPartition(num).setProducerKey(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_KEY")).build();
    }

    private void initializeSender() {
        this.kafkaSender = new KafkaSender(this.config);
    }

    private void startKafkaListeners() {
        this.logger.info("Starting kafka listeners for event/attack topics");
        new ListenerThread("appsensor-add-event-topic").start();
        new ListenerThread("appsensor-add-attack-topic").start();
        this.logger.info("Completed startup of kafka listeners for event/attack topics");
    }

    private boolean isInitializedProperly() {
        boolean z = StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_CLIENT_APPLICATION_NAME")) && StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_GROUP_ID")) && StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_ZOOKEEPER_CONNECT")) && StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_BOOTSTRAP_SERVERS"));
        if (!z) {
            return z;
        }
        if (StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION"))) {
            try {
                Integer.parseInt(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION"));
            } catch (NumberFormatException e) {
                z = false;
            }
            if (!z) {
                return z;
            }
            z = StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_KEY"));
            if (!z) {
                return z;
            }
        }
        return z;
    }

    private String getUninitializedMessage() {
        StringBuilder sb = new StringBuilder();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        if (StringUtils.isBlank(this.environment.getProperty("APPSENSOR_CLIENT_APPLICATION_NAME"))) {
            arrayList2.add("APPSENSOR_CLIENT_APPLICATION_NAME");
        } else {
            arrayList.add("APPSENSOR_CLIENT_APPLICATION_NAME");
        }
        if (StringUtils.isBlank(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_GROUP_ID"))) {
            arrayList2.add("APPSENSOR_KAFKA_CONSUMER_GROUP_ID");
        } else {
            arrayList.add("APPSENSOR_KAFKA_CONSUMER_GROUP_ID");
        }
        if (StringUtils.isBlank(this.environment.getProperty("APPSENSOR_KAFKA_CONSUMER_ZOOKEEPER_CONNECT"))) {
            arrayList2.add("APPSENSOR_KAFKA_CONSUMER_ZOOKEEPER_CONNECT");
        } else {
            arrayList.add("APPSENSOR_KAFKA_CONSUMER_ZOOKEEPER_CONNECT");
        }
        if (StringUtils.isBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_BOOTSTRAP_SERVERS"))) {
            arrayList2.add("APPSENSOR_KAFKA_PRODUCER_BOOTSTRAP_SERVERS");
        } else {
            arrayList.add("APPSENSOR_KAFKA_PRODUCER_BOOTSTRAP_SERVERS");
        }
        if (arrayList2.size() > 0) {
            sb.append("The following Environment variables must be set: ").append(arrayList2);
            if (arrayList.size() > 0) {
                sb.append(" (already set variables - ").append(arrayList).append(")");
            }
        }
        if (StringUtils.isNotBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION"))) {
            try {
                Integer.parseInt(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_PARTITION"));
            } catch (NumberFormatException e) {
                sb.append("\r\n");
                sb.append("If you use the APPSENSOR_KAFKA_PRODUCER_PARTITION, it must be set to an integer.");
            }
            if (StringUtils.isBlank(this.environment.getProperty("APPSENSOR_KAFKA_PRODUCER_KEY"))) {
                sb.append("\r\n");
                sb.append("If you use the APPSENSOR_KAFKA_PRODUCER_PARTITION, you must also set the APPSENSOR_KAFKA_PRODUCER_KEY.");
            }
        }
        return sb.toString();
    }

    private Collection<String> buildTopicNames(Response response) {
        HashSet hashSet = new HashSet();
        Iterator it = this.appSensorServer.getConfiguration().getRelatedDetectionSystems(response.getDetectionSystem()).iterator();
        while (it.hasNext()) {
            hashSet.add(KafkaUtils.buildResponseTopicName((String) it.next()));
        }
        return hashSet;
    }
}
