package org.eclipse.hono.client.util;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.eclipse.hono.client.ServerErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/util/CachingClientFactory.class */
public final class CachingClientFactory<T> extends ClientFactory<T> {
    private static final Logger log = LoggerFactory.getLogger(CachingClientFactory.class);
    private static final int WAITING_CREATION_REQUESTS_COMPLETION_BATCH_SIZE_DEFAULT = 10;
    private final Vertx vertx;
    private final Predicate<T> livenessCheck;
    private final Map<String, T> activeClients = new HashMap();
    private final Map<String, Boolean> creationLocks = new HashMap();
    private final List<CachingClientFactory<T>.CreationRequestData> waitingCreationRequests = new ArrayList();
    private final Set<String> keysOfBeingCompletedConcurrentCreationRequests = new HashSet();
    private int waitingCreationRequestsCompletionBatchSize = WAITING_CREATION_REQUESTS_COMPLETION_BATCH_SIZE_DEFAULT;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/hono/client/util/CachingClientFactory$CreationRequestData.class */
    public class CreationRequestData {
        final String key;
        final Supplier<Future<T>> clientInstanceSupplier;
        final Handler<AsyncResult<T>> result;

        CreationRequestData(String str, Supplier<Future<T>> supplier, Handler<AsyncResult<T>> handler) {
            this.key = str;
            this.clientInstanceSupplier = supplier;
            this.result = handler;
        }
    }

    public CachingClientFactory(Vertx vertx, Predicate<T> predicate) {
        this.vertx = vertx;
        this.livenessCheck = (Predicate) Objects.requireNonNull(predicate);
    }

    void setWaitingCreationRequestsCompletionBatchSize(int i) {
        this.waitingCreationRequestsCompletionBatchSize = i;
    }

    public void removeClient(String str) {
        this.activeClients.remove(str);
    }

    public void removeClient(String str, Handler<T> handler) {
        T remove = this.activeClients.remove(str);
        if (remove != null) {
            handler.handle(remove);
        }
    }

    @Override // org.eclipse.hono.client.util.ClientFactory
    protected void doClearStateAfterCreationRequestsCleared() {
        this.activeClients.clear();
        this.creationLocks.clear();
    }

    public T getClient(String str) {
        return this.activeClients.get(str);
    }

    public void getOrCreateClient(String str, Supplier<Future<T>> supplier, Handler<AsyncResult<T>> handler) {
        if (this.keysOfBeingCompletedConcurrentCreationRequests.contains(str)) {
            log.debug("delaying client creation request, previous requests still being finished for [{}] ({} waiting creation requests for all keys)", str, Integer.valueOf(this.waitingCreationRequests.size()));
            this.waitingCreationRequests.add(new CreationRequestData(str, supplier, handler));
            return;
        }
        T t = this.activeClients.get(str);
        if (t != null && this.livenessCheck.test(t)) {
            log.debug("reusing cached client [{}]", str);
            handler.handle(Future.succeededFuture(t));
            return;
        }
        if (this.creationLocks.putIfAbsent(str, Boolean.TRUE) != null) {
            this.waitingCreationRequests.add(new CreationRequestData(str, supplier, handler));
            log.debug("already trying to create a client for [{}] ({} waiting creation requests for all keys)", str, Integer.valueOf(this.waitingCreationRequests.size()));
            return;
        }
        Handler<ServerErrorException> handler2 = serverErrorException -> {
            if (!this.creationLocks.remove(str, Boolean.TRUE)) {
                log.debug("creation attempt already finished for [{}]", str);
                return;
            }
            log.debug("failed to create new client for [{}]: {}", str, serverErrorException.toString());
            handler.handle(Future.failedFuture(serverErrorException));
            failWaitingCreationRequests(str, serverErrorException);
        };
        this.creationRequests.add(handler2);
        log.debug("creating new client for [{}]", str);
        Future<T> future = null;
        try {
            future = supplier.get();
        } catch (Exception e) {
            this.creationLocks.remove(str);
            this.creationRequests.remove(handler2);
            log.error("exception creating new client for [{}]", str, e);
            this.activeClients.remove(str);
            ServerErrorException serverErrorException2 = new ServerErrorException(500, String.format("exception creating new client for [%s]: %s", str, e.getMessage()));
            handler.handle(Future.failedFuture(serverErrorException2));
            failWaitingCreationRequests(str, serverErrorException2);
        }
        if (future == null) {
            throw new NullPointerException("clientInstanceSupplier result is null");
        }
        if (future != null) {
            future.onComplete(asyncResult -> {
                this.creationRequests.remove(handler2);
                if (!this.creationLocks.remove(str, Boolean.TRUE)) {
                    log.debug("creation attempt already finished for [{}]", str);
                    return;
                }
                if (!asyncResult.succeeded()) {
                    log.debug("failed to create new client for [{}]", str, asyncResult.cause());
                    this.activeClients.remove(str);
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                    failWaitingCreationRequests(str, asyncResult.cause());
                    return;
                }
                Object result = asyncResult.result();
                log.debug("successfully created new client for [{}]", str);
                this.activeClients.put(str, result);
                handler.handle(Future.succeededFuture(result));
                processWaitingCreationRequests();
            });
        }
    }

    private void failWaitingCreationRequests(String str, Throwable th) {
        int i = 0;
        Iterator<CachingClientFactory<T>.CreationRequestData> it = this.waitingCreationRequests.iterator();
        while (it.hasNext()) {
            CachingClientFactory<T>.CreationRequestData next = it.next();
            if (str.equals(next.key)) {
                it.remove();
                i++;
                next.result.handle(Future.failedFuture(th));
            }
        }
        this.keysOfBeingCompletedConcurrentCreationRequests.remove(str);
        if (i <= 0 || !log.isDebugEnabled()) {
            return;
        }
        log.debug("failed {} concurrent requests to create new client for [{}]: {}", new Object[]{Integer.valueOf(i), str, th.toString()});
    }

    private void processWaitingCreationRequests() {
        int i = 0;
        this.keysOfBeingCompletedConcurrentCreationRequests.clear();
        Iterator<CachingClientFactory<T>.CreationRequestData> it = this.waitingCreationRequests.iterator();
        while (it.hasNext()) {
            CachingClientFactory<T>.CreationRequestData next = it.next();
            if (!this.creationLocks.containsKey(next.key)) {
                if (i < this.waitingCreationRequestsCompletionBatchSize) {
                    it.remove();
                    i++;
                    getOrCreateClient(next.key, next.clientInstanceSupplier, next.result);
                } else {
                    this.keysOfBeingCompletedConcurrentCreationRequests.add(next.key);
                }
            }
        }
        if (!this.keysOfBeingCompletedConcurrentCreationRequests.isEmpty()) {
            log.trace("decoupling completion of remaining waiting creation requests");
            this.vertx.runOnContext(r3 -> {
                processWaitingCreationRequests();
            });
        } else {
            if (this.waitingCreationRequests.isEmpty()) {
                return;
            }
            log.trace("no more waiting creation requests to complete at this time ({} remaining requests overall)", Integer.valueOf(this.waitingCreationRequests.size()));
        }
    }
}
