package com.mulesoft.connectors.azure.eventhubs.internal.source;

import com.mulesoft.connectors.azure.eventhubs.api.EventAttributes;
import com.mulesoft.connectors.azure.eventhubs.internal.client.EventHubConsumerClient;
import com.mulesoft.connectors.azure.eventhubs.internal.connection.AzureEventHubsConnection;
import com.mulesoft.connectors.azure.eventhubs.internal.source.eventhandler.DefaultEventHandler;
import java.io.InputStream;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;

@BackPressure(defaultMode = BackPressureMode.DROP, supportedModes = {BackPressureMode.DROP, BackPressureMode.WAIT})
@ClusterSupport(SourceClusterSupport.DEFAULT_ALL_NODES)
@MediaType(value = "*/*", strict = false)
@Alias("eventhub-listener")
/* loaded from: input_file:com/mulesoft/connectors/azure/eventhubs/internal/source/EventHubListener.class */
public class EventHubListener extends Source<InputStream, EventAttributes> {
    public static final int DEFAULT_CHECKPOINT_FREQUENCY = 1000;

    @Optional
    @Parameter
    @Summary("The Consumer Group you want to belong to.")
    private String consumerGroup;

    @Optional
    @Parameter
    @Summary("The frequency of updating the checkpoint. For instance, every 1000 events received.")
    private Integer checkpointFrequency;

    @Connection
    private ConnectionProvider<AzureEventHubsConnection> connectionProvider;
    private EventHubConsumerClient consumer;
    private AzureEventHubsConnection eventHubsConnection;

    public void onStart(SourceCallback<InputStream, EventAttributes> sourceCallback) throws ConnectionException {
        this.eventHubsConnection = (AzureEventHubsConnection) this.connectionProvider.connect();
        this.consumer = this.eventHubsConnection.getEventHubConsumer(this.consumerGroup, getCheckpointFrequency().intValue(), new DefaultEventHandler(sourceCallback));
        this.consumer.consume();
    }

    public void onStop() {
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.eventHubsConnection != null) {
            this.connectionProvider.disconnect(this.eventHubsConnection);
        }
    }

    public Integer getCheckpointFrequency() {
        return Integer.valueOf(this.checkpointFrequency == null ? DEFAULT_CHECKPOINT_FREQUENCY : this.checkpointFrequency.intValue());
    }
}
