package org.eclipse.hono.client.util;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.eclipse.hono.client.ServerErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/util/CachingClientFactory.class */
/**
 * A factory that creates clients on demand and caches them by key.
 * <p>
 * While a client for a key is being created, further requests for the same key are
 * queued and are completed (or failed) with the outcome of that single creation attempt.
 * <p>
 * The internal state is kept in plain {@link HashMap}s and no synchronization is
 * performed. NOTE(review): instances are presumably meant to be used from a single
 * Vert.x context only - confirm against callers.
 *
 * @param <T> The type of client that is created and cached.
 */
public final class CachingClientFactory<T> {
    private static final Logger log = LoggerFactory.getLogger(CachingClientFactory.class);
    private static final int WAITING_CREATION_REQUESTS_COMPLETION_BATCH_SIZE_DEFAULT = 10;
    /** Status code reported when the supplier of a new client throws an exception. */
    private static final int STATUS_INTERNAL_ERROR = 500;
    /** Status code reported to pending requests when the connection is lost. */
    private static final int STATUS_UNAVAILABLE = 503;

    private final Vertx vertx;
    private final Predicate<T> livenessCheck;
    /** The cached clients, by key. */
    private final Map<String, T> activeClients = new HashMap<>();
    /**
     * The requests waiting for the outcome of a creation attempt, by key.
     * Queues are created lazily and are kept (empty) once drained.
     */
    private final Map<String, Deque<CreationRequest>> waitingCreationRequests = new HashMap<>();
    private int waitingCreationRequestsCompletionBatchSize = WAITING_CREATION_REQUESTS_COMPLETION_BATCH_SIZE_DEFAULT;

    /**
     * A pending request for a client, consisting of the cache key, the supplier used to
     * create a new client and the handler to notify about the outcome.
     */
    public class CreationRequest {
        final String key;
        final Supplier<Future<T>> clientInstanceSupplier;
        final Handler<AsyncResult<T>> result;

        CreationRequest(
                final String key,
                final Supplier<Future<T>> clientInstanceSupplier,
                final Handler<AsyncResult<T>> result) {
            this.key = key;
            this.clientInstanceSupplier = clientInstanceSupplier;
            this.result = result;
        }

        /**
         * Notifies the result handler about the client to use.
         *
         * @param client The client to pass to the handler.
         */
        void complete(final T client) {
            result.handle(Future.succeededFuture(client));
        }

        /**
         * Logs the failure on debug level and notifies the result handler about it.
         *
         * @param cause The reason for the failure.
         */
        void fail(final Throwable cause) {
            log.debug("failed to create new client for [key: {}]: {}", key, cause.getMessage());
            result.handle(Future.failedFuture(cause));
        }
    }

    /**
     * Creates a new factory.
     *
     * @param vertx The Vert.x instance used to schedule the completion of queued requests
     *              that exceed a single batch. This parameter is not checked for {@code null}.
     * @param livenessCheck The predicate deciding whether a cached client may be reused.
     * @throws NullPointerException if the liveness check is {@code null}.
     */
    public CachingClientFactory(final Vertx vertx, final Predicate<T> livenessCheck) {
        this.vertx = vertx;
        this.livenessCheck = Objects.requireNonNull(livenessCheck);
    }

    /**
     * Drops all cached clients and fails all queued creation requests with a
     * {@link ServerErrorException} having status code 503.
     */
    public void onDisconnect() {
        final ServerErrorException connectionLost =
                new ServerErrorException(STATUS_UNAVAILABLE, "no connection to service");
        activeClients.clear();
        // Iterate over a snapshot of the keys: a result handler being failed may re-enter
        // this factory with a new key, which would modify the map while it is being
        // traversed and trigger a ConcurrentModificationException.
        final String[] keys = waitingCreationRequests.keySet().toArray(new String[0]);
        for (final String key : keys) {
            failCreationRequests(key, connectionLost);
        }
    }

    /**
     * Sets the maximum number of queued requests that are completed in one go before the
     * completion of the remaining requests is rescheduled on the Vert.x context.
     * <p>
     * Values smaller than 1 are treated as 1 when requests are being completed.
     *
     * @param batchSize The number of requests to complete per batch.
     */
    void setWaitingCreationRequestsCompletionBatchSize(final int batchSize) {
        this.waitingCreationRequestsCompletionBatchSize = batchSize;
    }

    /**
     * Removes the client cached for a key.
     *
     * @param key The key of the client to remove.
     * @return The removed client or {@code null} if no client was cached for the key.
     */
    public T removeClient(final String key) {
        return activeClients.remove(key);
    }

    /**
     * Removes the client cached for a key and passes it to the given handler.
     * The handler is only invoked if a client had been cached for the key.
     *
     * @param key The key of the client to remove.
     * @param postProcessor The handler to invoke with the removed client.
     * @return The removed client or {@code null} if no client was cached for the key.
     */
    public T removeClient(final String key, final Handler<T> postProcessor) {
        final T removed = removeClient(key);
        if (removed != null) {
            postProcessor.handle(removed);
        }
        return removed;
    }

