package io.zeebe.logstreams.impl;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.MutableDirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogStorageAppender.class */
public class LogStorageAppender extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final String name;
    private final LogStorage logStorage;
    private final Subscription writeBufferSubscription;
    private final ActorConditions logStorageAppendConditions;
    private int maxAppendBlockSize;
    private final AtomicBoolean isFailed = new AtomicBoolean(false);
    private final BlockPeek blockPeek = new BlockPeek();
    private Runnable peekedBlockHandler = this::appendBlock;

    public LogStorageAppender(String str, LogStorage logStorage, Subscription subscription, int i, ActorConditions actorConditions) {
        this.name = str;
        this.logStorage = logStorage;
        this.writeBufferSubscription = subscription;
        this.maxAppendBlockSize = i;
        this.logStorageAppendConditions = actorConditions;
    }

    public String getName() {
        return this.name;
    }

    protected void onActorStarting() {
        this.actor.consume(this.writeBufferSubscription, this::peekBlock);
    }

    private void peekBlock() {
        if (this.writeBufferSubscription.peekBlock(this.blockPeek, this.maxAppendBlockSize, true) > 0) {
            this.peekedBlockHandler.run();
        } else {
            this.actor.yield();
        }
    }

    private void appendBlock() {
        ByteBuffer rawBuffer = this.blockPeek.getRawBuffer();
        MutableDirectBuffer buffer = this.blockPeek.getBuffer();
        if (this.logStorage.append(rawBuffer) >= 0) {
            this.blockPeek.markCompleted();
            this.logStorageAppendConditions.signalConsumers();
            return;
        }
        this.isFailed.set(true);
        LOG.error("Failed to append log storage on position '{}'. Stop writing to log storage until recovered.", Long.valueOf(LogEntryDescriptor.getPosition(buffer, 0)));
        this.peekedBlockHandler = this::discardBlock;
        discardBlock();
    }

    private void discardBlock() {
        this.blockPeek.markFailed();
        this.actor.yield();
    }

    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    public boolean isFailed() {
        return this.isFailed.get();
    }

    public long getCurrentAppenderPosition() {
        return this.writeBufferSubscription.getPosition();
    }
}
