package org.apache.flink.state.common;

import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/state/common/PeriodicMaterializationManager.class */
public class PeriodicMaterializationManager implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class);
    private final MailboxExecutor mailboxExecutor;
    private final ExecutorService asyncOperationsThreadPool;
    private final ScheduledExecutorService periodicExecutor;
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final String subtaskName;
    private final long periodicMaterializeDelay;
    private final int allowedNumberOfFailures;
    private final AtomicInteger numberOfConsecutiveFailures;
    private final MaterializationTarget target;
    private final ChangelogMaterializationMetricGroup metrics;
    private boolean started;
    private final long initialDelay;

    /* loaded from: input_file:org/apache/flink/state/common/PeriodicMaterializationManager$MaterializationRunnable.class */
    public static class MaterializationRunnable {
        private final RunnableFuture<SnapshotResult<KeyedStateHandle>> materializationRunnable;
        private final long materializationID;
        private final SequenceNumber materializedTo;

        public MaterializationRunnable(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture, long j, SequenceNumber sequenceNumber) {
            this.materializationRunnable = runnableFuture;
            this.materializedTo = sequenceNumber;
            this.materializationID = j;
        }

        RunnableFuture<SnapshotResult<KeyedStateHandle>> getMaterializationRunnable() {
            return this.materializationRunnable;
        }

        public SequenceNumber getMaterializedTo() {
            return this.materializedTo;
        }

        public long getMaterializationID() {
            return this.materializationID;
        }
    }

    @NotThreadSafe
    /* loaded from: input_file:org/apache/flink/state/common/PeriodicMaterializationManager$MaterializationTarget.class */
    public interface MaterializationTarget {
        public static final MaterializationTarget NO_OP = new MaterializationTarget() { // from class: org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget.1
            @Override // org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget
            public Optional<MaterializationRunnable> initMaterialization() {
                return Optional.empty();
            }

            @Override // org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget
            public void handleMaterializationResult(SnapshotResult<KeyedStateHandle> snapshotResult, long j, SequenceNumber sequenceNumber) {
            }

            @Override // org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget
            public void handleMaterializationFailureOrCancellation(long j, SequenceNumber sequenceNumber, Throwable th) {
            }
        };

        Optional<MaterializationRunnable> initMaterialization() throws Exception;

        void handleMaterializationResult(SnapshotResult<KeyedStateHandle> snapshotResult, long j, SequenceNumber sequenceNumber) throws Exception;

        void handleMaterializationFailureOrCancellation(long j, SequenceNumber sequenceNumber, Throwable th);
    }

    public PeriodicMaterializationManager(MailboxExecutor mailboxExecutor, ExecutorService executorService, String str, AsyncExceptionHandler asyncExceptionHandler, MaterializationTarget materializationTarget, ChangelogMaterializationMetricGroup changelogMaterializationMetricGroup, long j, int i, String str2) {
        this(mailboxExecutor, executorService, str, asyncExceptionHandler, materializationTarget, changelogMaterializationMetricGroup, j, i, str2, Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("periodic-materialization-scheduler-" + str)));
    }

    PeriodicMaterializationManager(MailboxExecutor mailboxExecutor, ExecutorService executorService, String str, AsyncExceptionHandler asyncExceptionHandler, MaterializationTarget materializationTarget, ChangelogMaterializationMetricGroup changelogMaterializationMetricGroup, long j, int i, String str2, ScheduledExecutorService scheduledExecutorService) {
        this.started = false;
        this.mailboxExecutor = (MailboxExecutor) Preconditions.checkNotNull(mailboxExecutor);
        this.asyncOperationsThreadPool = (ExecutorService) Preconditions.checkNotNull(executorService);
        this.subtaskName = (String) Preconditions.checkNotNull(str);
        this.asyncExceptionHandler = (AsyncExceptionHandler) Preconditions.checkNotNull(asyncExceptionHandler);
        this.metrics = changelogMaterializationMetricGroup;
        this.target = (MaterializationTarget) Preconditions.checkNotNull(materializationTarget);
        this.periodicMaterializeDelay = j;
        this.allowedNumberOfFailures = i;
        this.numberOfConsecutiveFailures = new AtomicInteger(0);
        this.periodicExecutor = scheduledExecutorService;
        this.initialDelay = MathUtils.murmurHash(str2.hashCode()) % j;
    }

    public void start() {
        if (this.started || this.periodicMaterializeDelay < 0) {
            return;
        }
        this.started = true;
        LOG.info("Task {} starts periodic materialization", this.subtaskName);
        scheduleNextMaterialization(this.initialDelay);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        LOG.info("Shutting down PeriodicMaterializationManager.");
        if (this.periodicExecutor.isShutdown()) {
            return;
        }
        this.periodicExecutor.shutdownNow();
    }

    @VisibleForTesting
    public void triggerMaterialization() {
        this.mailboxExecutor.execute(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            this.metrics.reportStartedMaterialization();
            try {
                Optional<MaterializationRunnable> initMaterialization = this.target.initMaterialization();
                if (initMaterialization.isPresent()) {
                    MaterializationRunnable materializationRunnable = initMaterialization.get();
                    this.asyncOperationsThreadPool.execute(() -> {
                        asyncMaterializationPhase(currentTimeMillis, materializationRunnable.getMaterializationRunnable(), materializationRunnable.getMaterializationID(), materializationRunnable.getMaterializedTo());
                    });
                } else {
                    this.metrics.reportCompletedMaterialization(System.currentTimeMillis() - currentTimeMillis);
                    scheduleNextMaterialization();
                    LOG.info("Task {} has no state updates since last materialization, skip this one and schedule the next one in {} seconds", this.subtaskName, Long.valueOf(this.periodicMaterializeDelay / 1000));
                }
            } catch (Exception e) {
                this.metrics.reportFailedMaterialization();
                throw e;
            }
        }, "materialization");
    }

    private void asyncMaterializationPhase(long j, RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture, long j2, SequenceNumber sequenceNumber) {
        uploadSnapshot(runnableFuture).whenComplete((snapshotResult, th) -> {
            if (th == null) {
                this.numberOfConsecutiveFailures.set(0);
                this.mailboxExecutor.execute(() -> {
                    try {
                        this.target.handleMaterializationResult(snapshotResult, j2, sequenceNumber);
                        this.metrics.reportCompletedMaterialization(System.currentTimeMillis() - j);
                    } catch (Exception e) {
                        this.metrics.reportFailedMaterialization();
                    }
                }, "Task {} update materializedSnapshot up to changelog sequence number: {}", new Object[]{this.subtaskName, sequenceNumber});
                scheduleNextMaterialization();
            } else {
                if (th instanceof CancellationException) {
                    LOG.info("materialization cancelled", th);
                    notifyFailureOrCancellation(j2, sequenceNumber, th);
                    scheduleNextMaterialization();
                    return;
                }
                notifyFailureOrCancellation(j2, sequenceNumber, th);
                this.metrics.reportFailedMaterialization();
                int incrementAndGet = this.numberOfConsecutiveFailures.incrementAndGet();
                if (incrementAndGet > this.allowedNumberOfFailures) {
                    this.asyncExceptionHandler.handleAsyncException("Task " + this.subtaskName + " fails to complete the asynchronous part of materialization", th);
                } else {
                    LOG.info("Task {} asynchronous part of materialization is not completed for the {} time.", new Object[]{this.subtaskName, Integer.valueOf(incrementAndGet), th});
                    scheduleNextMaterialization();
                }
            }
        });
    }

    private void notifyFailureOrCancellation(long j, SequenceNumber sequenceNumber, Throwable th) {
        this.mailboxExecutor.execute(() -> {
            this.target.handleMaterializationFailureOrCancellation(j, sequenceNumber, th);
        }, "Task {} materialization:{},upTo:{} failed or canceled.", new Object[]{this.subtaskName, Long.valueOf(j), sequenceNumber});
    }

    private CompletableFuture<SnapshotResult<KeyedStateHandle>> uploadSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture) {
        FileSystemSafetyNet.initializeSafetyNetForThread();
        CompletableFuture<SnapshotResult<KeyedStateHandle>> completableFuture = new CompletableFuture<>();
        try {
            try {
                FutureUtils.runIfNotDoneAndGet(runnableFuture);
                LOG.debug("Task {} finishes asynchronous part of materialization.", this.subtaskName);
                completableFuture.complete(runnableFuture.get());
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
                discardFailedUploads(runnableFuture);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
            return completableFuture;
        } catch (Throwable th) {
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            throw th;
        }
    }

    private void discardFailedUploads(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture) {
        LOG.info("Task {} cleanup asynchronous runnable for materialization.", this.subtaskName);
        if (runnableFuture == null || runnableFuture.cancel(true)) {
            return;
        }
        try {
            StateObject stateObject = runnableFuture.get();
            if (stateObject != null) {
                stateObject.discardState();
            }
        } catch (Exception e) {
            LOG.debug("Task " + this.subtaskName + " cancelled execution of snapshot future runnable. Cancellation produced the following exception, which is expected and can be ignored.", e);
        }
    }

    private void scheduleNextMaterialization() {
        scheduleNextMaterialization(this.periodicMaterializeDelay);
    }

    private synchronized void scheduleNextMaterialization(long j) {
        if (!this.started || this.periodicExecutor.isShutdown()) {
            return;
        }
        LOG.info("Task {} schedules the next materialization in {} seconds", this.subtaskName, Long.valueOf(j / 1000));
        this.periodicExecutor.schedule(this::triggerMaterialization, j, TimeUnit.MILLISECONDS);
    }
}
