package io.gravitee.reporter.elastic.indexer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.reporter.elastic.config.ElasticConfiguration;
import io.gravitee.reporter.elastic.config.PipelineConfiguration;
import io.gravitee.reporter.elastic.model.elasticsearch.Health;
import io.gravitee.reporter.elastic.model.exception.TechnicalException;
import io.gravitee.reporter.elastic.templating.freemarker.FreeMarkerComponent;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.processors.PublishProcessor;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.core.http.HttpClientResponse;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:io/gravitee/reporter/elastic/indexer/ElasticsearchBulkIndexer.class */
public class ElasticsearchBulkIndexer {
    private static final String URL_STATE_CLUSTER = "/_cluster/health";
    private static final String URL_TEMPLATE = "/_template";
    private static final String URL_BULK = "/_bulk";
    private static final String URL_INGEST = "/_ingest/pipeline";
    private static final String CONTENT_TYPE = "application/json;charset=UTF-8";
    private static final String HTTPS_SCHEME = "https";

    @Autowired
    private ElasticConfiguration configuration;

    @Autowired
    private PipelineConfiguration pipelineConfiguration;

    @Autowired
    private FreeMarkerComponent freeMarkerComponent;

    @Autowired
    private Vertx vertx;
    private HttpClient httpClient;
    private ObjectMapper mapper;
    private String authorizationHeader;
    private int majorVersion;
    private final Logger logger = LoggerFactory.getLogger(ElasticsearchBulkIndexer.class);
    private final PublishProcessor<String> bulkProcessor = PublishProcessor.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/gravitee/reporter/elastic/indexer/ElasticsearchBulkIndexer$VertxHttpResponse.class */
    public class VertxHttpResponse {
        final HttpClientResponse response;
        final Buffer body;

        VertxHttpResponse(HttpClientResponse httpClientResponse, Buffer buffer) {
            this.response = httpClientResponse;
            this.body = buffer;
        }
    }

    public void start() throws ExecutionException, InterruptedException, IOException, TechnicalException {
        if (this.configuration.getEndpoints().isEmpty()) {
            return;
        }
        URI create = URI.create(this.configuration.getEndpoints().get(0).getUrl());
        HttpClientOptions defaultPort = new HttpClientOptions().setDefaultHost(create.getHost()).setDefaultPort(create.getPort() != -1 ? create.getPort() : HTTPS_SCHEME.equals(create.getScheme()) ? 443 : 80);
        if (HTTPS_SCHEME.equals(create.getScheme())) {
            defaultPort.setSsl(true).setTrustAll(true);
        }
        this.httpClient = this.vertx.createHttpClient(defaultPort);
        this.mapper = new ObjectMapper();
        if (this.configuration.getUsername() != null) {
            this.authorizationHeader = initEncodedAuthorization(this.configuration.getUsername(), this.configuration.getPassword());
        }
        try {
            this.majorVersion = getMajorVersion();
            ensureTemplate();
            ensureIngestPlugins();
            this.bulkProcessor.buffer(this.configuration.getFlushInterval().longValue(), TimeUnit.SECONDS, this.configuration.getBulkActions().intValue()).subscribe(this::index);
        } catch (Exception e) {
            throw new TechnicalException("An error occurs while getting information from Elasticsearch at " + create.toString(), e);
        }
    }

    private String initEncodedAuthorization(String str, String str2) {
        return "Basic " + Base64.getEncoder().encodeToString((str + ":" + str2).getBytes(StandardCharsets.UTF_8));
    }

