package io.atomix.coordination;

import io.atomix.catalyst.util.Listener;
import io.atomix.coordination.state.TopicCommands;
import io.atomix.coordination.state.TopicState;
import io.atomix.copycat.client.RaftClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.Consistency;
import io.atomix.resource.ResourceInfo;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

@ResourceInfo(stateMachine = TopicState.class)
/* loaded from: input_file:io/atomix/coordination/DistributedTopic.class */
public class DistributedTopic<T> extends AbstractResource {
    private final Set<Consumer<T>> listeners;

    /* loaded from: input_file:io/atomix/coordination/DistributedTopic$TopicListener.class */
    private class TopicListener implements Listener<T> {
        private final Consumer<T> listener;

        private TopicListener(Consumer<T> consumer) {
            this.listener = consumer;
        }

        public void accept(T t) {
            this.listener.accept(t);
        }

        public void close() {
            synchronized (DistributedTopic.this) {
                DistributedTopic.this.listeners.remove(this.listener);
                if (DistributedTopic.this.listeners.isEmpty()) {
                    DistributedTopic.this.submit(new TopicCommands.Unlisten());
                }
            }
        }
    }

    public DistributedTopic(RaftClient raftClient) {
        super(raftClient);
        this.listeners = new HashSet();
        raftClient.session().onEvent("message", obj -> {
            Iterator<Consumer<T>> it = this.listeners.iterator();
            while (it.hasNext()) {
                it.next().accept(obj);
            }
        });
    }

    /* renamed from: with, reason: merged with bridge method [inline-methods] */
    public DistributedTopic<T> m4with(Consistency consistency) {
        super.with(consistency);
        return this;
    }

    public DistributedTopic<T> sync() {
        return m4with(Consistency.ATOMIC);
    }

    public DistributedTopic<T> async() {
        return m4with(Consistency.SEQUENTIAL);
    }

    public CompletableFuture<Void> publish(T t) {
        return submit(new TopicCommands.Publish(t));
    }

    public CompletableFuture<Listener<T>> subscribe(Consumer<T> consumer) {
        if (this.listeners.isEmpty()) {
            this.listeners.add(consumer);
            return submit(new TopicCommands.Listen()).thenApply(r8 -> {
                return new TopicListener(consumer);
            });
        }
        this.listeners.add(consumer);
        return CompletableFuture.completedFuture(new TopicListener(consumer));
    }
}
