package com.google.cloud.hadoop.gcsio.cooplock;

import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
import com.google.cloud.hadoop.gcsio.FileInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo;
import com.google.cloud.hadoop.gcsio.PathCodec;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/google/cloud/hadoop/gcsio/cooplock/CoopLockOperationDao.class */
public class CoopLockOperationDao {
    private static final String OPERATION_LOG_FILE_FORMAT = "%s_%s_%s.log";
    private static final String OPERATION_LOCK_FILE_FORMAT = "%s_%s_%s.lock";
    private GoogleCloudStorage gcs;
    private PathCodec pathCodec;
    private ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private static final Set<String> VALID_OPERATIONS = ImmutableSet.of("delete", "rename");
    private static final CreateObjectOptions CREATE_OBJECT_OPTIONS = new CreateObjectOptions(false, "application/text", CreateObjectOptions.EMPTY_METADATA);
    private static final CreateObjectOptions UPDATE_OBJECT_OPTIONS = new CreateObjectOptions(true, "application/text", CreateObjectOptions.EMPTY_METADATA);
    private static DateTimeFormatter LOCK_FILE_DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss.SSSXXX").withZone(ZoneId.of("UTC"));
    private static final Gson GSON = new Gson();

    public CoopLockOperationDao(GoogleCloudStorage googleCloudStorage, PathCodec pathCodec) {
        this.gcs = googleCloudStorage;
        this.pathCodec = pathCodec;
    }

    public Future<?> persistDeleteOperation(URI uri, List<FileInfo> list, List<FileInfo> list2, String str, StorageResourceId storageResourceId, Future<?> future) throws IOException {
        Instant now = Instant.now();
        URI writeOperationFile = writeOperationFile(uri.getAuthority(), OPERATION_LOCK_FILE_FORMAT, CREATE_OBJECT_OPTIONS, "delete", str, now, ImmutableList.of(GSON.toJson(new DeleteOperation().setLockEpochSeconds(now.getEpochSecond()).setResource(storageResourceId.toString()))));
        writeOperationFile(uri.getAuthority(), OPERATION_LOG_FILE_FORMAT, CREATE_OBJECT_OPTIONS, "delete", str, now, (List) Streams.concat(new Stream[]{list.stream(), list2.stream()}).map(fileInfo -> {
            return fileInfo.getItemInfo().getResourceId().toString();
        }).collect(ImmutableList.toImmutableList()));
        return scheduleLockUpdate(str, writeOperationFile, DeleteOperation.class, (deleteOperation, instant) -> {
            deleteOperation.setLockEpochSeconds(instant.getEpochSecond());
        });
    }

    public Future<?> persistUpdateOperation(FileInfo fileInfo, URI uri, String str, Map<FileInfo, URI> map, Map<FileInfo, URI> map2, Instant instant) throws IOException {
        URI writeOperationFile = writeOperationFile(uri.getAuthority(), OPERATION_LOCK_FILE_FORMAT, CREATE_OBJECT_OPTIONS, "rename", str, instant, ImmutableList.of(GSON.toJson(new RenameOperation().setLockEpochSeconds(instant.getEpochSecond()).setSrcResource(fileInfo.getPath().toString()).setDstResource(uri.toString()).setCopySucceeded(false))));
        writeOperationFile(uri.getAuthority(), OPERATION_LOG_FILE_FORMAT, CREATE_OBJECT_OPTIONS, "rename", str, instant, (List) Streams.concat(new Stream[]{map.entrySet().stream(), map2.entrySet().stream()}).map(entry -> {
            return ((FileInfo) entry.getKey()).getItemInfo().getResourceId() + " -> " + entry.getValue();
        }).collect(ImmutableList.toImmutableList()));
        return scheduleLockUpdate(str, writeOperationFile, RenameOperation.class, (renameOperation, instant2) -> {
            renameOperation.setLockEpochSeconds(instant2.getEpochSecond());
        });
    }

    public void checkpointUpdateOperation(FileInfo fileInfo, URI uri, String str, Instant instant) throws IOException {
        writeOperationFile(uri.getAuthority(), OPERATION_LOCK_FILE_FORMAT, UPDATE_OBJECT_OPTIONS, "rename", str, instant, ImmutableList.of(GSON.toJson(new RenameOperation().setLockEpochSeconds(Instant.now().getEpochSecond()).setSrcResource(fileInfo.getPath().toString()).setDstResource(uri.toString()).setCopySucceeded(true))));
    }

    private void renewLockOrExit(String str, URI uri, Function<String, String> function) {
        for (int i = 0; i < 10; i++) {
            try {
                renewLock(str, uri, function);
            } catch (IOException e) {
                logger.atWarning().withCause(e).log("Failed to renew '%s' lock for %s operation, retry #%d", uri, str, Integer.valueOf(i + 1));
            }
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        }
        logger.atSevere().log("Failed to renew '%s' lock for %s operation, exiting", uri, str);
        System.exit(1);
    }

    private void renewLock(String str, URI uri, Function<String, String> function) throws IOException {
        StorageResourceId fromObjectName = StorageResourceId.fromObjectName(uri.toString());
        GoogleCloudStorageItemInfo itemInfo = this.gcs.getItemInfo(fromObjectName);
        Preconditions.checkState(itemInfo.exists(), "lock file for %s operation should exist", str);
        BufferedReader bufferedReader = new BufferedReader(Channels.newReader(this.gcs.open(fromObjectName), StandardCharsets.UTF_8.name()));
        Throwable th = null;
        try {
            String str2 = (String) bufferedReader.lines().collect(Collectors.joining());
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            String apply = function.apply(str2);
            writeOperation(new StorageResourceId(uri.getAuthority(), uri.getPath(), itemInfo.getContentGeneration()), new CreateObjectOptions(true, "application/octet-stream", CreateObjectOptions.EMPTY_METADATA), ImmutableList.of(apply));
        } catch (Throwable th3) {
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            throw th3;
        }
    }

    private URI writeOperationFile(String str, String str2, CreateObjectOptions createObjectOptions, String str3, String str4, Instant instant, List<String> list) throws IOException {
        Preconditions.checkArgument(VALID_OPERATIONS.contains(str3), "operation must be one of $s, but was '%s'", VALID_OPERATIONS, str3);
        URI path = this.pathCodec.getPath(str, String.format(CoopLockRecordsDao.LOCK_DIRECTORY + str2, LOCK_FILE_DATE_TIME_FORMAT.format(instant), str3, str4), false);
        writeOperation(this.pathCodec.validatePathAndGetId(path, false), createObjectOptions, list);
        return path;
    }

    private void writeOperation(StorageResourceId storageResourceId, CreateObjectOptions createObjectOptions, List<String> list) throws IOException {
        WritableByteChannel create = this.gcs.create(storageResourceId, createObjectOptions);
        Throwable th = null;
        try {
            try {
                Iterator<String> it = list.iterator();
                while (it.hasNext()) {
                    create.write(ByteBuffer.wrap(it.next().getBytes(StandardCharsets.UTF_8)));
                    create.write(ByteBuffer.wrap(new byte[]{10}));
                }
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    public <T> Future<?> scheduleLockUpdate(String str, URI uri, Class<T> cls, BiConsumer<T, Instant> biConsumer) {
        return this.scheduledThreadPool.scheduleAtFixedRate(() -> {
            renewLockOrExit(str, uri, str2 -> {
                Object fromJson = GSON.fromJson(str2, cls);
                biConsumer.accept(fromJson, Instant.now());
                return GSON.toJson(fromJson);
            });
        }, 1L, 1L, TimeUnit.MINUTES);
    }
}
