package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.persistence.Status;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/ScrollSpout.class */
public class ScrollSpout extends AbstractSpout implements ActionListener<SearchResponse> {
    private String scrollId = null;
    private boolean hasFinished = false;
    private Queue<Values> queue = new LinkedList();
    private static final Logger LOG = LoggerFactory.getLogger(ScrollSpout.class);

    public void nextTuple() {
        synchronized (this.queue) {
            if (this.queue.isEmpty()) {
                if (!this.isInQuery.get()) {
                    populateBuffer();
                    return;
                } else {
                    LOG.trace("{} isInquery true", this.logIdprefix);
                    Utils.sleep(10L);
                    return;
                }
            }
            List remove = this.queue.remove();
            String obj = remove.get(0).toString();
            this._collector.emit("status", remove, obj);
            this.beingProcessed.put(obj, remove);
            this.eventCounter.scope("emitted").incrBy(1L);
            LOG.debug("{} emitted {}", this.logIdprefix, obj);
        }
    }

    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    protected void populateBuffer() {
        if (this.hasFinished) {
            Utils.sleep(10L);
            return;
        }
        if (this.scrollId != null) {
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(this.scrollId);
            searchScrollRequest.scroll(TimeValue.timeValueMinutes(5L));
            this.isInQuery.set(true);
            client.scrollAsync(searchScrollRequest, RequestOptions.DEFAULT, this);
            LOG.debug("{} ES query {}", this.logIdprefix, searchScrollRequest.toString());
            return;
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchSourceBuilder.size(this.maxURLsPerBucket * this.maxBucketNum);
        SearchRequest searchRequest = new SearchRequest(new String[]{this.indexName});
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(5L));
        if (this.shardID != -1) {
            searchRequest.preference("_shards:" + this.shardID + "|_local");
        }
        this.isInQuery.set(true);
        LOG.trace("{} isInquery set to true", this.logIdprefix);
        client.searchAsync(searchRequest, RequestOptions.DEFAULT, this);
        LOG.debug("{} ES query {}", this.logIdprefix, searchRequest.toString());
    }

    public void onResponse(SearchResponse searchResponse) {
        SearchHits hits = searchResponse.getHits();
        LOG.info("{} ES query returned {} hits in {} msec", new Object[]{this.logIdprefix, Integer.valueOf(hits.getHits().length), Long.valueOf(searchResponse.getTook().getMillis())});
        this.hasFinished = hits.getHits().length == 0;
        synchronized (this.queue) {
            Iterator it = hits.iterator();
            while (it.hasNext()) {
                Map<String, Object> sourceAsMap = ((SearchHit) it.next()).getSourceAsMap();
                String str = (String) sourceAsMap.get("url");
                String str2 = (String) sourceAsMap.get("status");
                String str3 = (String) sourceAsMap.get("nextFetchDate");
                Metadata fromKeyValues = fromKeyValues(sourceAsMap);
                fromKeyValues.setValue("status.store.as.is.with.nextfetchdate", str3);
                this.queue.add(new Values(new Object[]{str, fromKeyValues, Status.valueOf(str2)}));
            }
        }
        this.scrollId = searchResponse.getScrollId();
        markQueryReceivedNow();
    }

    public void onFailure(Exception exc) {
        LOG.error("{} Exception with ES query", this.logIdprefix, exc);
        markQueryReceivedNow();
    }

    @Override // com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout
    public void fail(Object obj) {
        LOG.info("{}  Fail for {}", this.logIdprefix, obj);
        this.eventCounter.scope("failed").incrBy(1L);
        this.queue.add((Values) this.beingProcessed.remove(obj));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("status", new Fields(new String[]{"url", "metadata", "status"}));
    }
}