    private int getMajorVersion() throws ExecutionException, InterruptedException, IOException, TechnicalException {
        VertxHttpResponse vertxHttpResponse = (VertxHttpResponse) doRequest(this.httpClient.get("/"), null).blockingGet();
        if (vertxHttpResponse.response.statusCode() != 200) {
            this.logger.error("Impossible to call Elasticsearch GET {}.", "/");
            throw new TechnicalException("Impossible to call Elasticsearch. Elasticsearch response code is " + vertxHttpResponse.response.statusCode());
        }
        String asText = this.mapper.readTree(vertxHttpResponse.body.toString()).path("version").path("number").asText();
        float floatValue = Float.valueOf(asText.substring(0, 3)).floatValue();
        int intValue = Integer.valueOf(asText.substring(0, 1)).intValue();
        if (floatValue < 2.0f) {
            this.logger.warn("Please upgrade to Elasticsearch 2 or later. version={}", asText);
        }
        return intValue;
    }

    Health getClusterHealth() throws TechnicalException {
        try {
            VertxHttpResponse vertxHttpResponse = (VertxHttpResponse) doRequest(this.httpClient.get(URL_STATE_CLUSTER), null).blockingGet();
            if (vertxHttpResponse.response.statusCode() != 200) {
                this.logger.error("Impossible to call Elasticsearch GET {}.", URL_STATE_CLUSTER);
                throw new TechnicalException("Impossible to call Elasticsearch. Elasticsearch response code is " + vertxHttpResponse.response.statusCode());
            }
            String buffer = vertxHttpResponse.body.toString();
            this.logger.debug("Response of ES for GET {} : {}", URL_STATE_CLUSTER, buffer);
            return (Health) this.mapper.readValue(buffer, Health.class);
        } catch (IOException e) {
            this.logger.error("Impossible to call Elasticsearch GET {}.", URL_STATE_CLUSTER, e);
            throw new TechnicalException("Impossible to call Elasticsearch. Error is " + e.getClass().getSimpleName(), e);
        }
    }

