package org.jfrog.rtfs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import lombok.Generated;
import org.apache.commons.lang.StringUtils;
import org.jfrog.rtfs.common.ArtifactoryFederationException;
import org.jfrog.rtfs.common.concurrent.LockService;
import org.jfrog.rtfs.common.concurrent.LockableMethods;
import org.jfrog.rtfs.common.data.EventRequest;
import org.jfrog.rtfs.common.exception.LockableMethodException;
import org.jfrog.rtfs.common.rtfs.db.dao.EventsDao;
import org.jfrog.rtfs.common.rtfs.db.dao.EventsTmpDao;
import org.jfrog.rtfs.common.rtfs.db.dao.InvalidJdbcResponseException;
import org.jfrog.rtfs.common.rtfs.db.dao.MessageBlobsDao;
import org.jfrog.rtfs.common.rtfs.db.dao.ShiftEventsDao;
import org.jfrog.rtfs.common.rtfs.streams.events.ArtifactEvent;
import org.jfrog.rtfs.common.rtfs.streams.events.EventsException;
import org.jfrog.rtfs.common.rtfs.streams.events.EventsStorageInitiatorService;
import org.jfrog.rtfs.common.rtfs.streams.events.PartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:org/jfrog/rtfs/EventsStorageInitiatorServiceImpl.class */
/**
 * {@link EventsStorageInitiatorService} implementation that stores incoming artifact events.
 *
 * <p>For every request the service:
 * <ol>
 *   <li>makes sure the tenant has its partitions (the outcome is cached per tenant in
 *       {@link #getTenantsIds()}),</li>
 *   <li>validates the events and stamps them with the current W3C trace context,</li>
 *   <li>inserts the blobs and the events through the message-blobs and temporary-events DAOs,</li>
 *   <li>runs {@link #shiftEvents()} under the {@code SHIFT_EVENTS} lock.</li>
 * </ol>
 *
 * <p>The methods annotated with {@link Transactional} are invoked on the bean obtained from the
 * {@link ApplicationContext} rather than on {@code this} — presumably so that the call goes
 * through the Spring proxy that applies the transaction; confirm against the bean configuration.
 */
