package io.atomix.protocols.gossip;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.event.AbstractListenerManager;
import io.atomix.protocols.gossip.GossipService;
import io.atomix.protocols.gossip.protocol.AntiEntropyAdvertisement;
import io.atomix.protocols.gossip.protocol.AntiEntropyProtocol;
import io.atomix.protocols.gossip.protocol.AntiEntropyResponse;
import io.atomix.protocols.gossip.protocol.GossipMessage;
import io.atomix.protocols.gossip.protocol.GossipUpdate;
import io.atomix.time.LogicalClock;
import io.atomix.utils.AbstractAccumulator;
import io.atomix.utils.Identifier;
import io.atomix.utils.SlidingWindowCounter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/protocols/gossip/AntiEntropyService.class */
public class AntiEntropyService<K, V> extends AbstractListenerManager<GossipEvent<K, V>, GossipEventListener<K, V>> implements GossipService<K, V> {
    // Size handed to the SlidingWindowCounter that backs `counter`.
    private static final int WINDOW_SIZE = 5;
    // NOTE(review): presumably the count above which the service treats itself as under high load — confirm.
    private static final int HIGH_LOAD_THRESHOLD = 2;
    // NOTE(review): presumably how many recent counter slots are inspected when measuring load — confirm.
    private static final int LOAD_WINDOW = 2;
    // Transport used to advertise digests, send gossip messages and (un)register the inbound gossip listener.
    private final AntiEntropyProtocol<Identifier> protocol;
    // Supplies the current peers; queried when notifying peers, picking an anti-entropy target and purging tombstones.
    private final Supplier<Collection<Identifier>> peerProvider;
    // Executor on which events are delivered to registered listeners (see post()).
    private final Executor eventExecutor;
    // Executor on which batched gossip messages are handed to the protocol.
    private final Executor communicationExecutor;
    // When true, tombstone updates are not retained and no purge task is scheduled.
    private final boolean tombstonesDisabled;
    // Handle of the periodic performAntiEntropy() task; cancelled in close().
    private final ScheduledFuture<?> updateFuture;
    // Handle of the periodic purgeTombstones() task; null when tombstones are disabled.
    private final ScheduledFuture<?> purgeFuture;
    // The three values below are passed, in this order after TIMER, to the AbstractAccumulator constructor.
    // NOTE(review): names suggest max batch size / max idle millis / max batch age millis — confirm against AbstractAccumulator.
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    // Single timer shared by every UpdateAccumulator created by any instance of this class.
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
    private final Logger log = LoggerFactory.getLogger(getClass());
    // Most recent update recorded per subject. This is a plain LinkedHashMap (not a concurrent map);
    // update() and purgeTombstones() access it while holding this object's monitor.
    private final Map<K, GossipUpdate<K, V>> updates = Maps.newLinkedHashMap();
    // Source of the timestamps stamped on locally produced updates and outgoing gossip messages.
    private final LogicalClock logicalClock = new LogicalClock();
    // One lazily created accumulator of outbound updates per peer (see getAccumulator()).
    private final Map<Identifier, AntiEntropyService<K, V>.UpdateAccumulator> pendingUpdates = Maps.newConcurrentMap();
    // Per peer: wall-clock millis captured when the last advertisement that the peer reported as PROCESSED was sent.
    private final Map<Identifier, Long> peerUpdateTimes = Maps.newConcurrentMap();
    // Cleared by close(); performAntiEntropy() does nothing once this is false.
    private volatile boolean open = true;
    // Read by underHighLoad(). No code that increments this counter is visible in this file.
    private final SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);

    /* loaded from: input_file:io/atomix/protocols/gossip/AntiEntropyService$Builder.class */
    /**
     * Fluent builder for {@link AntiEntropyService}.
     *
     * <p>Defaults: events are delivered on a direct (caller-thread) executor, anti-entropy runs
     * every second, tombstones are enabled and purged every minute. The protocol, the peer
     * provider and the communication executor have no default and must be supplied before
     * building; the service constructor rejects {@code null} for each of them.
     *
     * @param <K> subject (key) type
     * @param <V> value type
     */
    public static class Builder<K, V> implements GossipService.Builder<K, V> {
        protected AntiEntropyProtocol protocol;
        protected Supplier<Collection<Identifier>> peerProvider;
        protected ScheduledExecutorService communicationExecutor;
        protected Executor eventExecutor = MoreExecutors.directExecutor();
        protected Duration antiEntropyInterval = Duration.ofSeconds(1);
        protected boolean tombstonesDisabled = false;
        protected Duration purgeInterval = Duration.ofMinutes(1);

        /**
         * Sets the protocol used to exchange gossip and anti-entropy messages.
         *
         * @param antiEntropyProtocol the protocol; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withProtocol(AntiEntropyProtocol antiEntropyProtocol) {
            this.protocol = Preconditions.checkNotNull(antiEntropyProtocol, "protocol");
            return this;
        }

        /**
         * Sets the supplier of the current peers.
         *
         * @param supplier the peer supplier; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withPeerProvider(Supplier<Collection<Identifier>> supplier) {
            this.peerProvider = Preconditions.checkNotNull(supplier, "peerProvider cannot be null");
            return this;
        }

        /**
         * Sets the executor on which listener events are delivered.
         *
         * @param executor the event executor; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withEventExecutor(Executor executor) {
            this.eventExecutor = Preconditions.checkNotNull(executor, "executor cannot be null");
            return this;
        }

        /**
         * Sets the executor used for outbound communication and for the periodic tasks.
         *
         * @param scheduledExecutorService the communication executor; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withCommunicationExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.communicationExecutor = Preconditions.checkNotNull(scheduledExecutorService, "executor cannot be null");
            return this;
        }

        /**
         * Sets the period between anti-entropy rounds.
         *
         * @param duration the interval; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withAntiEntropyInterval(Duration duration) {
            this.antiEntropyInterval = Preconditions.checkNotNull(duration, "antiEntropyInterval cannot be null");
            return this;
        }

        /**
         * Enables or disables retention of tombstones.
         *
         * @param z {@code true} to disable tombstones
         * @return this builder
         */
        public Builder<K, V> withTombstonesDisabled(boolean z) {
            this.tombstonesDisabled = z;
            return this;
        }

        /**
         * Sets the period between tombstone purges.
         *
         * @param duration the interval; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withPurgeInterval(Duration duration) {
            this.purgeInterval = Preconditions.checkNotNull(duration, "purgeInterval cannot be null");
            return this;
        }

        /**
         * Creates the service from the configured values.
         *
         * <p>The decompiler renamed this method to {@code m1build} when it merged the bridge
         * method; the original name is restored here so the builder exposes {@code build()} again.
         *
         * @return a new, already running {@link AntiEntropyService}
         * @throws NullPointerException if a required component was never configured
         */
        public GossipService<K, V> build() {
            return new AntiEntropyService<>(this.protocol, this.peerProvider, this.eventExecutor, this.communicationExecutor, this.antiEntropyInterval, this.tombstonesDisabled, this.purgeInterval);
        }

        /**
         * Decompiler-generated alias kept so existing callers of the renamed method keep working.
         *
         * @return the result of {@link #build()}
         * @deprecated use {@link #build()}
         */
        @Deprecated
        public GossipService<K, V> m1build() {
            return build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/protocols/gossip/AntiEntropyService$UpdateAccumulator.class */
    /**
     * Batches the updates destined for a single peer and sends each batch as one gossip message.
     */
    public final class UpdateAccumulator extends AbstractAccumulator<GossipUpdate<K, V>> {
        private final Identifier peer;

        /**
         * @param identifier the peer that receives every batch produced by this accumulator
         */
        private UpdateAccumulator(Identifier identifier) {
            super(AntiEntropyService.TIMER, AntiEntropyService.DEFAULT_MAX_EVENTS, AntiEntropyService.DEFAULT_MAX_BATCH_MS, AntiEntropyService.DEFAULT_MAX_IDLE_MS);
            this.peer = identifier;
        }

        /**
         * Collapses a batch to one update per subject and hands it to the protocol on the
         * communication executor.
         *
         * <p>Previously the collapsing step was empty, so the message was always sent without
         * any updates.
         *
         * @param list the accumulated updates, possibly containing several for the same subject
         */
        public void processItems(List<GossipUpdate<K, V>> list) {
            Map<K, GossipUpdate<K, V>> latestBySubject = Maps.newHashMap();
            for (GossipUpdate<K, V> item : list) {
                // Keep the entry already collected unless the new one carries a strictly newer timestamp.
                latestBySubject.merge(item.subject(), item, (existing, incoming) ->
                        existing.timestamp().isOlderThan(incoming.timestamp()) ? incoming : existing);
            }
            AntiEntropyService.this.communicationExecutor.execute(() -> {
                try {
                    AntiEntropyService.this.protocol.gossip(this.peer, new GossipMessage<>(AntiEntropyService.this.logicalClock.increment(), latestBySubject.values()));
                } catch (Exception e) {
                    // Best effort: a failed send is logged and not retried here.
                    AntiEntropyService.this.log.warn("Failed to send to {}", this.peer, e);
                }
            });
        }
    }

    public AntiEntropyService(AntiEntropyProtocol<Identifier> antiEntropyProtocol, Supplier<Collection<Identifier>> supplier, Executor executor, ScheduledExecutorService scheduledExecutorService, Duration duration, boolean z, Duration duration2) {
        this.protocol = (AntiEntropyProtocol) Preconditions.checkNotNull(antiEntropyProtocol, "protocol cannot be null");
        this.peerProvider = (Supplier) Preconditions.checkNotNull(supplier, "peerProvider cannot be null");
        this.eventExecutor = (Executor) Preconditions.checkNotNull(executor, "eventExecutor cannot be null");
        this.communicationExecutor = (Executor) Preconditions.checkNotNull(scheduledExecutorService, "communicationExecutor cannot be null");
        this.tombstonesDisabled = z;
        antiEntropyProtocol.registerGossipListener(this::update);
        this.updateFuture = scheduledExecutorService.scheduleAtFixedRate(this::performAntiEntropy, 0L, duration.toMillis(), TimeUnit.MILLISECONDS);
        this.purgeFuture = !z ? scheduledExecutorService.scheduleAtFixedRate(this::purgeTombstones, 0L, duration2.toMillis(), TimeUnit.MILLISECONDS) : null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delivers an event to the registered listeners on the event executor rather than on the
     * calling thread.
     *
     * @param gossipEvent the event to deliver
     */
    public void post(GossipEvent<K, V> gossipEvent) {
        Runnable delivery = () -> super.post(gossipEvent);
        this.eventExecutor.execute(delivery);
    }

    /**
     * Records a locally produced event, queues it for every current peer and notifies local
     * listeners.
     *
     * <p>A tombstone is dropped from the map (and not gossiped) when tombstones are disabled;
     * every other update is stored and queued for the peers. The timestamp assignment and the map
     * mutation happen under this object's monitor, the same lock held by {@code update()} and
     * {@code purgeTombstones()}, because the backing map is a plain {@code LinkedHashMap}.
     * Peer notification and listener delivery run outside the lock.
     *
     * @param gossipEvent the local event to process
     */
    public void process(GossipEvent<K, V> gossipEvent) {
        final GossipUpdate<K, V> localUpdate;
        final boolean retained;
        synchronized (this) {
            localUpdate = new GossipUpdate<>(gossipEvent.subject(), gossipEvent.value(), this.logicalClock.increment());
            retained = !localUpdate.isTombstone() || !this.tombstonesDisabled;
            if (retained) {
                this.updates.put(gossipEvent.subject(), localUpdate);
            } else {
                this.updates.remove(localUpdate.subject());
            }
        }
        if (retained) {
            notifyPeers(localUpdate);
        }
        post(gossipEvent);
    }

    /**
     * Applies a gossip message received from a peer.
     *
     * <p>The local logical clock is first advanced with the message timestamp. An incoming update
     * is accepted when nothing is known for its subject, when the known entry is a tombstone and
     * the incoming one is not, or when the known entry has an older timestamp. Accepted updates
     * are posted to listeners; they are stored only when tombstones are enabled.
     *
     * <p>NOTE(review): with tombstones disabled nothing received from peers is ever stored, so the
     * same remote update is accepted (and posted) every time it arrives — confirm this is intended.
     *
     * @param gossipMessage the message received from a peer
     */
    private synchronized void update(GossipMessage<K, V> gossipMessage) {
        this.logicalClock.update(gossipMessage.timestamp());
        for (GossipUpdate<K, V> incoming : gossipMessage.updates()) {
            GossipUpdate<K, V> known = this.updates.get(incoming.subject());
            boolean accepted = known == null
                    || (known.isTombstone() && !incoming.isTombstone())
                    || known.timestamp().isOlderThan(incoming.timestamp());
            if (!accepted) {
                continue;
            }
            if (!this.tombstonesDisabled) {
                this.updates.put(incoming.subject(), incoming);
            }
            GossipEvent<K, V> event = new GossipEvent<>(incoming.creationTime(), incoming.subject(), incoming.value());
            post(event);
        }
    }

    /**
     * Returns the accumulator for a peer, creating and registering it on first use.
     *
     * @param identifier the peer whose accumulator is wanted
     * @return the accumulator associated with {@code identifier}
     */
    private AntiEntropyService<K, V>.UpdateAccumulator getAccumulator(Identifier identifier) {
        return this.pendingUpdates.computeIfAbsent(identifier, UpdateAccumulator::new);
    }

    /**
     * Reports whether the load counter, read over {@code LOAD_WINDOW}, exceeds
     * {@code HIGH_LOAD_THRESHOLD}. The named constants replace the bare literals that
     * duplicated their values.
     *
     * @return {@code true} when the counter reading is above the threshold
     */
    private boolean underHighLoad() {
        return this.counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
    }

    /**
     * Runs one anti-entropy round: unless the service is under high load or closed, sends an
     * advertisement to one randomly chosen peer.
     *
     * <p>Every exception is caught and logged so nothing propagates out of the scheduled task.
     */
    private void performAntiEntropy() {
        try {
            if (!underHighLoad() && this.open) {
                Optional<Identifier> target = pickRandomActivePeer();
                target.ifPresent(this::sendAdvertisementToPeer);
            }
        } catch (Exception e) {
            this.log.error("Exception thrown while sending advertisement", e);
        }
    }

    /**
     * Chooses one peer at random from those currently returned by the peer provider.
     *
     * @return the chosen peer, or an empty {@code Optional} when there are no peers
     */
    private Optional<Identifier> pickRandomActivePeer() {
        List<Identifier> candidates = new ArrayList<>(this.peerProvider.get());
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        Collections.shuffle(candidates);
        return Optional.of(candidates.get(0));
    }

    /**
     * Advertises the digests of all known updates to a peer and, once the peer answers, queues
     * the updates it asked for.
     *
     * <p>A failed advertisement is logged at debug level and otherwise ignored. When the response
     * status is {@code PROCESSED}, each requested key that is still known locally is added to the
     * peer's accumulator, and the time captured before sending is stored as that peer's last
     * update time.
     *
     * <p>NOTE(review): {@code updates} is read here without holding this object's monitor, while
     * other methods mutate it under that monitor — confirm whether a synchronized snapshot is needed.
     *
     * @param identifier the peer to advertise to
     */
    private void sendAdvertisementToPeer(Identifier identifier) {
        final long sentAt = System.currentTimeMillis();
        this.protocol
                .advertise(identifier, new AntiEntropyAdvertisement<>(ImmutableMap.copyOf(Maps.transformValues(this.updates, known -> known.digest()))))
                .whenComplete((response, failure) -> {
                    if (failure != null) {
                        this.log.debug("Failed to send anti-entropy advertisement to {}: {}", identifier, failure.getMessage());
                        return;
                    }
                    if (response.status() != AntiEntropyResponse.Status.PROCESSED) {
                        return;
                    }
                    if (!response.keys().isEmpty()) {
                        UpdateAccumulator accumulator = getAccumulator(identifier);
                        for (K requestedKey : response.keys()) {
                            GossipUpdate<K, V> known = this.updates.get(requestedKey);
                            if (known != null) {
                                accumulator.add(known);
                            }
                        }
                    }
                    this.peerUpdateTimes.put(identifier, sentAt);
                });
    }

    /**
     * Queues an update for every peer currently returned by the peer provider.
     *
     * @param gossipUpdate the update to distribute
     */
    private void notifyPeers(GossipUpdate<K, V> gossipUpdate) {
        Collection<Identifier> currentPeers = this.peerProvider.get();
        queueUpdate(gossipUpdate, currentPeers);
    }

    /**
     * Queues an update for the given peers; a {@code null} collection is treated as "no peers".
     *
     * @param gossipUpdate the update to distribute
     * @param collection the peers to notify, or {@code null}
     */
    private void notifyPeers(GossipUpdate<K, V> gossipUpdate, Collection<Identifier> collection) {
        if (collection == null) {
            return;
        }
        for (Identifier peer : collection) {
            getAccumulator(peer).add(gossipUpdate);
        }
    }

    /**
     * Adds an update to the accumulator of each given peer.
     *
     * @param gossipUpdate the update to queue
     * @param collection the target peers; {@code null} means there is nothing to do
     */
    private void queueUpdate(GossipUpdate<K, V> gossipUpdate, Collection<Identifier> collection) {
        if (collection == null) {
            return;
        }
        for (Identifier peer : collection) {
            getAccumulator(peer).add(gossipUpdate);
        }
    }

    /**
     * Removes tombstones that were created before the oldest "last update time" among the
     * current peers.
     *
     * <p>A peer without a recorded update time counts as 0, as does an empty peer set, so in
     * those cases only tombstones with a negative creation time could be removed.
     */
    private synchronized void purgeTombstones() {
        final long oldestPeerUpdate = this.peerProvider.get().stream()
                .mapToLong(peer -> this.peerUpdateTimes.getOrDefault(peer, 0L))
                .min()
                .orElse(0L);
        this.updates.values().removeIf(candidate ->
                candidate.isTombstone() && candidate.creationTime() < oldestPeerUpdate);
    }

    /**
     * Stops the service: marks it closed, unregisters the gossip listener and cancels the
     * periodic tasks without interrupting a run that is already in progress.
     */
    @Override // io.atomix.protocols.gossip.GossipService
    public void close() {
        this.open = false;
        this.protocol.unregisterGossipListener();
        this.updateFuture.cancel(false);
        // The purge task only exists when tombstones are enabled.
        ScheduledFuture<?> purgeTask = this.purgeFuture;
        if (purgeTask != null) {
            purgeTask.cancel(false);
        }
    }

    /**
     * @return a description of this service that names its protocol
     */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
        helper.add("protocol", this.protocol);
        return helper.toString();
    }
}
