package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.TransportClient;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:WEB-INF/lib/azure-cosmos-4.32.1.jar:com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.class */
/**
 * {@link IOpenConnectionsHandler} that opens connections through a {@link TransportClient},
 * bounding the number of in-flight open-connection attempts with a semaphore.
 *
 * <p>The semaphore is created with {@code Configs.getCPUCnt()} permits. One permit is taken
 * per address before the connection attempt starts and is given back when that attempt
 * terminates, is cancelled, or fails to start.
 */
public class RntbdOpenConnectionsHandler implements IOpenConnectionsHandler {
    private static final Logger logger = LoggerFactory.getLogger(RntbdOpenConnectionsHandler.class);

    /** Maximum time, in minutes, to wait for a semaphore permit before giving up on an address. */
    private static final int DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES = 10;

    private final TransportClient transportClient;
    private final Semaphore openConnectionsSemaphore;

    /**
     * Creates a handler that opens connections through the given transport client.
     *
     * @param transportClient the client used to open each connection; must not be {@code null}
     * @throws NullPointerException if {@code transportClient} is {@code null}
     */
    public RntbdOpenConnectionsHandler(TransportClient transportClient) {
        Preconditions.checkNotNull(transportClient, "Argument 'transportClient' can not be null");
        this.transportClient = transportClient;
        this.openConnectionsSemaphore = new Semaphore(Configs.getCPUCnt());
    }

    /**
     * Attempts to open a connection to every address in {@code list}.
     *
     * <p>For each address a semaphore permit is requested, waiting up to
     * {@link #DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES} minutes. Note that this wait
     * blocks the thread on which the address is being processed. If no permit is obtained
     * (timeout or interruption), a non-connected {@link OpenConnectionResponse} carrying an
     * {@link IllegalStateException} is emitted for that address. Errors signalled by the
     * transport client are likewise converted into non-connected responses.
     *
     * @param list the addresses to connect to; must not be {@code null}
     * @return a {@link Flux} emitting one response per address
     * @throws NullPointerException if {@code list} is {@code null}
     */
    @Override // com.azure.cosmos.implementation.IOpenConnectionsHandler
    public Flux<OpenConnectionResponse> openConnections(List<Uri> list) {
        Preconditions.checkNotNull(list, "Argument 'addresses' should not be null");
        if (logger.isDebugEnabled()) {
            // Guarded so the join is only computed when debug logging is on.
            logger.debug("Open connections for addresses {}", StringUtils.join(list, ","));
        }
        return Flux.fromIterable(list).flatMap(uri -> {
            boolean permitAcquired = false;
            try {
                permitAcquired = this.openConnectionsSemaphore.tryAcquire(
                    DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                logger.warn("Acquire connection semaphore failed", e);
            }
            if (!permitAcquired) {
                return Mono.just(new OpenConnectionResponse(uri, false, new IllegalStateException("Unable to acquire semaphore")));
            }

            // Terminate and cancel hooks can both fire for the same subscription, so the
            // release is guarded to hand the permit back exactly once.
            final AtomicBoolean permitReleased = new AtomicBoolean(false);
            final Runnable releasePermit = () -> {
                if (permitReleased.compareAndSet(false, true)) {
                    this.openConnectionsSemaphore.release();
                }
            };

            try {
                return this.transportClient.openConnection(uri).onErrorResume(th -> {
                    return Mono.just(new OpenConnectionResponse(uri, false, th));
                }).doOnNext(openConnectionResponse -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Connection result: isConnected [{}], address [{}]", openConnectionResponse.isConnected(), openConnectionResponse.getUri());
                    }
                })
                    // Release on completion/error before the terminal signal travels downstream,
                    // and also on cancellation, which doOnTerminate alone does not cover.
                    .doOnTerminate(releasePermit)
                    .doOnCancel(releasePermit);
            } catch (RuntimeException | Error e) {
                // openConnection (or pipeline assembly) failed synchronously: nothing will ever
                // terminate, so give the permit back here and let the failure propagate as before.
                releasePermit.run();
                throw e;
            }
        });
    }
}