public class EventsStorageInitiatorServiceImpl implements EventsStorageInitiatorService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventsStorageInitiatorServiceImpl.class);

    private static final String MISSING_TENANT_MESSAGE = "Request is missing tenant id";
    private static final String MISSING_EVENT_FIELD_MESSAGE = "Missing repo key or blob is null for one of the events";
    private static final String MISSING_FIELD_PUBLIC_MESSAGE = "Mandatory field is missing within the request.";
    private static final String INSERT_FAILURE_MESSAGE = "Error occurred adding artifacts events to the db";
    private static final String SHIFT_FAILURE_MESSAGE = "Error occurred during shifting of events";

    private final EventsTmpDao eventsTmpDao;
    private final MessageBlobsDao messageBlobsDao;
    private final ShiftEventsDao shiftEventsDao;
    private final StreamListenerService listenerService;
    private final ApplicationContext applicationContext;
    private final LockService lockService;
    private final EventsDao eventsDao;
    /** Per-tenant result of the partition check/creation performed by {@link #loadPartition(String)}. */
    private LoadingCache<String, Boolean> tenantsIds;
    private final W3CTraceContextPropagator w3CTraceContextPropagator = W3CTraceContextPropagator.getInstance();
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Stores the collaborators and builds the per-tenant partition cache.
     */
    public EventsStorageInitiatorServiceImpl(EventsTmpDao eventsTmpDao, MessageBlobsDao messageBlobsDao, ShiftEventsDao shiftEventsDao, StreamListenerService streamListenerService, ApplicationContext applicationContext, LockService lockService, EventsDao eventsDao) {
        this.eventsTmpDao = eventsTmpDao;
        this.messageBlobsDao = messageBlobsDao;
        this.shiftEventsDao = shiftEventsDao;
        this.listenerService = streamListenerService;
        this.applicationContext = applicationContext;
        this.lockService = lockService;
        this.eventsDao = eventsDao;
        initCache();
    }

    /**
     * Validates and persists the artifact events of a request, then shifts the stored events.
     *
     * <p>A request without events is logged and ignored after the tenant partition check.
     *
     * @param eventRequest request carrying the tenant and its artifact events
     * @throws EventsException if the tenant is blank, the partition is unavailable, an event
     *         lacks a mandatory field, the insert fails, or the locked shift fails
     */
    public void addArtifactEvents(EventRequest eventRequest) throws EventsException {
        String tenant = eventRequest.tenant();
        // The tenant is used as a cache key below: a null key makes the Guava cache throw a
        // NullPointerException and an empty tenant prefix-matches every partition name, so
        // reject a blank tenant before touching the cache.
        validateTenant(tenant);
        validateOrCreatePartitionAndSeqForTenant(tenant);
        log.info("Adding events for {}", tenant);
        Collection<ArtifactEvent> artifactEvents = eventRequest.artifactEvents();
        if (artifactEvents == null || artifactEvents.isEmpty()) {
            log.warn("Received empty request for tenant {}", tenant);
            return;
        }
        validate(tenant, artifactEvents);
        List<ArtifactEvent> events = List.copyOf(artifactEvents);
        attachTraceContext(events);
        transactionalService().insertEventsTransactional(tenant, events);
        try {
            this.lockService.runWithLock(LockableMethods.SHIFT_EVENTS.getCategory(), LockableMethods.SHIFT_EVENTS.getKey(), this::shiftEvents);
        } catch (LockableMethodException e) {
            throw new EventsException("Failed during shift events " + e.getInternalMessage(), e.getPublicMessage(), e);
        }
    }

    /**
     * Stamps every event with the serialized trace context; a serialization failure is only
     * logged so that event storage still proceeds.
     */
    private void attachTraceContext(List<ArtifactEvent> events) {
        try {
            String traceContext = getScopeContextAsString();
            events.forEach(event -> event.setTraceContext(traceContext));
        } catch (JsonProcessingException e) {
            log.warn("Failed during retrieving scope context for events", e);
        }
    }

    /** Returns the service bean from the application context. */
    private EventsStorageInitiatorService transactionalService() {
        return this.applicationContext.getBean(EventsStorageInitiatorService.class);
    }

    /**
     * Ensures the tenant's partitions are available, consulting the cache first.
     *
     * @throws EventsException if the partition could not be found or created
     */
    private void validateOrCreatePartitionAndSeqForTenant(String tenant) throws EventsException {
        String failure = String.format("Failed to get partition for: %s", tenant);
        try {
            if (Boolean.FALSE.equals(this.tenantsIds.get(tenant))) {
                // The cache has no expiry, so a cached failure would block this tenant until
                // restart. Drop the entry to let the next request retry the partition setup.
                this.tenantsIds.invalidate(tenant);
                log.error(failure);
                throw new EventsException(failure, failure);
            }
        } catch (ExecutionException e) {
            log.error("Caught execution exception for: {}", e.getMessage());
            throw new EventsException(failure, e.getMessage(), e);
        }
    }

    /**
     * Inserts the event blobs, copies the resulting blob ids onto the events and inserts the
     * events into the temporary events table.
     *
     * @param str tenant the events belong to
     * @param list events to persist; each event's blob id is set as a side effect
     * @throws EventsException if either insert fails
     */
    @Override
    @Transactional(rollbackFor = {EventsException.class})
    public void insertEventsTransactional(String str, List<ArtifactEvent> list) throws EventsException {
        try {
            List<Long> blobIds = this.messageBlobsDao.insert(str, list.stream().map(ArtifactEvent::getBlob).toList());
            addBlobsIdsToEvents(list, blobIds);
            this.eventsTmpDao.insert(str, list);
        } catch (SQLException e) {
            throw new EventsException(String.format("%s due to: %s", INSERT_FAILURE_MESSAGE, e.getMessage()), INSERT_FAILURE_MESSAGE, e);
        } catch (InvalidJdbcResponseException e) {
            throw new EventsException(e.getInternalMessage(), e.getPublicMessage(), e);
        }
    }

    /**
     * Repeatedly shifts temporary events, handing the result to
     * {@code StreamListenerService.onReposActivated}, until a pass returns nothing.
     *
     * @throws EventsException if shifting or the follow-up processing fails
     */
    public void shiftEvents() throws EventsException {
        log.debug("Shifting events...");
        boolean shiftedSomething;
        do {
            try {
                // The DAO call must live inside the try so its failures are translated below.
                shiftedSomething = !this.shiftEventsDao.shiftAndDeleteTmpEvents(this.listenerService::onReposActivated).isEmpty();
            } catch (SQLException e) {
                String message = String.format("%s: %s", SHIFT_FAILURE_MESSAGE, e.getMessage());
                log.debug(message, e);
                throw new EventsException(message, SHIFT_FAILURE_MESSAGE, e);
            } catch (ArtifactoryFederationException e) {
                String message = String.format("Processing of new events after shifting failed: %s", e.getInternalMessage());
                log.debug(message, e);
                throw new EventsException(message, SHIFT_FAILURE_MESSAGE, e);
            }
        } while (shiftedSomething);
        log.debug("Finished shifting all the events.");
    }

    /**
     * Creates the message-blobs and events pre-requirements (partitions) for a tenant.
     *
     * @param str tenant to create the partitions for
     * @throws PartitionException if any of the DAO calls fails with an {@link SQLException}
     */
    @Override
    // PartitionException is listed explicitly: Spring only rolls back on unchecked exceptions
    // and on the types named in rollbackFor, and this method never throws EventsException itself.
    @Transactional(rollbackFor = {EventsException.class, PartitionException.class})
    public void createPartition(String str) throws PartitionException {
        try {
            log.info("Creating partition on message and events for {}", str);
            this.messageBlobsDao.createPartitionPreRequirements(str);
            this.eventsDao.createPreRequirements(str);
            log.info("Done creating partitions for {}", str);
        } catch (SQLException e) {
            String message = String.format("Failed to create partition for %s due to: %s", str, e.getMessage());
            throw new PartitionException(message, message, e);
        }
    }

    /**
     * Copies the i-th blob id onto the i-th event.
     *
     * @throws IllegalStateException if fewer blob ids than events were supplied
     */
    private void addBlobsIdsToEvents(List<ArtifactEvent> events, List<Long> blobIds) {
        // Check up front so a short id list fails before any event has been modified.
        if (blobIds.size() < events.size()) {
            throw new IllegalStateException(String.format("Received %d blob ids for %d events", blobIds.size(), events.size()));
        }
        for (int i = 0; i < events.size(); i++) {
            events.get(i).setBlobId(blobIds.get(i).longValue());
        }
    }

    /**
     * Checks the mandatory fields of a request.
     *
     * @param str tenant id; must not be blank
     * @param collection events; each needs a blob and a non-blank repo key
     * @throws EventsException if the tenant or any mandatory event field is missing
     */
    @VisibleForTesting
    void validate(String str, Collection<ArtifactEvent> collection) throws EventsException {
        validateTenant(str);
        if (isMandatoryFieldMissing(collection)) {
            log.error(MISSING_EVENT_FIELD_MESSAGE);
            throw new EventsException(MISSING_EVENT_FIELD_MESSAGE, MISSING_FIELD_PUBLIC_MESSAGE);
        }
    }

    /** Rejects a null, empty or whitespace-only tenant id. */
    private void validateTenant(String tenant) throws EventsException {
        if (StringUtils.isBlank(tenant)) {
            log.error(MISSING_TENANT_MESSAGE);
            throw new EventsException(MISSING_TENANT_MESSAGE, MISSING_FIELD_PUBLIC_MESSAGE);
        }
    }

    /**
     * Returns {@code true} when any event is null, has no blob, has no repo/tenant id, or has a
     * blank repo key.
     */
    private static boolean isMandatoryFieldMissing(Collection<ArtifactEvent> events) {
        return events.stream().anyMatch(event ->
                event == null
                        || event.getBlob() == null
                        || event.getRepoTenantId() == null
                        || StringUtils.isBlank(event.getRepoTenantId().repoKey()));
    }

    /** Builds the unbounded, non-expiring cache backed by {@link #loadPartition(String)}. */
    private void initCache() {
        this.tenantsIds = CacheBuilder.newBuilder().build(CacheLoader.from(this::loadPartition));
    }

    /**
     * Cache loader: checks whether the tenant already has a partition and creates one if not.
     *
     * @return {@code true} when the partition exists or was created, {@code false} when the
     *         lookup or the creation failed
     */
    private Boolean loadPartition(String tenant) {
        try {
            if (isPartitionFoundForTenant(tenant)) {
                log.info("Partition found for {}", tenant);
            } else {
                log.info("Partition not found for {}", tenant);
                transactionalService().createPartition(tenant);
                log.info("Successfully created partition for {}", tenant);
            }
            return true;
        } catch (PartitionException e) {
            log.error("Failed to create partition for {} due to: {}", tenant, e.getMessage());
            log.debug("Failed to create partition for {} due to: ", tenant, e);
            return false;
        } catch (SQLException e) {
            log.error("Failed to get partitions due to: {}", e.getMessage());
            log.debug("Failed to get partitions due to: ", e);
            return false;
        }
    }

    /**
     * Returns whether the events or the message-blobs partitions contain a name starting with
     * the tenant id.
     *
     * <p>NOTE(review): a partition in only one of the two tables counts as "found", and the
     * prefix match also accepts partitions of a tenant whose id merely starts with this one.
     * Confirm both against the partition naming scheme.
     */
    private boolean isPartitionFoundForTenant(String tenant) throws SQLException {
        return this.eventsDao.getPartitions().stream().anyMatch(partition -> partition.startsWith(tenant))
                || this.messageBlobsDao.getPartitions().stream().anyMatch(partition -> partition.startsWith(tenant));
    }

    /**
     * Serializes the current OpenTelemetry context to JSON: the W3C propagator injects its
     * fields into a map, which Jackson then writes as a string.
     */
    private String getScopeContextAsString() throws JsonProcessingException {
        HashMap<String, String> carrier = new HashMap<>();
        this.w3CTraceContextPropagator.inject(Context.current(), carrier, (map, key, value) -> {
            if (map != null) {
                map.put(key, value);
            }
        });
        return this.mapper.writeValueAsString(carrier);
    }

    /** Returns the per-tenant partition cache. */
    @Generated
    public LoadingCache<String, Boolean> getTenantsIds() {
        return this.tenantsIds;
    }
}
