package com.digitalpebble.stormcrawler.elasticsearch.persistence;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.elasticsearch.BulkItemResponseToFailedFlag;
import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
import com.digitalpebble.stormcrawler.persistence.AbstractStatusUpdaterBolt;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.URLPartitioner;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/persistence/StatusUpdaterBolt.class */
public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt implements RemovalListener<String, List<Tuple>>, BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class);
    private String ESBoltType;
    private static final String ESStatusIndexNameParamName = "es.%s.index.name";
    private static final String ESStatusRoutingParamName = "es.%s.routing";
    private static final String ESStatusRoutingFieldParamName = "es.%s.routing.fieldname";
    private boolean routingFieldNameInMetadata;
    private String indexName;
    private URLPartitioner partitioner;
    private boolean doRouting;
    private String fieldNameForRoutingKey;
    private ElasticSearchConnection connection;
    private Cache<String, List<Tuple>> waitAck;
    private final ReentrantLock waitAckLock;
    private MultiCountMetric eventCounter;

    public StatusUpdaterBolt() {
        this.ESBoltType = "status";
        this.routingFieldNameInMetadata = false;
        this.fieldNameForRoutingKey = null;
        this.waitAckLock = new ReentrantLock(true);
    }

    public StatusUpdaterBolt(String str) {
        this.ESBoltType = "status";
        this.routingFieldNameInMetadata = false;
        this.fieldNameForRoutingKey = null;
        this.waitAckLock = new ReentrantLock(true);
        this.ESBoltType = str;
    }

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this.indexName = ConfUtils.getString(map, String.format(ESStatusIndexNameParamName, this.ESBoltType), "status");
        this.doRouting = ConfUtils.getBoolean(map, String.format(ESStatusRoutingParamName, this.ESBoltType), false);
        this.partitioner = new URLPartitioner();
        this.partitioner.configure(map);
        this.fieldNameForRoutingKey = ConfUtils.getString(map, String.format(ESStatusRoutingFieldParamName, this.ESBoltType));
        if (StringUtils.isNotBlank(this.fieldNameForRoutingKey)) {
            if (this.fieldNameForRoutingKey.startsWith("metadata.")) {
                this.routingFieldNameInMetadata = true;
                this.fieldNameForRoutingKey = this.fieldNameForRoutingKey.substring("metadata.".length());
            }
            this.fieldNameForRoutingKey = this.fieldNameForRoutingKey.replaceAll("\\.", "%2E");
        }
        this.waitAck = Caffeine.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).removalListener(this).build();
        topologyContext.registerMetric("waitAck", () -> {
            return Long.valueOf(this.waitAck.estimatedSize());
        }, 10);
        try {
            this.connection = ElasticSearchConnection.getConnection(map, this.ESBoltType, this);
            this.eventCounter = topologyContext.registerMetric("counters", new MultiCountMetric(), 30);
        } catch (Exception e) {
            LOG.error("Can't connect to ElasticSearch", e);
            throw new RuntimeException(e);
        }
    }

    public void cleanup() {
        if (this.connection == null) {
            return;
        }
        this.connection.close();
        this.connection = null;
    }

    public void store(String str, Status status, Metadata metadata, Optional<Date> optional, Tuple tuple) throws Exception {
        String documentID = getDocumentID(metadata, str);
        this.waitAckLock.lock();
        try {
            if (status.equals(Status.DISCOVERED) && ((List) this.waitAck.getIfPresent(documentID)) != null) {
                LOG.debug("Already being sent to ES {} with status {} and ID {}", new Object[]{str, status, documentID});
                this.eventCounter.scope("acked").incrBy(1L);
                super.ack(tuple, str);
                return;
            }
            XContentBuilder startObject = XContentFactory.jsonBuilder().startObject();
            startObject.field("url", str);
            startObject.field("status", status);
            startObject.startObject("metadata");
            for (String str2 : metadata.keySet()) {
                startObject.array(str2.replaceAll("\\.", "%2E"), metadata.getValues(str2));
            }
            String partition = this.partitioner.getPartition(str, metadata);
            if (partition == null) {
                partition = "_DEFAULT_";
            }
            if (StringUtils.isNotBlank(this.fieldNameForRoutingKey) && this.routingFieldNameInMetadata) {
                startObject.field(this.fieldNameForRoutingKey, partition);
            }
            startObject.endObject();
            if (StringUtils.isNotBlank(this.fieldNameForRoutingKey) && !this.routingFieldNameInMetadata) {
                startObject.field(this.fieldNameForRoutingKey, partition);
            }
            if (optional.isPresent()) {
                startObject.timeField("nextFetchDate", optional.get());
            }
            startObject.endObject();
            IndexRequest indexRequest = new IndexRequest(getIndexName(metadata));
            indexRequest.source(startObject).id(documentID).create(status.equals(Status.DISCOVERED));
            if (this.doRouting) {
                indexRequest.routing(partition);
            }
            this.waitAckLock.lock();
            try {
                List list = (List) this.waitAck.get(documentID, str3 -> {
                    return new LinkedList();
                });
                list.add(tuple);
                LOG.debug("Added to waitAck {} with ID {} total {}", new Object[]{str, documentID, Integer.valueOf(list.size())});
                this.waitAckLock.unlock();
                LOG.debug("Sending to ES buffer {} with ID {}", str, documentID);
                this.connection.addToProcessor(indexRequest);
            } finally {
                this.waitAckLock.unlock();
            }
        } finally {
        }
    }

    public void onRemoval(@Nullable String str, @Nullable List<Tuple> list, @NotNull RemovalCause removalCause) {
        if (removalCause.wasEvicted()) {
            LOG.error("Purged from waitAck {} with {} values", str, Integer.valueOf(list.size()));
            for (Tuple tuple : list) {
                this.eventCounter.scope("failed").incrBy(1L);
                this._collector.fail(tuple);
            }
        }
    }

    public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        BulkItemResponseToFailedFlag bulkItemResponseToFailedFlag;
        LOG.debug("afterBulk [{}] with {} responses", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()));
        this.eventCounter.scope("bulks_received").incrBy(1L);
        this.eventCounter.scope("bulk_msec").incrBy(bulkResponse.getTook().getMillis());
        Map map = (Map) Arrays.stream(bulkResponse.getItems()).map(bulkItemResponse -> {
            String id = bulkItemResponse.getId();
            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            boolean z = false;
            if (failure != null) {
                if (failure.getStatus().equals(RestStatus.CONFLICT)) {
                    this.eventCounter.scope("doc_conflicts").incrBy(1L);
                    LOG.debug("Doc conflict ID {}", id);
                } else {
                    LOG.error("Update ID {}, failure: {}", id, failure);
                    z = true;
                }
            }
            return new BulkItemResponseToFailedFlag(bulkItemResponse, z);
        }).collect(Collectors.groupingBy(bulkItemResponseToFailedFlag2 -> {
            return bulkItemResponseToFailedFlag2.id;
        }, Collectors.toUnmodifiableList()));
        HashSet hashSet = null;
        this.waitAckLock.lock();
        try {
            Map allPresent = this.waitAck.getAllPresent(map.keySet());
            if (!allPresent.isEmpty()) {
                this.waitAck.invalidateAll(allPresent.keySet());
            }
            long estimatedSize = this.waitAck.estimatedSize();
            if (LOG.isDebugEnabled() && estimatedSize > 0) {
                hashSet = new HashSet(this.waitAck.asMap().keySet());
            }
            int i = 0;
            int i2 = 0;
            for (Map.Entry entry : allPresent.entrySet()) {
                String str = (String) entry.getKey();
                List<Tuple> list = (List) entry.getValue();
                List<BulkItemResponseToFailedFlag> list2 = (List) map.get(str);
                if (list2.size() == 1) {
                    bulkItemResponseToFailedFlag = (BulkItemResponseToFailedFlag) list2.get(0);
                } else {
                    BulkItemResponseToFailedFlag bulkItemResponseToFailedFlag3 = null;
                    int i3 = 0;
                    for (BulkItemResponseToFailedFlag bulkItemResponseToFailedFlag4 : list2) {
                        if (bulkItemResponseToFailedFlag3 == null) {
                            bulkItemResponseToFailedFlag3 = bulkItemResponseToFailedFlag4;
                        }
                        if (bulkItemResponseToFailedFlag4.failed) {
                            i3++;
                        } else {
                            bulkItemResponseToFailedFlag3 = bulkItemResponseToFailedFlag4;
                        }
                    }
                    if (i3 != list2.size()) {
                        LOG.warn("The id {} would result in an ack and a failure. Using only the ack for processing.", str);
                    }
                    bulkItemResponseToFailedFlag = (BulkItemResponseToFailedFlag) Objects.requireNonNull(bulkItemResponseToFailedFlag3);
                }
                if (list != null) {
                    LOG.debug("Acked {} tuple(s) for ID {}", Integer.valueOf(list.size()), str);
                    for (Tuple tuple : list) {
                        if (bulkItemResponseToFailedFlag.failed) {
                            i2++;
                            this.eventCounter.scope("failed").incrBy(1L);
                            this._collector.fail(tuple);
                        } else {
                            String stringByField = tuple.getStringByField("url");
                            i++;
                            LOG.debug("Acked {} with ID {}", stringByField, str);
                            this.eventCounter.scope("acked").incrBy(1L);
                            super.ack(tuple, stringByField);
                        }
                    }
                } else {
                    LOG.warn("Could not find unacked tuple for {}", str);
                }
            }
            LOG.info("Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", new Object[]{Long.valueOf(j), Integer.valueOf(map.size()), Long.valueOf(estimatedSize), Integer.valueOf(i), Integer.valueOf(i2)});
            if (hashSet != null) {
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    LOG.debug("Still in wait ack after bulk response [{}] => {}", Long.valueOf(j), (String) it.next());
                }
            }
        } finally {
            this.waitAckLock.unlock();
        }
    }

    public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
        this.eventCounter.scope("bulks_received").incrBy(1L);
        LOG.error("Exception with bulk {} - failing the whole lot ", Long.valueOf(j), th);
        Set<String> set = (Set) bulkRequest.requests().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toUnmodifiableSet());
        this.waitAckLock.lock();
        try {
            Map allPresent = this.waitAck.getAllPresent(set);
            if (!allPresent.isEmpty()) {
                this.waitAck.invalidateAll(allPresent.keySet());
            }
            for (String str : set) {
                List<Tuple> list = (List) allPresent.get(str);
                if (list != null) {
                    LOG.debug("Failed {} tuple(s) for ID {}", Integer.valueOf(list.size()), str);
                    for (Tuple tuple : list) {
                        this.eventCounter.scope("failed").incrBy(1L);
                        this._collector.fail(tuple);
                    }
                } else {
                    LOG.warn("Could not find unacked tuple for {}", str);
                }
            }
        } finally {
            this.waitAckLock.unlock();
        }
    }

    public void beforeBulk(long j, BulkRequest bulkRequest) {
        LOG.debug("beforeBulk {} with {} actions", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()));
        this.eventCounter.scope("bulks_received").incrBy(1L);
    }

    protected String getIndexName(Metadata metadata) {
        return this.indexName;
    }
}
