package com.mulesoft.connectors.azure.eventhubs.internal.client;

import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.mulesoft.connectors.azure.eventhubs.internal.source.eventhandler.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/azure/eventhubs/internal/client/DefaultPartitionConsumerClient.class */
public class DefaultPartitionConsumerClient implements PartitionConsumerClient {
    private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionConsumerClient.class);
    private final EventHubConsumerAsyncClient client;

    public DefaultPartitionConsumerClient(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
        this.client = eventHubConsumerAsyncClient;
    }

    @Override // com.mulesoft.connectors.azure.eventhubs.internal.client.PartitionConsumerClient
    public void consume(EventHandler eventHandler, String str, EventPosition eventPosition) {
        this.client.receiveFromPartition(str, eventPosition).subscribe(partitionEvent -> {
            eventHandler.handle(partitionEvent.getData());
        }, th -> {
            logger.error("Error occurred while consuming events: {}", th.getMessage());
        }, () -> {
            logger.debug("Finished reading events.");
        });
    }

    @Override // com.mulesoft.connectors.azure.eventhubs.internal.client.PartitionConsumerClient
    public void close() {
        this.client.close();
    }
}
