package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.persistence.EmptyQueueListener;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/HybridSpout.class */
/**
 * Spout which builds on {@link AggregationSpout} and, in addition, reacts to the URL buffer
 * signalling that the queue of a given bucket has been emptied: it then sends a term query to
 * Elasticsearch restricted to that bucket in order to refill it.
 *
 * <p>The sort values of the last document seen for each bucket are kept in a cache so that the
 * follow-up query for the same bucket can resume from that position via {@code search_after}.
 * The cache is wiped every time the response of the main aggregation query is received.
 */
public class HybridSpout extends AggregationSpout implements EmptyQueueListener {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSpout.class);

    /**
     * Configuration key for the maximum number of documents requested by a per-bucket reload
     * query. When absent, the value of {@code maxURLsPerBucket} is used.
     */
    protected static final String RELOADPARAMNAME = "es.status.max.urls.per.reload";

    /** Prefix of partition fields which live inside the {@code metadata} object of a document. */
    private static final String METADATA_PREFIX = "metadata.";

    private int bufferReloadSize = 10;

    /** Bucket key to the sort values from which the next query for that bucket resumes. */
    private Cache<String, Object[]> searchAfterCache;

    private HostResultListener hrl;

    /** Handles the responses of the per-bucket queries issued by {@link #emptyQueue(String)}. */
    class HostResultListener implements ActionListener<SearchResponse> {

        /**
         * Adds the returned hits to the buffer, remembers the sort values of the last hit for the
         * bucket the hits belong to, and updates the metrics.
         *
         * @param searchResponse response of a per-bucket term query
         */
        @Override
        public void onResponse(SearchResponse searchResponse) {
            int alreadyBeingProcessed = 0;
            int numDocs = 0;
            Object[] lastSortValues = null;
            // stays null when no hit yields a usable partition value
            String key = null;

            for (SearchHit hit : searchResponse.getHits().getHits()) {
                numDocs++;
                String hitKey = partitionKeyOf(hit);
                if (hitKey != null) {
                    key = hitKey;
                }
                lastSortValues = hit.getSortValues();
                if (!HybridSpout.this.addHitToBuffer(hit)) {
                    alreadyBeingProcessed++;
                }
            }

            if (key != null) {
                HybridSpout.this.cacheSearchAfter(key, lastSortValues);
            }

            HybridSpout.this.eventCounter.scope("ES_queries_host").incrBy(1L);
            HybridSpout.this.eventCounter.scope("ES_docs_host").incrBy(numDocs);
            HybridSpout.this.eventCounter
                    .scope("already_being_processed_host")
                    .incrBy(alreadyBeingProcessed);

            HybridSpout.LOG.info(
                    "{} ES term query returned {} hits  in {} msec with {} already being processed for {}",
                    HybridSpout.this.logIdprefix,
                    numDocs,
                    searchResponse.getTook().getMillis(),
                    alreadyBeingProcessed,
                    key);
        }

        /**
         * Logs the failure of a per-bucket query.
         *
         * @param exc cause of the failure
         */
        @Override
        public void onFailure(Exception exc) {
            HybridSpout.LOG.error("Exception with ES query", exc);
        }

        /**
         * Reads the value of the partition field from the source of a hit.
         *
         * @param hit document returned by the query
         * @return the partition value as a string, or {@code null} when the source, the metadata
         *     object or the field is missing, or when the field holds a list whose size is not
         *     exactly one
         */
        private String partitionKeyOf(SearchHit hit) {
            String field = HybridSpout.this.partitionField;
            Map<?, ?> source = hit.getSourceAsMap();

            if (source != null && field.startsWith(METADATA_PREFIX)) {
                // the field is nested: look it up inside the metadata object instead
                Object metadata = source.get("metadata");
                source = metadata instanceof Map ? (Map<?, ?>) metadata : null;
                field = field.substring(METADATA_PREFIX.length());
            }
            if (source == null) {
                return null;
            }

            Object value = source.get(field);
            if (value instanceof List) {
                List<?> values = (List<?>) value;
                if (values.size() != 1) {
                    return null;
                }
                value = values.get(0);
            }
            return value == null ? null : value.toString();
        }
    }

    /**
     * Opens the parent spout, reads the reload size from the configuration, registers this
     * instance as the listener for emptied queues on the buffer and creates the search-after
     * cache and the response listener.
     */
    @Override
    public void open(
            Map<String, Object> map,
            TopologyContext topologyContext,
            SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.bufferReloadSize = ConfUtils.getInt(map, RELOADPARAMNAME, this.maxURLsPerBucket);
        this.buffer.setEmptyQueueListener(this);
        this.searchAfterCache = Caffeine.newBuilder().build();
        this.hrl = new HostResultListener();
    }

    /**
     * Called when the buffer queue of a bucket has been emptied. Unless the bucket is not among
     * the current buckets or the main query is in flight, sends an asynchronous term query for
     * up to {@code bufferReloadSize} documents of that bucket whose {@code nextFetchDate} is not
     * after the query date.
     *
     * @param str key of the bucket whose queue is empty
     */
    @Override
    public void emptyQueue(String str) {
        LOG.info("{} Emptied buffer queue for {}", this.logIdprefix, str);

        if (!this.currentBuckets.contains(str)) {
            return;
        }
        if (this.isInQuery.get()) {
            LOG.trace("{} isInquery true for {}", this.logIdprefix, str);
            return;
        }

        LOG.info("{} Querying for more docs for {}", this.logIdprefix, str);

        if (this.queryDate == null) {
            this.queryDate = new Date();
            this.lastTimeResetToNOW = Instant.now();
        }

        String formattedQueryDate =
                ISODateTimeFormat.dateTimeNoMillis().print(this.queryDate.getTime());
        BoolQueryBuilder query =
                QueryBuilders.boolQuery()
                        .filter(QueryBuilders.rangeQuery("nextFetchDate").lte(formattedQueryDate));
        query.filter(QueryBuilders.termQuery(this.partitionField, str));

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        sourceBuilder.from(0);
        sourceBuilder.size(this.bufferReloadSize);
        sourceBuilder.explain(false);
        sourceBuilder.trackTotalHits(false);

        for (String sortField : this.bucketSortField) {
            sourceBuilder.sort(SortBuilders.fieldSort(sortField).order(SortOrder.ASC));
        }

        // resume from the last position known for this bucket, if any
        Object[] searchAfterValues = this.searchAfterCache.getIfPresent(str);
        if (searchAfterValues != null) {
            sourceBuilder.searchAfter(searchAfterValues);
        }

        SearchRequest searchRequest = new SearchRequest(this.indexName);
        searchRequest.source(sourceBuilder);
        if (this.shardID != -1) {
            searchRequest.preference("_shards:" + this.shardID + "|_local");
        }

        // the request is only rendered to a string if debug logging is enabled
        LOG.debug("{} ES query {} - {}", this.logIdprefix, str, searchRequest);
        client.searchAsync(searchRequest, RequestOptions.DEFAULT, this.hrl);
    }

    /**
     * Clears every cached search-after position before handing the response over to the parent
     * implementation.
     */
    @Override
    public void onResponse(SearchResponse searchResponse) {
        this.searchAfterCache.invalidateAll();
        super.onResponse(searchResponse);
    }

    /**
     * Records the sort values for a bucket so that the next per-bucket query starts after them.
     * Null or empty sort values are ignored.
     *
     * @param str key of the bucket
     * @param objArr sort values to resume from
     */
    @Override
    protected void sortValuesForKey(String str, Object[] objArr) {
        cacheSearchAfter(str, objArr);
    }

    /**
     * Stores the sort values for a bucket unless they are null or empty, as such values cannot
     * be used as a search-after position.
     */
    private void cacheSearchAfter(String key, Object[] sortValues) {
        if (sortValues == null || sortValues.length == 0) {
            return;
        }
        this.searchAfterCache.put(key, sortValues);
    }
}
