package io.pravega.segmentstore.server.host;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.cluster.Host;
import io.pravega.common.cluster.HostContainerMap;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.CollectionHelpers;
import io.pravega.segmentstore.server.ContainerHandle;
import io.pravega.segmentstore.server.SegmentContainerRegistry;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/host/ZKSegmentContainerMonitor.class */
/**
 * Watches the host-to-container mapping node in ZooKeeper and starts or stops local
 * segment containers so that the set of running containers converges on the set
 * assigned to this {@link Host}.
 *
 * <p>Reconciliation ({@code checkAssignment}) runs both periodically and whenever the
 * watched node changes; both triggers are dispatched on the supplied executor and are
 * serialized by an internal lock.
 */
public class ZKSegmentContainerMonitor implements AutoCloseable {
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKSegmentContainerMonitor.class);
    private static final Duration INIT_TIMEOUT_PER_CONTAINER = Duration.ofSeconds(30);
    private static final Duration CLOSE_TIMEOUT_PER_CONTAINER = Duration.ofSeconds(30);
    private static final Duration MONITOR_INTERVAL = Duration.ofSeconds(10);
    // Even when nothing changes, the container state is logged at least this often.
    private static final long REPORT_INTERVAL_MILLIS = Duration.ofMinutes(10).toMillis();
    private static final Supplier<Long> CURRENT_TIME_MILLIS = System::currentTimeMillis;

    // The host whose container assignment is being monitored.
    private final Host host;
    // Cached view of the ZooKeeper node that holds the serialized host-to-container map.
    private final NodeCache hostContainerMapNode;
    private final SegmentContainerRegistry registry;
    // Handles of the containers that are currently registered, keyed by container id.
    private final Map<Integer, ContainerHandle> handles;
    // Ids of containers that have a start or stop operation in flight.
    private final Set<Integer> pendingTasks;
    private final ScheduledExecutorService executor;
    // The periodic reconciliation task; null until initialize() and after close().
    private final AtomicReference<ScheduledFuture<?>> assigmentTask;
    private final AtomicLong lastReportTime;
    // Serializes invocations of checkAssignment().
    private final Object lock = new Object();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a new monitor. Nothing is started until {@link #initialize()} is called.
     *
     * @param segmentContainerRegistry the registry used to start and stop containers
     * @param curatorFramework         the ZooKeeper client used to watch the mapping node
     * @param host                     the host whose assignment is monitored
     * @param scheduledExecutorService the executor that runs the reconciliation task and node listener
     * @throws NullPointerException if any argument is null
     */
    public ZKSegmentContainerMonitor(SegmentContainerRegistry segmentContainerRegistry, CuratorFramework curatorFramework,
                                     Host host, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(curatorFramework, "zkClient");
        this.registry = Preconditions.checkNotNull(segmentContainerRegistry, "containerRegistry");
        this.host = Preconditions.checkNotNull(host, "pravegaServiceEndpoint");
        this.executor = Preconditions.checkNotNull(scheduledExecutorService, "executor");
        this.handles = new ConcurrentHashMap<>();
        this.pendingTasks = new ConcurrentSkipListSet<>();
        this.hostContainerMapNode = new NodeCache(curatorFramework, ZKPaths.makePath("cluster", "segmentContainerHostMapping"));
        this.assigmentTask = new AtomicReference<>();
        this.lastReportTime = new AtomicLong(CURRENT_TIME_MILLIS.get());
    }

    /**
     * Starts monitoring using the default monitor interval.
     */
    public void initialize() {
        initialize(MONITOR_INTERVAL);
    }

    /**
     * Starts the node cache, schedules the periodic reconciliation and registers a
     * listener that reconciles whenever the mapping node changes.
     *
     * <p>Any exception raised while starting (including checked ones thrown by the node
     * cache) is propagated to the caller unchanged.
     *
     * @param duration delay between the end of one periodic check and the start of the next;
     *                 honoured at millisecond granularity and must be at least one millisecond
     */
    @VisibleForTesting
    public void initialize(Duration duration) {
        try {
            Exceptions.checkNotClosed(this.closed.get(), this);
            this.hostContainerMapNode.start();
            // Schedule in milliseconds: truncating to whole seconds would turn a sub-second
            // interval into 0, which scheduleWithFixedDelay rejects.
            this.assigmentTask.set(this.executor.scheduleWithFixedDelay(
                    this::checkAssignment, 0L, duration.toMillis(), TimeUnit.MILLISECONDS));
            this.hostContainerMapNode.getListenable().addListener(this::checkAssignment, this.executor);
        } catch (Exception e) {
            throw ZKSegmentContainerMonitor.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Stops monitoring and stops every registered container, waiting up to the
     * per-container close timeout for all of the stop operations to finish.
     *
     * @throws IllegalStateException if this monitor has already been closed
     */
    @Override
    public void close() {
        Preconditions.checkState(this.closed.compareAndSet(false, true));
        try {
            this.hostContainerMapNode.close();
        } catch (IOException e) {
            // Best effort on shutdown: keep going so the containers still get stopped.
            log.warn("Failed to close hostContainerMapNode", e);
        }

        ScheduledFuture<?> task = this.assigmentTask.getAndSet(null);
        if (task != null) {
            task.cancel(true);
        }

        // Work on a snapshot: unregisterHandle mutates 'handles' as the stops complete.
        List<ContainerHandle> toClose = new ArrayList<>(this.handles.values());
        List<CompletableFuture<Void>> results = new ArrayList<>(toClose.size());
        for (ContainerHandle handle : toClose) {
            results.add(this.registry.stopContainer(handle, CLOSE_TIMEOUT_PER_CONTAINER)
                    .thenAccept(v -> unregisterHandle(handle.getContainerId())));
        }
        Futures.await(Futures.allOf(results), CLOSE_TIMEOUT_PER_CONTAINER.toMillis());
    }

    /**
     * Returns the ids of the currently registered containers. The returned collection is
     * a live view of the internal handle map.
     */
    @VisibleForTesting
    Collection<Integer> getRegisteredContainers() {
        return this.handles.keySet();
    }

    /**
     * Compares the desired assignment with the registered containers and starts or stops
     * containers to close the gap. Containers with an operation already in flight are
     * skipped. Failures are logged and never propagated, so that the periodic task keeps
     * running.
     */
    private void checkAssignment() {
        synchronized (this.lock) {
            long traceId = LoggerHelpers.traceEnter(log, "checkAssignment");
            try {
                Exceptions.checkNotClosed(this.closed.get(), this);
                Set<Integer> desired = getDesiredContainerList();
                if (desired == null) {
                    log.warn("No segment container assignments found");
                    return;
                }

                // Snapshots, so that the computation below is not affected by concurrent completions.
                Set<Integer> running = new HashSet<>(this.handles.keySet());
                Set<Integer> pending = new HashSet<>(this.pendingTasks);
                Collection<Integer> toStart = CollectionHelpers.filterOut(CollectionHelpers.filterOut(desired, running), pending);
                Collection<Integer> toStop = CollectionHelpers.filterOut(CollectionHelpers.filterOut(running, desired), pending);

                boolean hasActivity = !(pending.isEmpty() && toStart.isEmpty() && toStop.isEmpty());
                boolean reportDue = CURRENT_TIME_MILLIS.get() - this.lastReportTime.get() >= REPORT_INTERVAL_MILLIS;
                if (hasActivity || reportDue) {
                    log.info("Container Changes: Desired = {}, Current = {}, PendingTasks = {}, ToStart = {}, ToStop = {}.",
                            desired, running, pending, toStart, toStop);
                    this.lastReportTime.set(CURRENT_TIME_MILLIS.get());
                }

                toStart.forEach(this::startContainer);
                toStop.forEach(this::stopContainer);
            } catch (Throwable ex) {
                log.warn("Failed to monitor the segmentcontainer assignment: ", ex);
            } finally {
                // Exactly one trace-leave per trace-enter, on every path.
                LoggerHelpers.traceLeave(log, "checkAssignment", traceId);
            }
        }
    }

    /**
     * Stops the container with the given id and unregisters its handle once the stop
     * completes, whether it succeeded or failed.
     *
     * @param i the container id
     * @return a future for the stop operation; already completed if no handle is registered for the id
     */
    private CompletableFuture<Void> stopContainer(int i) {
        log.info("Stopping Container {}.", i);
        ContainerHandle handle = this.handles.get(i);
        if (handle == null) {
            log.warn("Container {} handle is null, container is pending start or already unregistered.", i);
            return CompletableFuture.completedFuture(null);
        }

        this.pendingTasks.add(i);
        try {
            return this.registry.stopContainer(handle, CLOSE_TIMEOUT_PER_CONTAINER).whenComplete((v, ex) -> {
                if (ex != null) {
                    log.warn("Stopping container {} failed: {}", i, ex);
                }
                try {
                    unregisterHandle(i);
                } finally {
                    this.pendingTasks.remove(i);
                }
            });
        } catch (RuntimeException | Error ex) {
            // The stop request failed synchronously; do not leave the id marked as pending.
            this.pendingTasks.remove(i);
            throw ex;
        }
    }

    /**
     * Starts the container with the given id and registers its handle on success.
     *
     * @param i the container id
     * @return a future for the start operation
     */
    private CompletableFuture<ContainerHandle> startContainer(int i) {
        log.info("Starting Container {}.", i);
        this.pendingTasks.add(i);
        try {
            return this.registry.startContainer(i, INIT_TIMEOUT_PER_CONTAINER).whenComplete((handle, ex) -> {
                try {
                    if (ex != null) {
                        log.warn("Starting container {} failed: {}", i, ex);
                    } else if (this.handles.putIfAbsent(handle.getContainerId(), handle) != null) {
                        log.warn("Starting container {} succeeded but handle is already registered.", handle.getContainerId());
                    } else {
                        // Drop the handle when the container reports that it has stopped.
                        handle.setContainerStoppedListener(this::unregisterHandle);
                        log.info("Container {} has been registered.", handle.getContainerId());
                    }
                } finally {
                    this.pendingTasks.remove(i);
                }
            });
        } catch (RuntimeException | Error ex) {
            // The start request failed synchronously; do not leave the id marked as pending.
            this.pendingTasks.remove(i);
            throw ex;
        }
    }

    /**
     * Removes the handle registered for the given container id, logging whether one was present.
     *
     * @param i the container id
     */
    private void unregisterHandle(int i) {
        if (this.handles.remove(i) == null) {
            log.warn("Attempted to unregister non-registered container {}.", i);
        } else {
            log.info("Container {} has been unregistered.", i);
        }
    }

    /**
     * Reads the container ids assigned to this host from the cached mapping node.
     *
     * @return the assigned ids, an empty set if the map has no entry for this host, or
     *         {@code null} if the node or its data is not available
     */
    private Set<Integer> getDesiredContainerList() {
        log.debug("Fetching the latest container assignment from ZooKeeper.");
        // Read the cached data once: the node may change between two separate reads.
        ChildData current = this.hostContainerMapNode.getCurrentData();
        if (current == null) {
            return null;
        }
        byte[] data = current.getData();
        if (data == null) {
            return null;
        }
        return HostContainerMap.fromBytes(data).getHostContainerMap().entrySet().stream()
                .filter(entry -> entry.getKey().equals(this.host))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(Collections.emptySet());
    }

    /**
     * Rethrows the given throwable without wrapping it, even if it is a checked exception.
     * Declared to return a value only so that call sites can write {@code throw sneakyThrow(e)}.
     */
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> RuntimeException sneakyThrow(Throwable t) throws T {
        throw (T) t;
    }
}
