package org.eclipse.hono.client.registry.amqp;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.StringTag;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import javax.security.auth.x500.X500Principal;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.util.AnnotatedCacheKey;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.TenantConstants;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/registry/amqp/ProtonBasedTenantClient.class */
/**
 * A {@link TenantClient} that looks up tenant information by sending requests to the
 * {@code tenant} endpoint over a {@link HonoConnection}.
 * <p>
 * A lookup first consults the response cache; only if that lookup fails is a request
 * sent to the service. Lookups for the same cache key that arrive while a request is
 * still outstanding share the result of that request instead of sending another one.
 * <p>
 * When caching is enabled, the client listens for {@link TenantChangeNotification}s and
 * evicts all cached results of a tenant that has been deleted or that has been updated
 * to a disabled state.
 */
public final class ProtonBasedTenantClient
        extends AbstractRequestResponseServiceClient<TenantObject, TenantResult<TenantObject>>
        implements TenantClient {

    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedTenantClient.class);
    private static final StringTag TAG_SUBJECT_DN = new StringTag("subject_dn");
    /** Name of the cache key attribute (and request property) holding the tenant identifier. */
    private static final String ATTRIBUTE_KEY_TENANT_ID = "tenant-id";
    /** Name of the endpoint that all requests of this client are sent to. */
    private static final String ENDPOINT_NAME = "tenant";

    /**
     * Requests that have been sent but have not completed yet, indexed by cache key.
     * <p>
     * NOTE(review): this is a plain {@link HashMap} without synchronization; presumably
     * all accesses happen on a single (vert.x) context - confirm against callers.
     */
    private final Map<Object, Future<TenantResult<TenantObject>>> pendingRequests = new HashMap<>();

    /**
     * Creates a client for the given connection.
     * <p>
     * If caching is enabled, a consumer for tenant change notifications is registered
     * on the connection's vert.x instance in order to evict stale cache entries.
     *
     * @param honoConnection The connection used for sending requests.
     * @param factory The factory for creating samplers for the messages being sent.
     * @param cache The cache passed on to the superclass for storing responses.
     */
    public ProtonBasedTenantClient(HonoConnection honoConnection, SendMessageSampler.Factory factory, Cache<Object, TenantResult<TenantObject>> cache) {
        super(honoConnection, factory, new CachingClientFactory<>(honoConnection.getVertx(), RequestResponseClient::isOpen), cache);
        if (isCachingEnabled()) {
            NotificationEventBusSupport.registerConsumer(honoConnection.getVertx(), TenantChangeNotification.TYPE, notification -> {
                // only deletion or disabling of a tenant invalidates cached results
                if (LifecycleChange.DELETE.equals(notification.getChange())
                        || (LifecycleChange.UPDATE.equals(notification.getChange()) && !notification.isEnabled())) {
                    removeResultFromCache(notification.getTenantId());
                }
            });
        }
    }

    /**
     * Returns the key of the request-response client, which is the endpoint name.
     * <p>
     * The argument is ignored: the same key is used by {@link #getOrCreateClient()},
     * so a single client serves every tenant.
     *
     * @param str Ignored.
     * @return The constant {@code "tenant"}.
     */
    protected String getKey(String str) {
        return ENDPOINT_NAME;
    }

    /**
     * Gets the request-response client for the tenant endpoint, creating it if necessary.
     * <p>
     * The connection is first checked for being established (using the default
     * connection check timeout); the client lookup/creation is then executed via the
     * connection's {@code executeOnContext}.
     *
     * @return A future completed with the client or failed if the connection check or
     *         the client creation fails.
     */
    private Future<RequestResponseClient<TenantResult<TenantObject>>> getOrCreateClient() {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(clientPromise ->
                        this.clientFactory.getOrCreateClient(
                                ENDPOINT_NAME,
                                () -> RequestResponseClient.forEndpoint(
                                        this.connection,
                                        ENDPOINT_NAME,
                                        null,
                                        this.samplerFactory.create(ENDPOINT_NAME),
                                        this::removeClient,
                                        this::removeClient),
                                clientPromise)));
    }

    /**
     * Creates a result object from the properties of a response message.
     * <p>
     * NOTE(review): the decompiler renamed this method from {@code getResult} (merged
     * with its bridge method) and widened its access from {@code protected}; confirm
     * that the superclass hook is still satisfied in the build this file is used in.
     *
     * @param i The status code of the response.
     * @param str Passed to {@code isSuccessResponse} together with status and payload.
     * @param buffer The payload of the response.
     * @param cacheDirective The cache directive to include in a successful result.
     * @param applicationProperties The application properties to include in the result.
     * @return The result. For a non-success response the result carries the given status
     *         but neither payload nor cache directive. If the payload of a success
     *         response cannot be decoded into a {@link TenantObject}, the result has
     *         status 500.
     */
    public TenantResult<TenantObject> m5getResult(int i, String str, Buffer buffer, CacheDirective cacheDirective, ApplicationProperties applicationProperties) {
        if (!isSuccessResponse(i, str, buffer)) {
            return TenantResult.from(i, null, null, applicationProperties);
        }
        try {
            return TenantResult.from(i, Json.decodeValue(buffer, TenantObject.class), cacheDirective, applicationProperties);
        } catch (DecodeException e) {
            LOG.warn("received malformed payload from Tenant service", e);
            return TenantResult.from(HttpURLConnection.HTTP_INTERNAL_ERROR, null, null, applicationProperties);
        }
    }

    /**
     * Gets a tenant by its identifier.
     *
     * @param str The identifier of the tenant.
     * @param spanContext The parent of the tracing span created for this lookup.
     * @return A future completed with the tenant, or failed with a
     *         {@link ClientErrorException} if the service reports status 404, or with the
     *         exception mapped from any other non-200 status.
     * @throws NullPointerException if the tenant identifier is {@code null}.
     */
    public Future<TenantObject> get(String str, SpanContext spanContext) {
        Objects.requireNonNull(str);
        final AnnotatedCacheKey<?> cacheKey = new AnnotatedCacheKey<>(str);
        final Span span = newChildSpan(spanContext, "get Tenant by ID");
        span.setTag("tenant_id", str);
        return get(cacheKey, () -> new JsonObject().put(ATTRIBUTE_KEY_TENANT_ID, str), span);
    }

    /**
     * Gets a tenant by the subject DN of one of its trusted certificate authorities.
     * <p>
     * The DN is sent to the service in its RFC 2253 string form.
     *
     * @param x500Principal The subject DN to look up the tenant for.
     * @param spanContext The parent of the tracing span created for this lookup.
     * @return A future completed with the tenant, or failed with a
     *         {@link ClientErrorException} if the service reports status 404, or with the
     *         exception mapped from any other non-200 status.
     * @throws NullPointerException if the subject DN is {@code null}.
     */
    public Future<TenantObject> get(X500Principal x500Principal, SpanContext spanContext) {
        Objects.requireNonNull(x500Principal);
        final String subjectDn = x500Principal.getName(X500Principal.RFC2253);
        final AnnotatedCacheKey<?> cacheKey = new AnnotatedCacheKey<>(x500Principal);
        final Span span = newChildSpan(spanContext, "get Tenant by subject DN");
        TAG_SUBJECT_DN.set(span, subjectDn);
        return get(cacheKey, () -> new JsonObject().put("subject-dn", subjectDn), span);
    }

    /**
     * Looks up a tenant in the cache and falls back to a service request.
     *
     * @param cacheKey The key identifying the lookup in the cache and among the pending requests.
     * @param payloadSupplier Supplies the JSON payload of the request; only invoked if a
     *                        request is actually sent.
     * @param span The tracing span, finished by {@code mapResultAndFinishSpan}.
     * @return A future with the outcome of the lookup.
     */
    private Future<TenantObject> get(AnnotatedCacheKey<?> cacheKey, Supplier<JsonObject> payloadSupplier, Span span) {
        final Future<TenantResult<TenantObject>> result = getResponseFromCache(cacheKey, span)
                // a failed cache lookup is not an error, it triggers the actual request
                .recover(cacheMiss -> executeOrUsePendingRequestResult(cacheKey, () -> getOrCreateClient()
                        .compose(client -> client.createAndSendRequest(
                                TenantConstants.TenantAction.get.toString(),
                                null,
                                payloadSupplier.get().toBuffer(),
                                "application/json",
                                this::getRequestResponseResult,
                                span))));
        return mapResultAndFinishSpan(result, tenantResult -> {
            switch (tenantResult.getStatus()) {
                case HttpURLConnection.HTTP_OK:
                    return tenantResult.getPayload();
                case HttpURLConnection.HTTP_NOT_FOUND:
                    throw new ClientErrorException(tenantResult.getStatus(), "no such tenant");
                default:
                    throw StatusCodeMapper.from(tenantResult);
            }
        }, span);
    }

    /**
     * Executes a request unless a request for the same key is already outstanding, in
     * which case the outcome of that request is used.
     * <p>
     * A successful result of a newly executed request is added to the cache before the
     * pending entry is removed and the returned future is completed.
     *
     * @param cacheKey The key identifying the request.
     * @param requestSupplier Sends the request; only invoked if no request is pending for the key.
     * @return A future with the outcome of the (new or already pending) request.
     */
    private Future<TenantResult<TenantObject>> executeOrUsePendingRequestResult(AnnotatedCacheKey<?> cacheKey, Supplier<Future<TenantResult<TenantObject>>> requestSupplier) {
        final Promise<TenantResult<TenantObject>> resultPromise = Promise.promise();
        final Future<TenantResult<TenantObject>> pendingRequest =
                this.pendingRequests.putIfAbsent(cacheKey, resultPromise.future());
        if (pendingRequest != null) {
            // piggyback on the request that is already on its way
            pendingRequest.onComplete(resultPromise);
        } else {
            requestSupplier.get()
                    .onSuccess(tenantResult -> addResultToCache(cacheKey, tenantResult))
                    .onComplete(outcome -> this.pendingRequests.remove(cacheKey))
                    .onComplete(resultPromise);
        }
        return resultPromise.future();
    }

    /**
     * Adds a result to the cache if caching is enabled.
     * <p>
     * If the result has a payload, the key is annotated with the tenant's identifier so
     * that the entry can later be found by {@link #removeResultFromCache(String)}.
     *
     * @param cacheKey The key to store the result under.
     * @param tenantResult The result to store.
     */
    private void addResultToCache(AnnotatedCacheKey<?> cacheKey, TenantResult<TenantObject> tenantResult) {
        if (!isCachingEnabled()) {
            return;
        }
        final TenantObject tenant = tenantResult.getPayload();
        if (tenant != null) {
            cacheKey.putAttribute(ATTRIBUTE_KEY_TENANT_ID, tenant.getTenantId());
        }
        addToCache(cacheKey, tenantResult);
    }

    /**
     * Removes all cache entries whose key is annotated with the given tenant identifier.
     *
     * @param tenantId The identifier of the tenant whose results are to be evicted.
     */
    private void removeResultFromCache(String tenantId) {
        // the cache accepts arbitrary keys, so keys of a foreign type are skipped
        // instead of failing the eviction with a ClassCastException
        removeFromCacheByPattern(key -> key instanceof AnnotatedCacheKey
                && ((AnnotatedCacheKey<?>) key).getAttribute(ATTRIBUTE_KEY_TENANT_ID)
                        .map(id -> id.equals(tenantId))
                        .orElse(false));
    }
}
