package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
import com.digitalpebble.stormcrawler.persistence.AbstractQueryingSpout;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/AbstractSpout.class */
public abstract class AbstractSpout extends AbstractQueryingSpout {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSpout.class);
    protected static final String ESBoltType = "status";
    protected static final String ESStatusIndexNameParamName = "es.status.index.name";
    protected static final String ESStatusBucketFieldParamName = "es.status.bucket.field";
    protected static final String ESStatusMaxBucketParamName = "es.status.max.buckets";
    protected static final String ESStatusMaxURLsParamName = "es.status.max.urls.per.bucket";
    protected static final String ESStatusBucketSortFieldParamName = "es.status.bucket.sort.field";
    protected static final String ESStatusGlobalSortFieldParamName = "es.status.global.sort.field";
    protected static final String ESStatusFilterParamName = "es.status.filterQuery";
    protected static final String ESStatusQueryTimeoutParamName = "es.status.query.timeout";
    protected String indexName;
    protected static RestHighLevelClient client;
    protected String partitionField;
    protected Date queryDate;
    protected List<String> filterQueries = null;
    protected int shardID = -1;
    protected String logIdprefix = "";
    protected int maxURLsPerBucket = 10;
    protected int maxBucketNum = 10;
    protected List<String> bucketSortField = new LinkedList();
    protected String totalSortField = "";
    protected int queryTimeout = -1;

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.indexName = ConfUtils.getString(map, ESStatusIndexNameParamName, ESBoltType);
        synchronized (AbstractSpout.class) {
            try {
                if (client == null) {
                    client = ElasticSearchConnection.getClient(map, ESBoltType);
                }
            } catch (Exception e) {
                LOG.error("Can't connect to ElasticSearch", e);
                throw new RuntimeException(e);
            }
        }
        if (topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size() > 1) {
            this.logIdprefix = "[" + topologyContext.getThisComponentId() + " #" + topologyContext.getThisTaskIndex() + "] ";
            this.shardID = topologyContext.getThisTaskIndex();
            LOG.info("{} assigned shard ID {}", this.logIdprefix, Integer.valueOf(this.shardID));
        }
        this.partitionField = ConfUtils.getString(map, ESStatusBucketFieldParamName, "key");
        this.bucketSortField = ConfUtils.loadListFromConf(ESStatusBucketSortFieldParamName, map);
        this.totalSortField = ConfUtils.getString(map, ESStatusGlobalSortFieldParamName);
        this.maxURLsPerBucket = ConfUtils.getInt(map, ESStatusMaxURLsParamName, 1);
        this.maxBucketNum = ConfUtils.getInt(map, ESStatusMaxBucketParamName, 10);
        this.queryTimeout = ConfUtils.getInt(map, ESStatusQueryTimeoutParamName, -1);
        this.filterQueries = ConfUtils.loadListFromConf(ESStatusFilterParamName, map);
    }

    protected abstract void populateBuffer();

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean addHitToBuffer(SearchHit searchHit) {
        Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
        String str = (String) sourceAsMap.get("url");
        if (this.beingProcessed.containsKey(str)) {
            return false;
        }
        Metadata fromKeyValues = fromKeyValues(sourceAsMap);
        addHitInfoToMetadata(fromKeyValues, searchHit);
        return this.buffer.add(str, fromKeyValues);
    }

    protected void addHitInfoToMetadata(Metadata metadata, SearchHit searchHit) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Metadata fromKeyValues(Map<String, Object> map) {
        Map map2 = (Map) map.get("metadata");
        Metadata metadata = new Metadata();
        if (map2 != null) {
            for (Map.Entry entry : map2.entrySet()) {
                String replaceAll = ((String) entry.getKey()).replaceAll("%2E", "\\.");
                Object value = entry.getValue();
                if (value instanceof String) {
                    metadata.addValue(replaceAll, (String) value);
                } else {
                    metadata.addValues(replaceAll, (List) value);
                }
            }
        }
        return metadata;
    }

    public void ack(Object obj) {
        LOG.debug("{}  Ack for {}", this.logIdprefix, obj);
        super.ack(obj);
    }

    public void fail(Object obj) {
        LOG.info("{}  Fail for {}", this.logIdprefix, obj);
        super.fail(obj);
    }

    public void close() {
        if (client != null) {
            try {
                client.close();
            } catch (IOException e) {
            }
        }
    }
}
