package com.digitalpebble.stormcrawler.elasticsearch;

import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.core.TimeValue;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/elasticsearch/ElasticSearchConnection.class */
public final class ElasticSearchConnection {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchConnection.class);

    @NotNull
    private final RestHighLevelClient client;

    @NotNull
    private final BulkProcessor processor;

    @Nullable
    private final Sniffer sniffer;
    private boolean isClosed;

    private ElasticSearchConnection(@NotNull RestHighLevelClient restHighLevelClient, @NotNull BulkProcessor bulkProcessor) {
        this(restHighLevelClient, bulkProcessor, null);
    }

    private ElasticSearchConnection(@NotNull RestHighLevelClient restHighLevelClient, @NotNull BulkProcessor bulkProcessor, @Nullable Sniffer sniffer) {
        this.isClosed = false;
        this.processor = bulkProcessor;
        this.client = restHighLevelClient;
        this.sniffer = sniffer;
    }

    public RestHighLevelClient getClient() {
        return this.client;
    }

    public void addToProcessor(IndexRequest indexRequest) {
        this.processor.add(indexRequest);
    }

    public static RestHighLevelClient getClient(Map<String, Object> map, String str) {
        List<String> loadListFromConf = ConfUtils.loadListFromConf("es." + str + ".addresses", map);
        ArrayList arrayList = new ArrayList();
        for (String str2 : loadListFromConf) {
            String str3 = "http";
            if (!str2.startsWith(str3)) {
                str2 = "http://" + str2;
            }
            URI create = URI.create(str2);
            if (create.getHost() == null) {
                throw new RuntimeException("host undefined " + str2);
            }
            int port = create.getPort() != -1 ? create.getPort() : 9200;
            if (create.getScheme() != null) {
                str3 = create.getScheme();
            }
            arrayList.add(new HttpHost(create.getHost(), port, str3));
        }
        RestClientBuilder builder = RestClient.builder((HttpHost[]) arrayList.toArray(new HttpHost[0]));
        String string = ConfUtils.getString(map, "es." + str + ".user");
        String string2 = ConfUtils.getString(map, "es." + str + ".password");
        String string3 = ConfUtils.getString(map, "es." + str + ".proxy.host");
        int i = ConfUtils.getInt(map, "es." + str + ".proxy.port", -1);
        String string4 = ConfUtils.getString(map, "es." + str + ".proxy.scheme", "http");
        boolean z = StringUtils.isNotBlank(string) && StringUtils.isNotBlank(string2);
        boolean z2 = StringUtils.isNotBlank(string3) && i != -1;
        if (z || z2) {
            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                if (z) {
                    BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
                    basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(string, string2));
                    httpAsyncClientBuilder.setDefaultCredentialsProvider(basicCredentialsProvider);
                }
                if (z2) {
                    httpAsyncClientBuilder.setProxy(new HttpHost(string3, i, string4));
                }
                return httpAsyncClientBuilder;
            });
        }
        int i2 = ConfUtils.getInt(map, "es." + str + ".connect.timeout", 1000);
        int i3 = ConfUtils.getInt(map, "es." + str + ".socket.timeout", 30000);
        builder.setRequestConfigCallback(builder2 -> {
            return builder2.setConnectTimeout(i2).setSocketTimeout(i3);
        });
        builder.setNodeSelector(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Node node = (Node) it.next();
                LOG.debug("Connected to ES node {} [{}] for {}", new Object[]{node.getName(), node.getHost(), str});
            }
        });
        builder.setCompressionEnabled(ConfUtils.getBoolean(map, "es." + str + ".compression", false));
        return new RestHighLevelClientBuilder(builder.build()).setApiCompatibilityMode(Boolean.valueOf(ConfUtils.getBoolean(map, "es." + str + ".compatibility.mode", false))).build();
    }

    public static ElasticSearchConnection getConnection(Map<String, Object> map, String str) {
        return getConnection(map, str, new BulkProcessor.Listener() { // from class: com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection.1
            public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }

            public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            }

            public void beforeBulk(long j, BulkRequest bulkRequest) {
            }
        });
    }

    public static ElasticSearchConnection getConnection(Map<String, Object> map, String str, BulkProcessor.Listener listener) {
        TimeValue parseTimeValue = TimeValue.parseTimeValue(ConfUtils.getString(map, "es." + str + ".flushInterval", "5s"), TimeValue.timeValueSeconds(5L), "flushInterval");
        int i = ConfUtils.getInt(map, "es." + str + ".bulkActions", 50);
        int i2 = ConfUtils.getInt(map, "es." + str + ".concurrentRequests", 1);
        RestHighLevelClient client = getClient(map, str);
        Sniffer sniffer = null;
        if (ConfUtils.getBoolean(map, "es." + str + ".sniff", true)) {
            sniffer = Sniffer.builder(client.getLowLevelClient()).build();
        }
        return new ElasticSearchConnection(client, BulkProcessor.builder((bulkRequest, actionListener) -> {
            client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
        }, listener, str + "-bulk-processor").setFlushInterval(parseTimeValue).setBulkActions(i).setConcurrentRequests(i2).build(), sniffer);
    }

    public void close() {
        if (this.isClosed) {
            LOG.warn("Tried to close an already closed connection!");
            return;
        }
        LOG.debug("Start closing the ElasticSearchConnection");
        try {
            if (!this.processor.awaitClose(60L, TimeUnit.SECONDS)) {
                throw new RuntimeException("Failed to flush pending actions when closing BulkProcessor");
            }
            if (this.sniffer != null) {
                this.sniffer.close();
            }
            try {
                this.client.close();
            } catch (IOException e) {
                LOG.trace("Client threw IO exception.");
            }
            this.isClosed = true;
        } catch (InterruptedException e2) {
            throw new RuntimeException(e2);
        }
    }
}
