package com.mulesoft.connectors.kafka.internal.source;

import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.internal.config.ConsumerConfiguration;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/source/MessageListenerSource.class */
/**
 * Base message source that connects to a {@link ConsumerConnection}, creates
 * {@code parallelConsumersAmount} polling tasks and starts each of them on that connection.
 *
 * <p>Subclasses supply the concrete {@link PollingTask} through
 * {@link #createPollingTask(Consumer, ConsumerConfiguration, ConsumerConnection, SourceCallback, AckMode, Duration)}.
 * The flow-completion callbacks ({@link #onSuccess}, {@link #onFailure}, {@link #onTerminate})
 * commit, refresh or release on the connection depending on the configured {@link AckMode}.
 *
 * @param <P> payload type produced by the source
 * @param <A> attributes type produced by the source
 */
public abstract class MessageListenerSource<P, A> extends Source<P, A> {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerSource.class);

    @Config
    protected ConsumerConfiguration config;

    @Connection
    private ConnectionProvider<ConsumerConnection> connectionProvider;

    @Optional
    @Summary("The timeout for the poll")
    @Parameter
    @ConfigOverride
    @DisplayName("Poll timeout")
    private int pollTimeout;

    @Optional
    @Summary("Time unit for poll timeout.")
    @Parameter
    @ConfigOverride
    @DisplayName("Poll timeout time unit")
    private TimeUnit pollTimeoutTimeUnit;

    @Optional
    @Summary("Declares the kind of Acknowledgement mode supported.")
    @Parameter
    @ConfigOverride
    @DisplayName("Acknowledgement mode")
    private AckMode ackMode;

    @DisplayName("Amount of parallel consumers")
    @Optional(defaultValue = "1")
    @Parameter
    private int parallelConsumersAmount;

    /** Polling tasks created by the last {@link #onStart}; {@code null} until the source is started. */
    private List<PollingTask<P, A, ?>> pollingTasks;

    /** Connection obtained from {@link #connectionProvider} in {@link #onStart}. */
    private ConsumerConnection connection;

    /**
     * Obtains a connection from the provider, then creates and starts one polling task per
     * configured parallel consumer.
     *
     * @param sourceCallback callback handed (wrapped in a {@code SourceCallbackWrapper}) to every polling task
     * @throws MuleException if the connection cannot be obtained or starting fails
     */
    @Override
    public void onStart(SourceCallback<P, A> sourceCallback) throws MuleException {
        SourceCallbackWrapper sourceCallbackWrapper = new SourceCallbackWrapper(sourceCallback);
        logger.info("Starting {} polling task{} in {} mode. Polling timeout is {} ms.",
                this.parallelConsumersAmount,
                this.parallelConsumersAmount == 1 ? "" : "s",
                this.ackMode,
                this.pollTimeoutTimeUnit.toMillis(this.pollTimeout));
        this.connection = this.connectionProvider.connect();
        if (logger.isDebugEnabled()) {
            logger.debug("Message listener is using connection provider {}.", this.connectionProvider);
            logger.debug("Message listener is using connection {}.", this.connection);
        }
        this.pollingTasks = new ArrayList<>();
        for (int i = 0; i < this.parallelConsumersAmount; i++) {
            logger.debug("Starting polling task {}.", i + 1);
            // The subclass reports the task it built through the consumer; it is collected here so
            // that it can be started right away and closed later in onStop().
            createPollingTask(this.pollingTasks::add, this.config, this.connection, sourceCallbackWrapper,
                    this.ackMode, this.config.asDuration(this.pollTimeout, this.pollTimeoutTimeUnit));
            // NOTE(review): relies on createPollingTask having registered exactly one task
            // synchronously for this iteration - confirm against the subclasses.
            this.connection.startPolling(this.pollingTasks.get(i));
        }
    }

    /**
     * Builds a polling task and hands it to {@code consumer}.
     *
     * @param consumer               receives the created polling task
     * @param consumerConfiguration  configuration of this source
     * @param consumerConnection     connection the task will poll on
     * @param sourceCallback         callback the task uses to dispatch messages
     * @param ackMode                acknowledgement mode configured for this source
     * @param duration               poll timeout
     */
    public abstract void createPollingTask(Consumer<PollingTask<P, A, ?>> consumer, ConsumerConfiguration consumerConfiguration, ConsumerConnection consumerConnection, SourceCallback<P, A> sourceCallback, AckMode ackMode, Duration duration);

    /**
     * Invoked when the flow finished successfully. In {@code AUTO} mode the offsets of the session are
     * committed; a failing commit is logged and reported to the source callback as a connection exception.
     *
     * @param sourceCallbackContext context of the event that completed
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext sourceCallbackContext) {
        if (this.ackMode.equals(AckMode.AUTO)) {
            try {
                this.connection.commit(this.ackMode, sessionKeyOf(sourceCallbackContext));
            } catch (Exception e) {
                logger.error("Failed to commit offsets in source", e);
                sourceCallbackContext.getSourceCallback().onConnectionException(new ConnectionException(e, sourceCallbackContext.getConnection()));
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Finished the flow successfully for an event.");
        }
    }

    /**
     * Invoked when the flow failed. In {@code MANUAL} and {@code AUTO} modes the connection buffer of the
     * session is refreshed; other modes only log.
     *
     * @param sourceCallbackContext context of the event that failed
     */
    @OnError
    public void onFailure(SourceCallbackContext sourceCallbackContext) {
        if (this.ackMode.equals(AckMode.MANUAL)) {
            logger.warn("The flow failed, the listener will consume the same messages if the commit was not invoked");
            this.connection.refreshBuffer(AckMode.MANUAL, sessionKeyOf(sourceCallbackContext));
        } else if (this.ackMode.equals(AckMode.AUTO)) {
            logger.warn("The flow failed, the listener will consume the same messages");
            this.connection.refreshBuffer(AckMode.AUTO, sessionKeyOf(sourceCallbackContext));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Finished the flow with an error for an event.");
        }
    }

    /**
     * Invoked when the flow terminated, regardless of the outcome. In {@code MANUAL} and {@code AUTO}
     * modes the session is released on the connection.
     *
     * @param sourceCallbackContext context of the event that terminated
     */
    @OnTerminate
    public void onTerminate(SourceCallbackContext sourceCallbackContext) {
        if (this.ackMode.equals(AckMode.MANUAL)) {
            logger.debug("The flow terminated, releasing consumer for MANUAL ack mode.");
            this.connection.release(this.ackMode, sessionKeyOf(sourceCallbackContext));
        } else if (this.ackMode.equals(AckMode.AUTO)) {
            logger.debug("The flow terminated, releasing consumer for AUTO ack mode.");
            this.connection.release(this.ackMode, sessionKeyOf(sourceCallbackContext));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Terminated flow with context {}", sourceCallbackContext);
        }
    }

    /**
     * Closes every polling task created by {@link #onStart}, if any. Close failures are handled by
     * {@code IOUtils.closeQuietly}.
     */
    @Override
    public void onStop() {
        if (logger.isDebugEnabled()) {
            logger.debug("Stopping MessageListener {}", getClass().getSimpleName());
        }
        if (this.pollingTasks != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Stopping all polling tasks");
            }
            this.pollingTasks.forEach(task -> IOUtils.closeQuietly(task));
        }
        logger.debug("Stopped MessageListener {}", getClass().getSimpleName());
    }

    /** Reads the session key stored in the callback context, falling back to the empty string when absent. */
    private static String sessionKeyOf(SourceCallbackContext sourceCallbackContext) {
        return (String) sourceCallbackContext.getVariable(PollingTask.SESSION_KEY).orElse("");
    }
}