    /**
     * Gets the client cached for a key. The liveness check is not applied.
     *
     * @param key The key of the client to look up.
     * @return The cached client or {@code null} if no client is cached for the key.
     */
    public T getClient(final String key) {
        return activeClients.get(key);
    }

    /**
     * Gets the cached client for a key or creates a new one using the given supplier.
     * <p>
     * A cached client is only reused if it passes the liveness check. If other requests
     * for the key are still queued, this request is queued behind them.
     *
     * @param key The key to cache the client under.
     * @param clientInstanceSupplier The supplier to invoke for creating a new client.
     * @param result The handler to notify about the outcome.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public void getOrCreateClient(
            final String key,
            final Supplier<Future<T>> clientInstanceSupplier,
            final Handler<AsyncResult<T>> result) {
        Objects.requireNonNull(key);
        Objects.requireNonNull(clientInstanceSupplier);
        Objects.requireNonNull(result);
        getOrCreateClient(new CreationRequest(key, clientInstanceSupplier, result));
    }

    /**
     * Serves a request from the cache, queues it behind pending requests or starts the
     * creation of a new client.
     *
     * @param request The request to process.
     */
    private void getOrCreateClient(final CreationRequest request) {
        final Deque<CreationRequest> queue = queueFor(request.key);
        if (!queue.isEmpty()) {
            // a creation attempt (or the completion of earlier requests) is still in progress
            log.debug("delaying client creation request, previous requests still being finished for [{}] ({} waiting creation requests for all keys)",
                    request.key, waitingCreationRequests.size());
            queue.add(request);
            return;
        }

        final T cachedClient = activeClients.get(request.key);
        if (cachedClient != null && livenessCheck.test(cachedClient)) {
            log.debug("reusing cached client [key: {}]", request.key);
            request.complete(cachedClient);
            return;
        }

        // enqueue before invoking the supplier so that requests arriving in the meantime get queued
        queue.add(request);
        log.debug("creating new client for [key: {}]", request.key);
        try {
            final Future<T> creationAttempt = request.clientInstanceSupplier.get();
            if (creationAttempt == null) {
                throw new NullPointerException("clientInstanceSupplier result is null");
            }
            creationAttempt.onComplete(outcome -> {
                if (outcome.succeeded()) {
                    log.debug("successfully created new client for [key: {}]", request.key);
                    completeCreationRequests(request.key, outcome.result());
                } else {
                    failCreationRequests(request.key, outcome.cause());
                }
            });
        } catch (final Exception e) {
            log.error("exception creating new client for [key: {}]", request.key, e);
            // failCreationRequests also evicts any cached client for the key
            failCreationRequests(request.key, new ServerErrorException(
                    STATUS_INTERNAL_ERROR,
                    String.format("exception creating new client for [key: %s]: %s", request.key, e.getMessage())));
        }
    }

    /**
     * Gets the queue of waiting requests for a key, creating it if necessary.
     *
     * @param key The key to get the queue for.
     * @return The (possibly empty) queue.
     */
    private Deque<CreationRequest> queueFor(final String key) {
        return waitingCreationRequests.computeIfAbsent(key, k -> new ArrayDeque<>());
    }

    /**
     * Evicts the client cached for a key and fails all requests queued for the key.
     *
     * @param key The key to fail the requests for.
     * @param cause The failure to report to the queued requests.
     */
    private void failCreationRequests(final String key, final Throwable cause) {
        activeClients.remove(key);
        final Deque<CreationRequest> queue = queueFor(key);
        final int queuedCount = queue.size();
        // re-check the queue on every iteration: a handler may enqueue a new request while being failed
        while (!queue.isEmpty()) {
            queue.removeFirst().fail(cause);
        }
        if (queuedCount > 0 && log.isDebugEnabled()) {
            log.debug("failed {} concurrent requests to create new client for [key: {}]: {}",
                    queuedCount, key, cause.getMessage());
        }
    }

    /**
     * Caches the given client and completes the requests queued for the key with it.
     * <p>
     * At most one batch of requests is completed per invocation. If requests remain,
     * their completion is rescheduled on the Vert.x context.
     *
     * @param key The key to cache the client under.
     * @param client The client to complete the queued requests with.
     */
    private void completeCreationRequests(final String key, final T client) {
        activeClients.put(key, client);
        final Deque<CreationRequest> queue = queueFor(key);
        // complete at least one request per round, otherwise a non-positive batch size
        // would reschedule this method forever without ever draining the queue
        final int batchSize = Math.max(1, waitingCreationRequestsCompletionBatchSize);
        for (int completed = 0; completed < batchSize; completed++) {
            final CreationRequest request = queue.pollFirst();
            if (request == null) {
                break;
            }
            request.complete(client);
        }
        if (queue.isEmpty()) {
            return;
        }
        log.trace("decoupling completion of remaining waiting creation requests");
        vertx.runOnContext(v -> completeCreationRequests(key, client));
    }
}
