package com.atlassian.cache.hazelcast.asyncinvalidation;

import com.atlassian.cache.hazelcast.asyncinvalidation.Cache;
import com.atlassian.cache.hazelcast.asyncinvalidation.Topic;
import com.hazelcast.core.HazelcastInstance;
import java.io.Serializable;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/cache/hazelcast/asyncinvalidation/CacheInvalidatorFactory.class */
/**
 * Creates {@link CacheInvalidator} instances and wires each one to the invalidation and
 * sequence-snapshot topics supplied by {@link Topics}.
 *
 * <p>For every cache handed to {@link #createCacheInvalidator(com.atlassian.cache.Cache)} the factory
 * registers a listener on that cache's invalidation topic, registers the cache's current sequence
 * number as a source for outgoing sequence snapshots, and registers a verifier that consumes incoming
 * sequence snapshots for that cache.
 *
 * <p>Instances are obtained through {@link #create} and should be {@linkplain #close() closed} when no
 * longer needed.
 */
public final class CacheInvalidatorFactory implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CacheInvalidatorFactory.class);

    private final Topics topics;
    private final Observability observability;
    private final SequenceSnapshotReceiver<Cache.Name> sequenceSnapshotReceiver = new SequenceSnapshotReceiver<>();
    private final SequenceSnapshotSender<Cache.Name> sequenceSnapshotSender;

    /**
     * Builds a factory whose sequence-snapshot receiver is already listening on the snapshot topic.
     *
     * @param hazelcastInstance passed to {@code Topics.from} to obtain the topics
     * @param function          passed to {@code Topics.from}; presumably maps a logical topic name to
     *                          the name actually used - confirm against {@code Topics}
     * @param observability     callbacks notified when an out-of-sequence invalidation or an
     *                          inconsistent sequence snapshot causes a flush
     * @return a fully wired factory
     */
    public static CacheInvalidatorFactory create(HazelcastInstance hazelcastInstance, Function<String, String> function, Observability observability) {
        return new CacheInvalidatorFactory(Topics.from(hazelcastInstance, function), observability).registerSequenceSnapshotReceiver();
    }

    private CacheInvalidatorFactory(Topics topics, Observability observability) {
        this.topics = topics;
        this.observability = observability;
        this.sequenceSnapshotSender = new SequenceSnapshotSender<>(topics.sequenceSnapshot());
    }

    /**
     * Subscribes the snapshot receiver to the sequence-snapshot topic and hands the resulting
     * registration to {@link Topics}.
     *
     * <p>Done outside the constructor so that {@code this} is not published to a listener before
     * construction has finished.
     *
     * @return this factory, for chaining
     */
    private CacheInvalidatorFactory registerSequenceSnapshotReceiver() {
        // Chained on purpose: no raw-typed locals, so the listener is checked against the topic's message type.
        this.topics.sequenceSnapshot().addListener(
                this.sequenceSnapshotReceiver::processSequenceSnapshot,
                this.topics::addRegistration);
        return this;
    }

    /**
     * Closes the topics, then the sequence-snapshot sender, then the sequence-snapshot receiver.
     *
     * <p>Every component is closed even when an earlier one fails. If more than one close throws,
     * the exception from the last failing close is the one that propagates.
     */
    @Override
    public void close() {
        try {
            this.topics.close();
        } finally {
            try {
                this.sequenceSnapshotSender.close();
            } finally {
                this.sequenceSnapshotReceiver.close();
            }
        }
    }

    /**
     * Asks the sequence-snapshot sender to publish a snapshot.
     */
    public void publishSequenceSnapshot() {
        this.sequenceSnapshotSender.publishSequenceSnapshot();
    }

    /**
     * Creates an invalidator for the given cache and registers all listeners it needs.
     *
     * @param cache the cache to create an invalidator for
     * @param <K>   cache key type; must be serializable
     * @param <V>   cache value type
     * @return an invalidator bound to the cache's invalidation topic
     */
    public <K extends Serializable, V> CacheInvalidator<K> createCacheInvalidator(com.atlassian.cache.Cache<K, V> cache) {
        return createCacheInvalidator(Cache.from(cache));
    }

    /**
     * Wires receiver, sender and snapshot verifier for one cache and returns its invalidator.
     */
    private <K extends Serializable> CacheInvalidator<K> createCacheInvalidator(Cache<K> cache) {
        log.debug("Creating invalidator for cache '{}'", cache.getName());

        Topic<CacheInvalidation<K>> invalidationTopic = this.topics.cacheInvalidation(cache);
        // The same tracker is shared by the invalidation receiver and the snapshot verifier below.
        SequenceTracker<ClusterNode> sequenceTracker = new SequenceTracker<>();

        // Incoming invalidations for this cache.
        CacheInvalidationReceiver<K> receiver = createCacheInvalidationReceiver(cache, sequenceTracker);
        invalidationTopic.addListener(receiver::processInvalidation, this.topics::addRegistration);

        // Outgoing side: expose this sender's current sequence number to the snapshot sender.
        InvalidationTopicSender sender = new InvalidationTopicSender();
        this.sequenceSnapshotSender.registerSequenceNumberSource(cache.getName(), sender::getCurrentSequenceNumber);

        // Incoming sequence snapshots for this cache are checked by the verifier.
        CacheInvalidationSequenceSnapshotVerifier verifier = createSnapshotVerifier(cache, sequenceTracker);
        this.sequenceSnapshotReceiver.registerSequenceSnapshotConsumer(cache.getName(), verifier::verifyMinimumSequenceNumber);

        return sender.createInvalidator(invalidationTopic);
    }

    /**
     * Creates the receiver for a cache's invalidation messages; its flushable reports to
     * {@code Observability.cacheInvalidationOutOfSequence}.
     */
    private <K extends Serializable> CacheInvalidationReceiver<K> createCacheInvalidationReceiver(Cache<K> cache, SequenceTracker<ClusterNode> sequenceTracker) {
        return new CacheInvalidationReceiver<>(
                cache,
                sequenceTracker,
                flushable(cache, this.observability::cacheInvalidationOutOfSequence));
    }

    /**
     * Creates the verifier for a cache's sequence snapshots; its flushable reports to
     * {@code Observability.sequenceSnapshotInconsistent}.
     */
    private <K extends Serializable> CacheInvalidationSequenceSnapshotVerifier createSnapshotVerifier(Cache<K> cache, SequenceTracker<ClusterNode> sequenceTracker) {
        return new CacheInvalidationSequenceSnapshotVerifier(
                cache.getName(),
                flushable(cache, this.observability::sequenceSnapshotInconsistent),
                sequenceTracker);
    }

    /**
     * Obtains a {@link Flushable} from the cache whose callback forwards the string form of the
     * value it receives to {@code consumer}.
     */
    private <K extends Serializable> Flushable flushable(Cache<K> cache, Consumer<String> consumer) {
        return cache.flushable(name -> {
            consumer.accept(name.toString());
        });
    }
}
