package org.jfrog.rtfs;

import com.google.common.annotations.VisibleForTesting;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import org.jfrog.rtfs.common.rtfs.db.dao.EventStreamDao;
import org.jfrog.rtfs.common.rtfs.db.dao.EventsDao;
import org.jfrog.rtfs.common.rtfs.db.dao.EventsTmpDao;
import org.jfrog.rtfs.common.rtfs.db.dao.InvalidJdbcResponseException;
import org.jfrog.rtfs.common.rtfs.db.dao.MessageBlobsDao;
import org.jfrog.rtfs.common.rtfs.db.dao.StreamOutOfSyncException;
import org.jfrog.rtfs.common.rtfs.members.RepoTenantId;
import org.jfrog.rtfs.common.rtfs.streams.events.ArtifactEvent;
import org.jfrog.rtfs.common.rtfs.streams.events.EventsException;
import org.jfrog.rtfs.common.rtfs.streams.state.StreamId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:org/jfrog/rtfs/StreamServiceImpl.class */
/**
 * {@link StreamService} implementation that delegates to the events, event-stream,
 * temporary-events and message-blob DAOs.
 *
 * <p>All {@link SQLException}s raised by the DAOs are translated into {@link EventsException}
 * with the original exception preserved as the cause.
 */
public class StreamServiceImpl implements StreamService {
    /** Action value written by {@link #insertSnapshotMarker} and filtered out by {@link #loadEvents}. */
    public static final String SNAPSHOT_MARKER = "NOOP";

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamServiceImpl.class);
    /** Default pause between two consecutive stream initialization attempts. */
    private static final Duration DEFAULT_INIT_RETRY_TIME = Duration.ofSeconds(2);
    /** Maximum number of times {@link #initStream} calls {@code tryInitStream} before giving up. */
    private static final int MAX_INIT_ATTEMPTS = 5;

    private final EventsDao eventsDao;
    private final EventStreamDao streamDao;
    private final EventsTmpDao eventsTmpDao;
    private final MessageBlobsDao messageBlobsDao;
    /** When {@code TRUE}, {@link #loadEvents} falls back to unsynced loading instead of rethrowing. */
    private final Boolean loadsUnsynced;
    private final EventsFilter eventsFilter = new EventsFilter();
    /** Pause between initialization attempts; replaceable through {@link #setInitRetryTime}. */
    private Duration initRetryTime = DEFAULT_INIT_RETRY_TIME;

    /**
     * Loads a batch of events for the stream, drops snapshot markers and passes the remainder
     * through {@code EventsFilter.filterAndMergeEvents}.
     *
     * <p>If the stream is reported out of sync and unsynced loading is enabled, the exception
     * message is logged as a warning, {@code tryInitStream} is called with {@link Instant#EPOCH}
     * and the result of {@code loadUnsynced} is returned.
     * NOTE(review): the unsynced result is returned without marker exclusion or filtering —
     * confirm this is intended.
     *
     * @param streamId stream to read from
     * @param i        forwarded as the last argument of {@code loadEventBatch}
     *                 (presumably the batch size limit — confirm against the DAO)
     * @return the events to process
     * @throws EventsException          if a DAO call fails with an {@link SQLException}
     * @throws StreamOutOfSyncException if the stream is out of sync and unsynced loading is not enabled
     */
    @Override // org.jfrog.rtfs.StreamService
    @Transactional
    public List<ArtifactEvent> loadEvents(StreamId streamId, int i) throws EventsException, StreamOutOfSyncException {
        try {
            List<ArtifactEvent> batch =
                    this.eventsDao.loadEventBatch(streamId, this.streamDao.loadLastProcessedEventId(streamId), i);
            return this.eventsFilter.filterAndMergeEvents(excludeSnapshotMarkers(batch));
        } catch (StreamOutOfSyncException e) {
            // Null-safe: a missing flag means the fallback is disabled, so the original
            // exception is propagated instead of being masked by a NullPointerException.
            if (!Boolean.TRUE.equals(this.loadsUnsynced)) {
                throw e;
            }
            try {
                log.warn(e.getMessage());
                this.streamDao.tryInitStream(streamId, Instant.EPOCH);
                return this.eventsDao.loadUnsynced(streamId);
            } catch (SQLException e2) {
                String str = "Failed loading unsynced event for " + streamId;
                throw new EventsException(str, str, e2);
            }
        } catch (SQLException e3) {
            String str2 = "Failed loading events for " + streamId;
            throw new EventsException(str2, str2, e3);
        }
    }

    /**
     * Returns the given events without those whose action equals {@link #SNAPSHOT_MARKER}.
     * Events with a {@code null} action are kept.
     */
    private List<ArtifactEvent> excludeSnapshotMarkers(List<ArtifactEvent> list) {
        // Constant-first comparison so an event without an action cannot cause an NPE.
        return list.stream()
                .filter(artifactEvent -> !SNAPSHOT_MARKER.equals(artifactEvent.getAction()))
                .toList();
    }

    /**
     * Inserts an event with action {@link #SNAPSHOT_MARKER}, an empty path and the given event
     * time into the temporary events table. An empty blob is inserted first and its id is
     * attached to the marker event.
     *
     * @param repoTenantId repository/tenant the marker belongs to
     * @param instant      event time recorded on the marker
     * @throws EventsException if either insert fails
     */
    @Override // org.jfrog.rtfs.StreamService
    public void insertSnapshotMarker(RepoTenantId repoTenantId, Instant instant) throws EventsException {
        try {
            var blobIds = this.messageBlobsDao.insert(repoTenantId.tenantId(), List.of(new byte[0]));
            ArtifactEvent marker = ArtifactEvent.builder()
                    .action(SNAPSHOT_MARKER)
                    .path("")
                    .repoTenantId(repoTenantId)
                    .eventTime(instant)
                    .blobId((Long) blobIds.get(0))
                    .build();
            this.eventsTmpDao.insert(repoTenantId.tenantId(), List.of(marker));
        } catch (SQLException | InvalidJdbcResponseException e) {
            throw new EventsException("Unexpected error creating a marker for " + repoTenantId + ": " + e.getMessage(), "Unexpected error creating a marker for " + repoTenantId, e);
        }
    }

    /**
     * Initializes the stream, retrying up to {@value #MAX_INIT_ATTEMPTS} times.
     *
     * <p>After each unsuccessful {@code tryInitStream} the method checks whether either the
     * temporary or the main events table has events before {@code instant}; if neither does,
     * it fails immediately. Otherwise it waits for the configured retry time and tries again.
     * No wait is performed after the final attempt.
     *
     * @param streamId stream to initialize
     * @param instant  snapshot time forwarded to the DAOs
     * @throws EventsException if no events exist, the attempts are exhausted, the thread is
     *                         interrupted (the interrupt flag is restored) or a DAO call fails
     */
    @Override // org.jfrog.rtfs.StreamService
    public void initStream(StreamId streamId, Instant instant) throws EventsException {
        for (int attempt = 1; attempt <= MAX_INIT_ATTEMPTS; attempt++) {
            try {
                if (this.streamDao.tryInitStream(streamId, instant)) {
                    return;
                }
                boolean hasEarlierEvents = this.eventsTmpDao.hasEventsBeforeSnapshot(streamId.repoTenantId(), instant)
                        || this.eventsDao.hasEventsBeforeSnapshot(streamId.repoTenantId(), instant);
                if (!hasEarlierEvents) {
                    throw new EventsException("No events found in the stream, can't initialize it.", "Failed setting up event stream");
                }
                // Sleeping after the last attempt would only delay the failure below.
                if (attempt < MAX_INIT_ATTEMPTS) {
                    Thread.sleep(this.initRetryTime.toMillis());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new EventsException("Init stream was interrupted.", "Failed setting up event stream", e);
            } catch (SQLException e2) {
                throw new EventsException("Internal error initializing a stream: " + e2.getMessage(), "Failed setting up event stream", e2);
            }
        }
        throw new EventsException("Recent events were not moved from temporary table in a reasonable time.", "Failed setting up event stream");
    }

    /**
     * Records {@code j} as the last processed event id of the stream.
     *
     * @param streamId stream being acknowledged
     * @param j        id of the last processed event
     * @throws EventsException if the update fails with an {@link SQLException}
     */
    @Override // org.jfrog.rtfs.StreamService
    public void ack(StreamId streamId, long j) throws EventsException {
        try {
            this.streamDao.updateLastProcessedEventId(streamId, j);
        } catch (SQLException e) {
            String str = "Failed updating last processed event for " + streamId;
            throw new EventsException(str, str, e);
        }
    }

    /**
     * Returns the result of {@code loadNextEventTime} for the stream.
     *
     * @param streamId stream to inspect
     * @return the next event time as reported by the stream DAO, possibly empty
     * @throws EventsException if the lookup fails or the stream is out of sync
     */
    @Override // org.jfrog.rtfs.StreamService
    public Optional<Instant> getNextEventTime(StreamId streamId) throws EventsException {
        try {
            return this.streamDao.loadNextEventTime(streamId);
        } catch (SQLException | StreamOutOfSyncException e) {
            // NOTE(review): unlike the other methods, the raw exception message is passed as the
            // second argument here — confirm which EventsException argument is user-facing.
            throw new EventsException("Failed finding next event for " + streamId, e.getMessage(), e);
        }
    }

    /** Overrides the pause between {@link #initStream} attempts. */
    @VisibleForTesting
    void setInitRetryTime(Duration duration) {
        this.initRetryTime = duration;
    }

    /**
     * Creates the service.
     *
     * @param eventsDao       DAO for the main events table
     * @param eventStreamDao  DAO for per-stream state
     * @param eventsTmpDao    DAO for the temporary events table
     * @param messageBlobsDao DAO for message blobs
     * @param bool            whether {@link #loadEvents} may fall back to unsynced loading;
     *                        {@code null} is treated as {@code false}
     */
    @Generated
    public StreamServiceImpl(EventsDao eventsDao, EventStreamDao eventStreamDao, EventsTmpDao eventsTmpDao, MessageBlobsDao messageBlobsDao, Boolean bool) {
        this.eventsDao = eventsDao;
        this.streamDao = eventStreamDao;
        this.eventsTmpDao = eventsTmpDao;
        this.messageBlobsDao = messageBlobsDao;
        this.loadsUnsynced = bool;
    }
}