    private void ensureTemplate() throws TechnicalException {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("indexName", this.configuration.getIndexName());
            hashMap.put("numberOfShards", Integer.valueOf(this.configuration.getNumberOfShards()));
            hashMap.put("numberOfReplicas", Integer.valueOf(this.configuration.getNumberOfReplicas()));
            String generateFromTemplate = this.freeMarkerComponent.generateFromTemplate("index-template-es-" + this.majorVersion + "x.ftl", hashMap);
            this.logger.debug("PUT template : {}", generateFromTemplate);
            VertxHttpResponse vertxHttpResponse = (VertxHttpResponse) doRequest(this.httpClient.put("/_template/gravitee"), generateFromTemplate).blockingGet();
            String buffer = vertxHttpResponse.body.toString();
            if (vertxHttpResponse.response.statusCode() != 200) {
                this.logger.error("Impossible to call Elasticsearch PUT {}. Body is {}", "/_template/gravitee", buffer);
                throw new TechnicalException("Impossible to call Elasticsearch PUT /_template/gravitee. Response is " + vertxHttpResponse.response.statusCode());
            }
            this.logger.debug("Response of ES for PUT {} : {}", "/_template/gravitee", buffer);
        } catch (Exception e) {
            this.logger.error("Impossible to call Elasticsearch", e);
            throw new TechnicalException("Impossible to call Elasticsearch.", e);
        }
    }

    private void ensureIngestPlugins() throws TechnicalException {
        try {
            String createPipeline = this.pipelineConfiguration.createPipeline(this.majorVersion);
            if (createPipeline != null && this.pipelineConfiguration.getPipelineName() != null) {
                this.logger.debug("PUT ingest pipeline template : {}", createPipeline);
                VertxHttpResponse vertxHttpResponse = (VertxHttpResponse) doRequest(this.httpClient.put("/_ingest/pipeline/" + this.pipelineConfiguration.getPipelineName()), createPipeline).blockingGet();
                String buffer = vertxHttpResponse.body.toString();
                if (vertxHttpResponse.response.statusCode() != 200) {
                    this.logger.debug("Impossible to call Elasticsearch PUT {}. {}.", "/_ingest/pipeline/" + this.pipelineConfiguration.getPipelineName(), buffer);
                    this.logger.warn("Impossible to create a pipeline for " + this.pipelineConfiguration.getIngestManaged());
                } else {
                    this.logger.info("Manage Ingest pipeline {}", this.pipelineConfiguration.getIngestPlugins().toString());
                    this.pipelineConfiguration.valid();
                }
                this.logger.debug("Response of ES for PUT {} : {}", "/_ingest/pipeline/" + this.pipelineConfiguration.getPipelineName(), buffer);
            }
        } catch (Exception e) {
            this.logger.error("Impossible to call ingest pipeline " + this.pipelineConfiguration.getPipelineName() + " with ingest plugins " + this.pipelineConfiguration.getIngestPlugins(), e);
        }
    }

    public void index(String str) {
        this.bulkProcessor.onNext(str);
    }

    private void index(List<String> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        try {
            String str = (String) list.stream().collect(Collectors.joining());
            this.logger.debug("Try to call POST {}, with body {}", URL_BULK, str);
            HttpClientRequest post = this.httpClient.post(URL_BULK);
            post.putHeader("Content-Type", "application/x-ndjson");
            doRequest(post, str).subscribe(new SingleObserver<VertxHttpResponse>() { // from class: io.gravitee.reporter.elastic.indexer.ElasticsearchBulkIndexer.1
                public void onSubscribe(Disposable disposable) {
                }

                public void onSuccess(VertxHttpResponse vertxHttpResponse) {
                    String buffer = vertxHttpResponse.body.toString();
                    ElasticsearchBulkIndexer.this.logger.debug("Response of ES for POST {} : {}", ElasticsearchBulkIndexer.URL_BULK, buffer);
                    if (vertxHttpResponse.response.statusCode() != 200) {
                        ElasticsearchBulkIndexer.this.logger.error("Impossible to call Elasticsearch POST {}. Body is {}", ElasticsearchBulkIndexer.URL_BULK, buffer);
                    }
                }

                public void onError(Throwable th) {
                    ElasticsearchBulkIndexer.this.logger.error("An error occurs while calling Elasticsearch POST {}", ElasticsearchBulkIndexer.URL_BULK, th);
                }
            });
        } catch (Exception e) {
            this.logger.error("Unexpected error while bulk indexing data to Elasticsearch", e);
        }
    }

    private Single<VertxHttpResponse> doRequest(HttpClientRequest httpClientRequest, String str) {
        addCommonHeaders(httpClientRequest);
        return Single.create(singleEmitter -> {
            singleEmitter.getClass();
            httpClientRequest.exceptionHandler(singleEmitter::onError).toFlowable().doOnSubscribe(subscription -> {
                if (str == null) {
                    httpClientRequest.end();
                } else {
                    httpClientRequest.end(str);
                }
            }).flatMapSingle(new Function<HttpClientResponse, Single<VertxHttpResponse>>() { // from class: io.gravitee.reporter.elastic.indexer.ElasticsearchBulkIndexer.2
                public Single<VertxHttpResponse> apply(HttpClientResponse httpClientResponse) throws Exception {
                    SingleEmitter singleEmitter = singleEmitter;
                    return Single.create(singleEmitter2 -> {
                        singleEmitter.getClass();
                        httpClientResponse.exceptionHandler(singleEmitter::onError);
                        httpClientResponse.bodyHandler(buffer -> {
                            singleEmitter.onSuccess(new VertxHttpResponse(httpClientResponse, buffer));
                        });
                    });
                }
            }).subscribe();
        });
    }

    private void addCommonHeaders(HttpClientRequest httpClientRequest) {
        httpClientRequest.putHeader("Accept", CONTENT_TYPE).putHeader("Accept-Charset", StandardCharsets.UTF_8.name());
        if (this.authorizationHeader != null) {
            httpClientRequest.putHeader("Authorization", this.authorizationHeader);
        }
    }
}
