package io.debezium.server.nats.streaming;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Dependent
@Named("nats-streaming")
/* loaded from: input_file:io/debezium/server/nats/streaming/NatsStreamingChangeConsumer.class */
public class NatsStreamingChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NatsStreamingChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.nats-streaming.";
    private static final String PROP_URL = "debezium.sink.nats-streaming.url";
    private static final String PROP_CLUSTER_ID = "debezium.sink.nats-streaming.cluster.id";
    private static final String PROP_CLIENT_ID = "debezium.sink.nats-streaming.client.id";
    private String url;
    private String clusterId;
    private String clientId;
    private Connection nc;
    private StreamingConnection sc;

    @Inject
    @CustomConsumerBuilder
    Instance<StreamingConnection> customStreamingConnection;

    @PostConstruct
    void connect() {
        if (this.customStreamingConnection.isResolvable()) {
            this.sc = (StreamingConnection) this.customStreamingConnection.get();
            LOGGER.info("Obtained custom configured StreamingConnection '{}'", this.sc);
            return;
        }
        Config config = ConfigProvider.getConfig();
        this.url = (String) config.getValue(PROP_URL, String.class);
        this.clusterId = (String) config.getValue(PROP_CLUSTER_ID, String.class);
        this.clientId = (String) config.getValue(PROP_CLIENT_ID, String.class);
        try {
            this.nc = Nats.connect(new Options.Builder().server(this.url).noReconnect().build());
            this.sc = NatsStreaming.connect(this.clusterId, this.clientId, new Options.Builder().natsConn(this.nc).build());
            LOGGER.info("Using default StreamingConnection '{}'", this.sc);
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    @PreDestroy
    void close() {
        try {
            if (this.sc != null) {
                this.sc.close();
                LOGGER.info("NATS Streaming connection closed.");
            }
            if (this.nc != null) {
                this.nc.close();
                LOGGER.info("NATS connection closed.");
            }
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            if (changeEvent.value() != null) {
                String map = this.streamNameMapper.map(changeEvent.destination());
                byte[] bytes = getBytes(changeEvent.value());
                LOGGER.trace("Received event @ {} = '{}'", map, getString(changeEvent.value()));
                try {
                    this.sc.publish(map, bytes);
                } catch (Exception e) {
                    throw new DebeziumException(e);
                }
            }
            recordCommitter.markProcessed(changeEvent);
        }
        recordCommitter.markBatchFinished();
    }
}
