package com.digitalpebble.stormcrawler.elasticsearch.bolt;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.elasticsearch.BulkItemResponseToFailedFlag;
import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
import com.digitalpebble.stormcrawler.indexing.AbstractIndexerBolt;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.PerSecondReducer;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/bolt/IndexerBolt.class */
/**
 * Bolt that builds a JSON document from each incoming tuple and sends it to Elasticsearch through
 * the bulk processor of an {@link ElasticSearchConnection}.
 *
 * <p>Tuples are not acked in {@link #execute(Tuple)}. They are kept in {@code waitAck}, keyed by
 * document ID, until the bulk response for that ID arrives in one of the {@code afterBulk}
 * callbacks. Those callbacks emit on the {@code "status"} stream and ack or fail the tuples.
 * Entries that receive no response within 60 seconds are expired from the cache, and their tuples
 * are failed by {@link #onRemoval}.
 *
 * <p>{@code waitAck} is guarded by {@code waitAckLock} because the bulk callbacks may run on a
 * different thread than {@code execute}. NOTE(review): presumably bulk processor threads; confirm
 * against ElasticSearchConnection.
 */
public class IndexerBolt extends AbstractIndexerBolt implements RemovalListener<String, List<Tuple>>, BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class);
    private static final String ESBoltType = "indexer";
    static final String ESIndexNameParamName = "es.indexer.index.name";
    private static final String ESCreateParamName = "es.indexer.create";
    private static final String ESIndexPipelineParamName = "es.indexer.pipeline";
    private OutputCollector _collector;
    private String indexName;
    /** Optional ingest pipeline name; null when not configured. */
    private String pipeline;
    private MultiCountMetric eventCounter;
    private ElasticSearchConnection connection;
    private MultiReducedMetric perSecMetrics;
    /** Tuples awaiting a bulk response, keyed by document ID (several tuples may share an ID). */
    private Cache<String, List<Tuple>> waitAck;
    /** When true, documents are sent with op type CREATE instead of INDEX. */
    private boolean create = false;
    private final ReentrantLock waitAckLock = new ReentrantLock(true);

    public IndexerBolt() {
    }

    /**
     * @param str name of the target index; overrides {@code es.indexer.index.name}
     */
    public IndexerBolt(String str) {
        this.indexName = str;
    }

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this._collector = outputCollector;
        if (this.indexName == null) {
            this.indexName = ConfUtils.getString(map, ESIndexNameParamName, "content");
        }
        this.create = ConfUtils.getBoolean(map, ESCreateParamName, false);
        this.pipeline = ConfUtils.getString(map, ESIndexPipelineParamName);
        try {
            // Initialise everything the bulk listener callbacks use *before* handing `this`
            // to the connection as its listener.
            this.eventCounter = topologyContext.registerMetric("ElasticSearchIndexer", new MultiCountMetric(), 10);
            this.perSecMetrics = topologyContext.registerMetric("Indexer_average_persec", new MultiReducedMetric(new PerSecondReducer()), 10);
            this.waitAck = Caffeine.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).removalListener(this).build();
            topologyContext.registerMetric("waitAck", () -> {
                return Long.valueOf(this.waitAck.estimatedSize());
            }, 10);
            this.connection = ElasticSearchConnection.getConnection(map, ESBoltType, this);
        } catch (Exception e) {
            LOG.error("Can't connect to ElasticSearch", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Caffeine removal callback. Only evictions (for example expiry 60 s after the entry was
     * written without a bulk response) fail the tuples. Explicit invalidations performed by the
     * bulk callbacks are ignored because those tuples have already been handled.
     */
    @Override
    public void onRemoval(@Nullable String str, @Nullable List<Tuple> list, @NotNull RemovalCause removalCause) {
        if (!removalCause.wasEvicted()) {
            return;
        }
        if (list == null) {
            LOG.error("Purged from waitAck {} with no values", str);
            return;
        }
        LOG.error("Purged from waitAck {} with {} values", str, list.size());
        for (Tuple tuple : list) {
            this._collector.fail(tuple);
        }
    }

    @Override
    public void cleanup() {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /**
     * Builds and queues the index request for a tuple. The tuple is acked or failed later, when the
     * bulk response arrives. Filtered documents are acked immediately with status FETCHED.
     */
    @Override
    public void execute(Tuple tuple) {
        String url = tuple.getStringByField("url");
        String normalisedUrl = valueForURL(tuple);
        LOG.info("Indexing {} as {}", url, normalisedUrl);
        Metadata metadata = (Metadata) tuple.getValueByField("metadata");
        if (!filterDocument(metadata)) {
            LOG.info("Filtered {}", url);
            this.eventCounter.scope("Filtered").incrBy(1L);
            this._collector.emit("status", tuple, new Values(url, metadata, Status.FETCHED));
            this._collector.ack(tuple);
            return;
        }
        String documentID = getDocumentID(metadata, normalisedUrl);
        try {
            IndexRequest request = buildIndexRequest(tuple, metadata, normalisedUrl, documentID);
            // Register before queueing: the bulk processor may flush (and the response may come
            // back) during addToProcessor, and the response must find the tuple in waitAck.
            addToWaitAck(documentID, url, tuple);
            this.connection.addToProcessor(request);
            this.eventCounter.scope("Indexed").incrBy(1L);
            this.perSecMetrics.scope("Indexed").update(1);
        } catch (IOException e) {
            LOG.error("Error building document for ES", e);
            this._collector.fail(tuple);
            // Only forget this tuple; other tuples pending under the same ID must stay tracked.
            removeFromWaitAck(documentID, tuple);
        }
    }

    /** Builds the JSON source and the index request (op type, optional pipeline) for a tuple. */
    private IndexRequest buildIndexRequest(Tuple tuple, Metadata metadata, String normalisedUrl, String documentID) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
        if (StringUtils.isNotBlank(fieldNameForText())) {
            String text = trimText(tuple.getStringByField("text"));
            if (!ignoreEmptyFields() || StringUtils.isNotBlank(text)) {
                builder.field(fieldNameForText(), text);
            }
        }
        if (StringUtils.isNotBlank(fieldNameForURL())) {
            builder.field(fieldNameForURL(), normalisedUrl);
        }
        Map<String, String[]> fields = filterMetadata(metadata);
        for (Map.Entry<String, String[]> field : fields.entrySet()) {
            String[] values = field.getValue();
            if (values.length == 1) {
                if (!ignoreEmptyFields() || StringUtils.isNotBlank(values[0])) {
                    builder.field(field.getKey(), values[0]);
                }
            } else if (values.length > 1) {
                builder.array(field.getKey(), values);
            }
        }
        builder.endObject();
        IndexRequest request = new IndexRequest(getIndexName(metadata)).source(builder).id(documentID);
        request.opType(this.create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX);
        if (this.pipeline != null) {
            request.setPipeline(this.pipeline);
        }
        return request;
    }

    /** Appends the tuple to the list of tuples waiting for a response for {@code documentID}. */
    private void addToWaitAck(String documentID, String url, Tuple tuple) {
        this.waitAckLock.lock();
        try {
            List<Tuple> pending = this.waitAck.getIfPresent(documentID);
            if (pending == null) {
                pending = new LinkedList<>();
                this.waitAck.put(documentID, pending);
            }
            pending.add(tuple);
            LOG.debug("Added to waitAck {} with ID {} total {}", url, documentID, pending.size());
        } finally {
            this.waitAckLock.unlock();
        }
    }

    /** Removes one tuple from waitAck, dropping the entry only if no other tuple is pending. */
    private void removeFromWaitAck(String documentID, Tuple tuple) {
        this.waitAckLock.lock();
        try {
            List<Tuple> pending = this.waitAck.getIfPresent(documentID);
            if (pending != null && pending.remove(tuple) && pending.isEmpty()) {
                this.waitAck.invalidate(documentID);
            }
        } finally {
            this.waitAckLock.unlock();
        }
    }

    /** Returns the target index for a document; subclasses may route by metadata. */
    protected String getIndexName(Metadata metadata) {
        return this.indexName;
    }

    @Override
    public void beforeBulk(long j, BulkRequest bulkRequest) {
        this.eventCounter.scope("bulks_sent").incrBy(1L);
    }

    /**
     * Handles a completed bulk request. For each document ID found in waitAck, the tuples are
     * emitted on the status stream and acked. The exception is a failed (non-conflict) response:
     * BAD_REQUEST is reported as status ERROR and acked; any other failure fails the tuple.
     */
    @Override
    public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        this.eventCounter.scope("bulks_received").incrBy(1L);
        this.eventCounter.scope("bulk_msec").incrBy(bulkResponse.getTook().getMillis());
        Map<String, List<BulkItemResponseToFailedFlag>> responsesById = Arrays.stream(bulkResponse.getItems())
                .map(this::toFailedFlag)
                .collect(Collectors.groupingBy(r -> r.id, Collectors.toUnmodifiableList()));
        Set<String> stillWaiting = null;
        this.waitAckLock.lock();
        try {
            Map<String, List<Tuple>> present = this.waitAck.getAllPresent(responsesById.keySet());
            if (!present.isEmpty()) {
                this.waitAck.invalidateAll(present.keySet());
            }
            long estimatedSize = this.waitAck.estimatedSize();
            if (LOG.isDebugEnabled() && estimatedSize > 0) {
                stillWaiting = new HashSet<>(this.waitAck.asMap().keySet());
            }
            int ackedCount = 0;
            int failedCount = 0;
            for (Map.Entry<String, List<Tuple>> entry : present.entrySet()) {
                String id = entry.getKey();
                List<Tuple> tuples = entry.getValue();
                BulkItemResponseToFailedFlag selected = selectResponse(id, responsesById.get(id));
                if (tuples == null) {
                    LOG.warn("Could not find unacked tuples for {}", id);
                    continue;
                }
                LOG.debug("Found {} tuple(s) for ID {}", tuples.size(), id);
                for (Tuple tuple : tuples) {
                    String url = (String) tuple.getValueByField("url");
                    Metadata metadata = (Metadata) tuple.getValueByField("metadata");
                    if (!selected.failed) {
                        ackedCount++;
                        this._collector.emit("status", tuple, new Values(url, metadata, Status.FETCHED));
                        this._collector.ack(tuple);
                        continue;
                    }
                    failedCount++;
                    BulkItemResponse.Failure failure = selected.getFailure();
                    LOG.error("update ID {}, URL {}, failure: {}", id, url, failure);
                    if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) {
                        // The content itself was rejected: retrying won't help, report an ERROR.
                        metadata.setValue("error.source", "ES indexing");
                        metadata.setValue("error.message", "invalid content");
                        this._collector.emit("status", tuple, new Values(url, metadata, Status.ERROR));
                        this._collector.ack(tuple);
                        LOG.debug("Acked {} with ID {}", url, id);
                    } else {
                        this._collector.fail(tuple);
                    }
                }
            }
            LOG.info("Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", j, responsesById.size(), estimatedSize, ackedCount, failedCount);
            if (stillWaiting != null) {
                for (String waitingId : stillWaiting) {
                    LOG.debug("Still in wait ack after bulk response [{}] => {}", j, waitingId);
                }
            }
        } finally {
            this.waitAckLock.unlock();
        }
    }

    /** Wraps a bulk item; conflicts are counted but not treated as failures. */
    private BulkItemResponseToFailedFlag toFailedFlag(BulkItemResponse item) {
        BulkItemResponse.Failure failure = item.getFailure();
        boolean failed = false;
        if (failure != null) {
            if (failure.getStatus().equals(RestStatus.CONFLICT)) {
                this.eventCounter.scope("doc_conflicts").incrBy(1L);
                LOG.debug("Doc conflict ID {}", item.getId());
            } else {
                failed = true;
            }
        }
        return new BulkItemResponseToFailedFlag(item, failed);
    }

    /**
     * Picks the response that determines the fate of an ID's tuples. A successful response takes
     * precedence (the last successful one); if every response failed, the first one is used.
     */
    private static BulkItemResponseToFailedFlag selectResponse(String id, List<BulkItemResponseToFailedFlag> responses) {
        if (responses.size() == 1) {
            return responses.get(0);
        }
        BulkItemResponseToFailedFlag selected = null;
        int failedCount = 0;
        for (BulkItemResponseToFailedFlag candidate : responses) {
            if (selected == null) {
                selected = candidate;
            }
            if (candidate.failed) {
                failedCount++;
            } else {
                selected = candidate;
            }
        }
        // Warn only when successes and failures are genuinely mixed.
        if (failedCount > 0 && failedCount < responses.size()) {
            LOG.warn("The id {} would result in an ack and a failure. Using only the ack for processing.", id);
        }
        return Objects.requireNonNull(selected);
    }

    /** Handles a bulk request that failed as a whole: every pending tuple in it is failed. */
    @Override
    public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
        this.eventCounter.scope("bulks_received").incrBy(1L);
        LOG.error("Exception with bulk {} - failing the whole lot ", j, th);
        Set<String> ids = bulkRequest.requests().stream()
                .map(r -> r.id())
                .collect(Collectors.toUnmodifiableSet());
        this.waitAckLock.lock();
        try {
            Map<String, List<Tuple>> present = this.waitAck.getAllPresent(ids);
            if (!present.isEmpty()) {
                this.waitAck.invalidateAll(present.keySet());
            }
            for (String id : ids) {
                List<Tuple> tuples = present.get(id);
                if (tuples == null) {
                    LOG.warn("Could not find unacked tuple for {}", id);
                    continue;
                }
                LOG.debug("Failed {} tuple(s) for ID {}", tuples.size(), id);
                for (Tuple tuple : tuples) {
                    this.eventCounter.scope("failed").incrBy(1L);
                    this._collector.fail(tuple);
                }
            }
        } finally {
            this.waitAckLock.unlock();
        }
    }
}
