package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/AggregationSpout.class */
public class AggregationSpout extends AbstractSpout implements ActionListener<SearchResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationSpout.class);
    private static final String ESStatusSampleParamName = "es.status.sample";
    private static final String ESMostRecentDateIncreaseParamName = "es.status.recentDate.increase";
    private static final String ESMostRecentDateMinGapParamName = "es.status.recentDate.min.gap";
    private boolean sample = false;
    private int recentDateIncrease = -1;
    private int recentDateMinGap = -1;
    protected Set<String> currentBuckets;

    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.sample = ConfUtils.getBoolean(map, ESStatusSampleParamName, this.sample);
        this.recentDateIncrease = ConfUtils.getInt(map, ESMostRecentDateIncreaseParamName, this.recentDateIncrease);
        this.recentDateMinGap = ConfUtils.getInt(map, ESMostRecentDateMinGapParamName, this.recentDateMinGap);
        super.open(map, topologyContext, spoutOutputCollector);
        this.currentBuckets = new HashSet();
    }

    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    protected void populateBuffer() {
        if (this.queryDate == null) {
            this.queryDate = new Date();
            this.lastTimeResetToNOW = Instant.now();
        }
        String print = ISODateTimeFormat.dateTimeNoMillis().print(this.queryDate.getTime());
        LOG.info("{} Populating buffer with nextFetchDate <= {}", this.logIdprefix, print);
        BoolQueryBuilder filter = QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery("nextFetchDate").lte(print));
        if (this.filterQueries != null) {
            Iterator<String> it = this.filterQueries.iterator();
            while (it.hasNext()) {
                filter.filter(QueryBuilders.queryStringQuery(it.next()));
            }
        }
        SearchRequest searchRequest = new SearchRequest(new String[]{this.indexName});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(filter);
        searchSourceBuilder.from(0);
        searchSourceBuilder.size(0);
        searchSourceBuilder.explain(false);
        searchSourceBuilder.trackTotalHits(false);
        if (this.queryTimeout != -1) {
            searchSourceBuilder.timeout(new TimeValue(this.queryTimeout, TimeUnit.SECONDS));
        }
        TermsAggregationBuilder size = AggregationBuilders.terms("partition").field(this.partitionField).size(this.maxBucketNum);
        TopHitsAggregationBuilder explain = AggregationBuilders.topHits("docs").size(this.maxURLsPerBucket).explain(false);
        Iterator<String> it2 = this.bucketSortField.iterator();
        while (it2.hasNext()) {
            explain.sort(SortBuilders.fieldSort(it2.next()).order(SortOrder.ASC));
        }
        size.subAggregation(explain);
        if (StringUtils.isNotBlank(this.totalSortField)) {
            size.subAggregation(AggregationBuilders.min("top_hit").field(this.totalSortField));
            size.order(BucketOrder.aggregation("top_hit", true));
        }
        if (this.sample) {
            DiversifiedAggregationBuilder diversifiedAggregationBuilder = new DiversifiedAggregationBuilder("sample");
            diversifiedAggregationBuilder.field(this.partitionField).maxDocsPerValue(this.maxURLsPerBucket);
            diversifiedAggregationBuilder.shardSize(this.maxURLsPerBucket * this.maxBucketNum);
            diversifiedAggregationBuilder.subAggregation(size);
            searchSourceBuilder.aggregation(diversifiedAggregationBuilder);
        } else {
            searchSourceBuilder.aggregation(size);
        }
        searchRequest.source(searchSourceBuilder);
        if (this.shardID != -1) {
            searchRequest.preference("_shards:" + this.shardID + "|_local");
        }
        LOG.debug("{} ES query {}", this.logIdprefix, searchRequest);
        LOG.trace("{} isInquery set to true");
        this.isInQuery.set(true);
        client.searchAsync(searchRequest, RequestOptions.DEFAULT, this);
    }

    public void onFailure(Exception exc) {
        LOG.error("{} Exception with ES query", this.logIdprefix, exc);
        markQueryReceivedNow();
    }

    @Override // 
    public void onResponse(SearchResponse searchResponse) {
        long currentTimeMillis = System.currentTimeMillis() - getTimeLastQuerySent();
        Aggregations aggregations = searchResponse.getAggregations();
        if (aggregations == null) {
            markQueryReceivedNow();
            return;
        }
        SingleBucketAggregation singleBucketAggregation = aggregations.get("sample");
        if (singleBucketAggregation != null) {
            aggregations = singleBucketAggregation.getAggregations();
        }
        Terms terms = aggregations.get("partition");
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        Instant instant = null;
        this.currentBuckets.clear();
        Iterator it = terms.getBuckets().iterator();
        while (it.hasNext()) {
            Terms.Bucket bucket = (Terms.Bucket) it.next();
            String str = (String) bucket.getKey();
            this.currentBuckets.add(str);
            long docCount = bucket.getDocCount();
            int i4 = 0;
            SearchHit searchHit = null;
            for (SearchHit searchHit2 : bucket.getAggregations().get("docs").getHits().getHits()) {
                LOG.debug("{} -> id [{}], _source [{}]", new Object[]{this.logIdprefix, searchHit2.getId(), searchHit2.getSourceAsString()});
                i4++;
                searchHit = searchHit2;
                Map<String, Object> sourceAsMap = searchHit2.getSourceAsMap();
                String str2 = (String) sourceAsMap.get("url");
                if (i4 == 1 && !it.hasNext()) {
                    String str3 = (String) sourceAsMap.get("nextFetchDate");
                    try {
                        instant = Instant.parse(str3);
                    } catch (Exception e) {
                        throw new RuntimeException("can't parse date :" + str3);
                    }
                }
                if (this.beingProcessed.containsKey(str2)) {
                    LOG.debug("{} -> already processed: {}", this.logIdprefix, str2);
                    i3++;
                } else if (this.buffer.add(str2, fromKeyValues(sourceAsMap))) {
                    LOG.debug("{} -> added to buffer : {}", this.logIdprefix, str2);
                } else {
                    LOG.debug("{} -> already in buffer: {}", this.logIdprefix, str2);
                    i3++;
                }
            }
            if (searchHit != null) {
                sortValuesForKey(str, searchHit.getSortValues());
            }
            if (i4 > 0) {
                i2++;
            }
            i += i4;
            LOG.debug("{} key [{}], hits[{}], doc_count [{}]", new Object[]{this.logIdprefix, str, Integer.valueOf(i4), Long.valueOf(docCount), Integer.valueOf(i3)});
        }
        LOG.info("{} ES query returned {} hits from {} buckets in {} msec with {} already being processed. Took {} msec per doc on average.", new Object[]{this.logIdprefix, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(currentTimeMillis), Integer.valueOf(i3), Float.valueOf(((float) currentTimeMillis) / i)});
        this.queryTimes.addMeasurement(currentTimeMillis);
        this.eventCounter.scope("already_being_processed").incrBy(i3);
        this.eventCounter.scope("ES_queries").incrBy(1L);
        this.eventCounter.scope("ES_docs").incrBy(i);
        if (instant != null && this.recentDateIncrease >= 0) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(instant.toEpochMilli());
            calendar.add(12, this.recentDateIncrease);
            Date date = null;
            if (this.recentDateMinGap > 0) {
                Calendar calendar2 = Calendar.getInstance();
                calendar2.setTime(this.queryDate);
                calendar2.add(12, -this.recentDateMinGap);
                Calendar calendar3 = Calendar.getInstance();
                calendar3.setTime(this.queryDate);
                calendar3.add(12, this.recentDateMinGap);
                if (calendar3.before(calendar) || calendar2.after(calendar)) {
                    date = this.queryDate;
                }
            } else {
                date = this.queryDate;
            }
            if (date != null) {
                this.queryDate = calendar.getTime();
                LOG.info("{} queryDate changed from {} to {} based on mostRecentDateFound {}", new Object[]{this.logIdprefix, date, this.queryDate, instant});
            } else {
                LOG.info("{} queryDate kept at {} based on mostRecentDateFound {}", new Object[]{this.logIdprefix, this.queryDate, instant});
            }
        }
        if (this.resetFetchDateAfterNSecs != -1 && Instant.now().isAfter(Instant.ofEpochMilli(this.lastTimeResetToNOW.toEpochMilli() + (this.resetFetchDateAfterNSecs * 1000)))) {
            LOG.info("{} queryDate set to null based on resetFetchDateAfterNSecs {}", this.logIdprefix, Integer.valueOf(this.resetFetchDateAfterNSecs));
            this.queryDate = null;
        }
        if (i2 == 0) {
            this.queryDate = null;
        }
        markQueryReceivedNow();
    }

    protected void sortValuesForKey(String str, Object[] objArr) {
    }
}
