package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/CollapsingSpout.class */
/**
 * Spout which fills its buffer from an Elasticsearch status index using field collapsing on
 * {@code partitionField}.
 *
 * <p>Each query selects documents whose {@code nextFetchDate} is at or before the current query
 * date. It requests {@code maxBucketNum} collapsed hits starting at offset {@code
 * lastStartOffset}. When {@code maxURLsPerBucket > 1}, up to that many URLs per collapsed bucket
 * are also fetched as inner hits, sorted ascending on the {@code bucketSortField} fields.
 *
 * <p>Results arrive asynchronously through the {@link ActionListener} callbacks. The paging
 * offset is advanced while full pages come back, and reset to 0 in these cases:
 *
 * <ul>
 *   <li>a short page is returned;
 *   <li>the offset exceeds {@code es.status.max.start.offset};
 *   <li>the query date is reset.
 * </ul>
 */
public class CollapsingSpout extends AbstractSpout implements ActionListener<SearchResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(CollapsingSpout.class);

    /** Config key bounding {@link #lastStartOffset}; -1 (the default) disables the bound. */
    private static final String ES_MAX_START_OFFSET_PARAM_NAME = "es.status.max.start.offset";

    /** Name of the inner-hits section that carries the URLs of one bucket. */
    private static final String INNER_HITS_NAME = "urls_per_bucket";

    /** {@code from} offset used by the next query; advanced while full pages are returned. */
    private int lastStartOffset = 0;

    /** Once {@link #lastStartOffset} goes past this value it is reset to 0; -1 means unbounded. */
    private int maxStartOffset = -1;

    /**
     * Reads {@code es.status.max.start.offset} from the configuration, then delegates to {@link
     * AbstractSpout#open}.
     */
    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    public void open(
            Map<String, Object> map,
            TopologyContext topologyContext,
            SpoutOutputCollector spoutOutputCollector) {
        maxStartOffset = ConfUtils.getInt(map, ES_MAX_START_OFFSET_PARAM_NAME, -1);
        super.open(map, topologyContext, spoutOutputCollector);
    }

    /**
     * Builds the collapsing search for the current query date and paging offset, and sends it
     * asynchronously.
     *
     * <p>{@code isInQuery} is set before sending; this instance receives the result via {@link
     * #onResponse} or {@link #onFailure}.
     */
    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    protected void populateBuffer() {
        if (queryDate == null) {
            // First query, or the previous one was exhausted / reset: restart from "now".
            queryDate = new Date();
            lastTimeResetToNOW = Instant.now();
            lastStartOffset = 0;
        } else if (maxStartOffset != -1 && lastStartOffset > maxStartOffset) {
            // Paging too deep with the same query date: go back to the first page.
            LOG.info("Reached max start offset {}", lastStartOffset);
            lastStartOffset = 0;
        }

        String formattedQueryDate =
                ISODateTimeFormat.dateTimeNoMillis().print(queryDate.getTime());
        LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix, formattedQueryDate);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(buildNextFetchDateQuery(formattedQueryDate));
        sourceBuilder.from(lastStartOffset);
        sourceBuilder.size(maxBucketNum);
        sourceBuilder.explain(false);
        sourceBuilder.trackTotalHits(false);
        if (queryTimeout != -1) {
            sourceBuilder.timeout(new TimeValue(queryTimeout, TimeUnit.SECONDS));
        }
        if (StringUtils.isNotBlank(totalSortField)) {
            sourceBuilder.sort(new FieldSortBuilder(totalSortField).order(SortOrder.ASC));
        }
        sourceBuilder.collapse(buildBucketCollapse());

        SearchRequest request = new SearchRequest(indexName);
        if (shardID != -1) {
            // Restrict to one shard, preferring a local copy of it.
            request.preference("_shards:" + shardID + "|_local");
        }
        request.source(sourceBuilder);

        LOG.debug("{} ES query {}", logIdprefix, request.toString());

        isInQuery.set(true);
        client.searchAsync(request, RequestOptions.DEFAULT, this);
    }

    /**
     * Creates the filter query for the next search.
     *
     * <p>The query keeps documents due at or before {@code formattedQueryDate}, plus every
     * configured filter query.
     */
    private BoolQueryBuilder buildNextFetchDateQuery(String formattedQueryDate) {
        BoolQueryBuilder query =
                QueryBuilders.boolQuery()
                        .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate));
        if (filterQueries != null) {
            for (String filterQuery : filterQueries) {
                query.filter(QueryBuilders.queryStringQuery(filterQuery));
            }
        }
        return query;
    }

    /**
     * Creates the collapse on {@code partitionField}.
     *
     * <p>When more than one URL per bucket is wanted, inner hits named {@value #INNER_HITS_NAME}
     * are added, sorted ascending on each {@code bucketSortField}.
     */
    private CollapseBuilder buildBucketCollapse() {
        CollapseBuilder collapse = new CollapseBuilder(partitionField);
        if (maxURLsPerBucket > 1) {
            InnerHitBuilder innerHits = new InnerHitBuilder();
            innerHits.setSize(maxURLsPerBucket);
            innerHits.setName(INNER_HITS_NAME);
            // addSort is only called when sort fields exist, so no sort list is set otherwise.
            for (String sortField : bucketSortField) {
                innerHits.addSort(SortBuilders.fieldSort(sortField).order(SortOrder.ASC));
            }
            collapse.setInnerHits(innerHits);
        }
        return collapse;
    }

    /** Logs the failed query and releases the in-query state via {@code markQueryReceivedNow}. */
    @Override
    public void onFailure(Exception exc) {
        LOG.error("{} Exception with ES query", logIdprefix, exc);
        markQueryReceivedNow();
    }

    /**
     * Adds every returned URL to the buffer, records metrics, and updates the paging state for
     * the next query.
     */
    @Override
    public void onResponse(SearchResponse searchResponse) {
        long timeTaken = System.currentTimeMillis() - getTimeLastQuerySent();

        SearchHit[] buckets = searchResponse.getHits().getHits();
        int numBuckets = buckets.length;
        int numDocs = 0;
        int alreadyProcessed = 0;

        for (SearchHit bucket : buckets) {
            for (SearchHit hit : hitsOfBucket(bucket)) {
                numDocs++;
                if (!addHitToBuffer(hit)) {
                    alreadyProcessed++;
                }
            }
        }

        queryTimes.addMeasurement(timeTaken);
        eventCounter.scope("ES_queries").incrBy(1L);
        eventCounter.scope("ES_docs").incrBy(numDocs);
        eventCounter.scope("already_being_processed").incrBy(alreadyProcessed);

        // Avoid logging Infinity/NaN when nothing came back.
        float msecPerDoc = numDocs == 0 ? 0f : ((float) timeTaken) / numDocs;
        LOG.info(
                "{} ES query returned {} hits from {} buckets in {} msec with {} already being processed.Took {} msec per doc on average.",
                logIdprefix,
                numDocs,
                numBuckets,
                timeTaken,
                alreadyProcessed,
                msecPerDoc);

        // plusSeconds takes a long, so large settings cannot overflow as int "secs * 1000" did.
        if (resetFetchDateAfterNSecs != -1
                && Instant.now().isAfter(lastTimeResetToNOW.plusSeconds(resetFetchDateAfterNSecs))) {
            LOG.info("queryDate reset based on resetFetchDateAfterNSecs {}", resetFetchDateAfterNSecs);
            queryDate = null;
            lastStartOffset = 0;
        }

        if (numBuckets == 0) {
            // Nothing left for this query date: the next query restarts from "now".
            queryDate = null;
            lastStartOffset = 0;
        } else if (numBuckets < maxBucketNum) {
            // Short page: the end was reached, go back to the first page.
            lastStartOffset = 0;
        } else {
            lastStartOffset += numBuckets;
        }

        markQueryReceivedNow();
    }

    /**
     * Returns the URLs carried by one collapsed hit.
     *
     * <p>If the hit has inner hits under {@value #INNER_HITS_NAME}, those are returned. Otherwise
     * (one URL per bucket, or the section is missing) the hit itself is returned.
     */
    private static SearchHit[] hitsOfBucket(SearchHit topHit) {
        Map<String, SearchHits> innerHits = topHit.getInnerHits();
        SearchHits inBucket = innerHits == null ? null : innerHits.get(INNER_HITS_NAME);
        if (inBucket == null) {
            return new SearchHit[] {topHit};
        }
        return inBucket.getHits();
    }
}
