package org.eclipse.ditto.policies.service.persistence.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.RecoveryCompleted;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistentactors.AbstractShardedPersistenceActor;
import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.Label;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyEntry;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.PolicyLifecycle;
import org.eclipse.ditto.policies.model.Subject;
import org.eclipse.ditto.policies.model.SubjectExpiry;
import org.eclipse.ditto.policies.model.SubjectId;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.policies.model.signals.announcements.SubjectDeletionAnnouncement;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.events.PolicyEvent;
import org.eclipse.ditto.policies.model.signals.events.SubjectDeleted;
import org.eclipse.ditto.policies.service.common.config.DittoPoliciesConfig;
import org.eclipse.ditto.policies.service.common.config.PolicyConfig;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.PolicyCommandStrategies;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.events.PolicyEventStrategies;

/* loaded from: input_file:org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.class */
public final class PolicyPersistenceActor extends AbstractShardedPersistenceActor<Command<?>, Policy, PolicyId, PolicyId, PolicyEvent<?>> {
    public static final String PERSISTENCE_ID_PREFIX = "policy:";
    static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-policies-journal";
    static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-policies-snapshots";
    private static final Duration ANNOUNCEMENT_WINDOW = Duration.ofMillis(500);
    private static final Collector<Subject, ?, Map<Instant, Set<SubjectId>>> GROUP_BY_EXPIRY_COLLECTOR = getGroupByExpiryCollector();
    private final ActorRef pubSubMediator;
    private final DistributedPub<PolicyAnnouncement<?>> policyAnnouncementPub;
    private final PolicyConfig policyConfig;
    private Instant lastAnnouncement;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor$AnnounceSubjectDeletion.class */
    public static final class AnnounceSubjectDeletion {
        private final Instant cutOff;

        private AnnounceSubjectDeletion(Instant instant) {
            this.cutOff = instant;
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor$DeleteOldestExpiredSubject.class */
    private static final class DeleteOldestExpiredSubject {
        private static final DeleteOldestExpiredSubject INSTANCE = new DeleteOldestExpiredSubject();

        private DeleteOldestExpiredSubject() {
        }
    }

    PolicyPersistenceActor(PolicyId policyId, SnapshotAdapter<Policy> snapshotAdapter, ActorRef actorRef, DistributedPub<PolicyAnnouncement<?>> distributedPub) {
        super(policyId, snapshotAdapter);
        this.lastAnnouncement = Instant.EPOCH;
        this.pubSubMediator = actorRef;
        this.policyAnnouncementPub = distributedPub;
        this.policyConfig = DittoPoliciesConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())).getPolicyConfig();
    }

    public static Props props(PolicyId policyId, SnapshotAdapter<Policy> snapshotAdapter, ActorRef actorRef, DistributedPub<PolicyAnnouncement<?>> distributedPub) {
        return Props.create(PolicyPersistenceActor.class, new Object[]{policyId, snapshotAdapter, actorRef, distributedPub});
    }

    public String persistenceId() {
        return "policy:" + this.entityId;
    }

    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    protected Class<?> getEventClass() {
        return PolicyEvent.class;
    }

    protected CommandStrategy.Context<PolicyId> getStrategyContext() {
        return DefaultContext.getInstance(this.entityId, this.log);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getCreatedStrategy, reason: merged with bridge method [inline-methods] */
    public PolicyCommandStrategies m3getCreatedStrategy() {
        return PolicyCommandStrategies.getInstance(this.policyConfig, getContext().getSystem());
    }

    protected CommandStrategy<? extends Command<?>, Policy, PolicyId, PolicyEvent<?>> getDeletedStrategy() {
        return PolicyCommandStrategies.getCreatePolicyStrategy(this.policyConfig);
    }

    protected EventStrategy<PolicyEvent<?>, Policy> getEventStrategy() {
        return PolicyEventStrategies.getInstance();
    }

    protected ActivityCheckConfig getActivityCheckConfig() {
        return this.policyConfig.getActivityCheckConfig();
    }

    protected SnapshotConfig getSnapshotConfig() {
        return this.policyConfig.getSnapshotConfig();
    }

    protected boolean entityExistsAsDeleted() {
        return null != this.entity && ((Policy) this.entity).hasLifecycle(PolicyLifecycle.DELETED);
    }

    protected DittoRuntimeExceptionBuilder<?> newNotAccessibleExceptionBuilder() {
        return PolicyNotAccessibleException.newBuilder(this.entityId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void publishEvent(PolicyEvent<?> policyEvent) {
        this.pubSubMediator.tell(DistPubSubAccess.publishViaGroup("policies.events:", policyEvent), getSender());
        if (Boolean.parseBoolean((String) policyEvent.getDittoHeaders().getOrDefault(DittoHeaderDefinition.POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY.getKey(), Boolean.FALSE.toString()))) {
            return;
        }
        this.pubSubMediator.tell(DistPubSubAccess.publish("policy-invalidate-enforcers", PolicyTag.of(this.entityId, policyEvent.getRevision())), getSender());
    }

    public void onMutation(Command<?> command, PolicyEvent<?> policyEvent, WithDittoHeaders withDittoHeaders, boolean z, boolean z2) {
        Policy policy = (Policy) this.entity;
        persistAndApplyEvent(policyEvent, (policyEvent2, policy2) -> {
            sendPastDueAnnouncementsOfNewSubjects(policy, (Policy) this.entity);
            announceSubjectDeletion(policy, (Policy) this.entity, policyEvent2.getDittoHeaders());
            if (shouldSendResponse(command.getDittoHeaders())) {
                notifySender(getSender(), withDittoHeaders);
            }
            if (z2) {
                becomeDeletedHandler();
            }
            if (z) {
                becomeCreatedHandler();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonSchemaVersion getEntitySchemaVersion(Policy policy) {
        return policy.getImplementedSchemaVersion();
    }

    protected boolean shouldSendResponse(DittoHeaders dittoHeaders) {
        return dittoHeaders.isResponseRequired();
    }

    protected boolean isEntityAlwaysAlive() {
        return findEarliestAnnouncement((Policy) this.entity, this.lastAnnouncement).isPresent();
    }

    protected AbstractActor.Receive matchAnyAfterInitialization() {
        return ReceiveBuilder.create().matchEquals(DeleteOldestExpiredSubject.INSTANCE, deleteOldestExpiredSubject -> {
            handleDeleteExpiredSubjects();
        }).match(AnnounceSubjectDeletion.class, this::sendAnnouncement).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
        }).build();
    }

    protected void recoveryCompleted(RecoveryCompleted recoveryCompleted) {
        if (this.entity != null) {
            sendOrScheduleAnnouncement();
            handleDeleteExpiredSubjects();
        }
        super.recoveryCompleted(recoveryCompleted);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PolicyEvent<?> modifyEventBeforePersist(PolicyEvent<?> policyEvent) {
        PolicyEvent<?> modifyEventBeforePersist = super.modifyEventBeforePersist(policyEvent);
        boolean willEntityBeAlwaysAlive = willEntityBeAlwaysAlive(policyEvent);
        this.alwaysAlive = willEntityBeAlwaysAlive;
        return willEntityBeAlwaysAlive ? modifyEventBeforePersist.setDittoHeaders(modifyEventBeforePersist.getDittoHeaders().toBuilder().journalTags(Set.of("always-alive")).build()) : modifyEventBeforePersist;
    }

    protected void onEntityModified() {
        sendOrScheduleAnnouncement();
        scheduleNextSubjectExpiryCheck();
    }

    private boolean willEntityBeAlwaysAlive(PolicyEvent<?> policyEvent) {
        return streamAndFlatMapSubjects((Policy) getEventStrategy().handle(policyEvent, (Policy) this.entity, getRevisionNumber()), getRelevantAnnouncementInstantFunction(this.lastAnnouncement)).findAny().isPresent();
    }

    private void sendOrScheduleAnnouncement() {
        Optional<Instant> findEarliestAnnouncement = findEarliestAnnouncement((Policy) this.entity, this.lastAnnouncement);
        if (findEarliestAnnouncement.isPresent()) {
            Instant instant = findEarliestAnnouncement.get();
            Instant now = Instant.now();
            if (instant.isBefore(now.plus((TemporalAmount) ANNOUNCEMENT_WINDOW))) {
                sendAnnouncement(new AnnounceSubjectDeletion(instant));
            } else {
                scheduleNextAnnouncement(now, instant);
            }
        }
    }

    private void sendAnnouncement(AnnounceSubjectDeletion announceSubjectDeletion) {
        Instant plus = announceSubjectDeletion.cutOff.plus((TemporalAmount) ANNOUNCEMENT_WINDOW);
        publishExpiryAnnouncementsByTimestamp(streamAndFlatMapSubjects((Policy) this.entity, getSubjectAnnouncementInsideWindow(this.lastAnnouncement, plus)));
        this.lastAnnouncement = plus;
        sendOrScheduleAnnouncement();
    }

    private void publishExpiryAnnouncementsByTimestamp(Stream<Subject> stream) {
        Map map = (Map) stream.collect(GROUP_BY_EXPIRY_COLLECTOR);
        DittoHeaders build = DittoHeaders.newBuilder().randomCorrelationId().build();
        this.log.withCorrelationId(build).info("Sending announcements for <{}>", map);
        map.keySet().stream().sorted().forEach(instant -> {
            this.policyAnnouncementPub.publish(SubjectDeletionAnnouncement.of(this.entityId, instant, (Set) map.get(instant), build), ActorRef.noSender());
        });
    }

    private void scheduleNextAnnouncement(Instant instant, Instant instant2) {
        timers().cancel(AnnounceSubjectDeletion.class);
        Duration between = Duration.between(instant, instant2);
        if (between.isNegative()) {
            this.log.error("scheduleNextAnnouncement called with now=<{}>, cutOff=<{}>: cutOff is not in the future!", instant, instant2);
            return;
        }
        Duration truncateToOneDay = truncateToOneDay(between);
        this.log.info("Scheduling message for sending announcement in: <{}> - earliest announcement is at: <{}>", truncateToOneDay, instant2);
        timers().startSingleTimer(AnnounceSubjectDeletion.class, new AnnounceSubjectDeletion(instant2), truncateToOneDay);
    }

    private void scheduleNextSubjectExpiryCheck() {
        findEarliestSubjectExpiryTimestamp((Policy) this.entity).ifPresent(subjectExpiry -> {
            timers().cancel(DeleteOldestExpiredSubject.class);
            Instant timestamp = subjectExpiry.getTimestamp();
            Duration between = Duration.between(Instant.now(), timestamp);
            if (between.isNegative()) {
                getSelf().tell(DeleteOldestExpiredSubject.INSTANCE, getSelf());
                return;
            }
            Duration truncateToOneDay = truncateToOneDay(between);
            this.log.info("Scheduling message for deleting next expired subject in: <{}> - earliest expiry is at: <{}>", truncateToOneDay, timestamp);
            timers().startSingleTimer(DeleteOldestExpiredSubject.class, DeleteOldestExpiredSubject.INSTANCE, truncateToOneDay);
        });
    }

    private void handleDeleteExpiredSubjects() {
        this.log.debug("Calculating whether subjects did expire and need to be deleted..");
        calculateSubjectDeletedEventOfOldestExpiredSubject((PolicyId) this.entityId, (Iterable) this.entity).ifPresentOrElse(subjectDeleted -> {
            persistAndApplyEvent(subjectDeleted, (policyEvent, policy) -> {
                this.log.withCorrelationId(policyEvent).info("Deleted expired subject <{}> of label <{}>", subjectDeleted.getSubjectId(), subjectDeleted.getLabel());
            });
        }, this::scheduleNextSubjectExpiryCheck);
    }

    private Optional<SubjectDeleted> calculateSubjectDeletedEventOfOldestExpiredSubject(PolicyId policyId, @Nullable Iterable<PolicyEntry> iterable) {
        return determineAlreadyExpiredSubjects(iterable).min(Comparator.comparing(pair -> {
            return (SubjectExpiry) ((Subject) pair.second()).getExpiry().orElseThrow();
        })).map(pair2 -> {
            return SubjectDeleted.of(policyId, (Label) pair2.first(), ((Subject) pair2.second()).getId(), getRevisionNumber() + 1, Instant.now(), DittoHeaders.newBuilder().correlationId(UUID.randomUUID().toString()).build(), (Metadata) null);
        });
    }

    private void announceSubjectDeletion(@Nullable Policy policy, @Nullable Policy policy2, DittoHeaders dittoHeaders) {
        Collector collection = Collectors.toCollection(LinkedHashSet::new);
        Set set = (Set) streamAndFlatMapSubjects(policy, subject -> {
            return subject.getAnnouncement().filter((v0) -> {
                return v0.isWhenDeleted();
            }).map(subjectAnnouncement -> {
                return subject.getId();
            });
        }).collect(collection);
        Set set2 = (Set) streamAndFlatMapSubjects(policy2, subject2 -> {
            return Optional.of(subject2.getId());
        }).collect(Collectors.toSet());
        Set set3 = (Set) set.stream().filter(subjectId -> {
            return !set2.contains(subjectId);
        }).collect(collection);
        if (set3.isEmpty()) {
            return;
        }
        this.policyAnnouncementPub.publish(SubjectDeletionAnnouncement.of(this.entityId, Instant.now(), set3, dittoHeaders), ActorRef.noSender());
    }

    private void sendPastDueAnnouncementsOfNewSubjects(@Nullable Policy policy, @Nullable Policy policy2) {
        if (null != policy) {
            Optional lifecycle = policy.getLifecycle();
            PolicyLifecycle policyLifecycle = PolicyLifecycle.DELETED;
            Objects.requireNonNull(policyLifecycle);
            if (lifecycle.filter((v1) -> {
                return r1.equals(v1);
            }).isPresent()) {
                Set set = (Set) streamAndFlatMapSubjects(policy, (v0) -> {
                    return Optional.of(v0);
                }).collect(Collectors.toSet());
                publishExpiryAnnouncementsByTimestamp(streamAndFlatMapSubjects(policy2, subject -> {
                    return (!getAnnouncementInstant(subject).filter(instant -> {
                        return !this.lastAnnouncement.isBefore(instant);
                    }).isPresent() || set.contains(subject)) ? Optional.empty() : Optional.of(subject);
                }));
            }
        }
    }

    private static Duration truncateToOneDay(Duration duration) {
        Duration ofDays = Duration.ofDays(1L);
        return duration.compareTo(ofDays) < 0 ? duration : ofDays;
    }

    private static Collector<Subject, ?, Map<Instant, Set<SubjectId>>> getGroupByExpiryCollector() {
        return Collectors.filtering(subject -> {
            return subject.getExpiry().isPresent();
        }, Collectors.groupingBy(subject2 -> {
            return ((SubjectExpiry) subject2.getExpiry().orElseThrow()).getTimestamp();
        }, Collectors.mapping((v0) -> {
            return v0.getId();
        }, Collectors.toCollection(LinkedHashSet::new))));
    }

    private static Stream<Pair<Label, Subject>> determineAlreadyExpiredSubjects(@Nullable Iterable<PolicyEntry> iterable) {
        return determineSubjectsWithExpiry(iterable).filter(pair -> {
            return ((Boolean) ((Subject) pair.second()).getExpiry().map((v0) -> {
                return v0.isExpired();
            }).orElse(false)).booleanValue();
        });
    }

    private static Stream<Pair<Label, Subject>> determineSubjectsWithExpiry(@Nullable Iterable<PolicyEntry> iterable) {
        return null == iterable ? Stream.empty() : StreamSupport.stream(iterable.spliterator(), false).flatMap(policyEntry -> {
            return policyEntry.getSubjects().stream().map(subject -> {
                return Pair.create(policyEntry.getLabel(), subject);
            });
        }).filter(pair -> {
            return ((Subject) pair.second()).getExpiry().isPresent();
        });
    }

    private static Optional<SubjectExpiry> findEarliestSubjectExpiryTimestamp(@Nullable Policy policy) {
        return findMinValueSubject(policy, (v0) -> {
            return v0.getExpiry();
        }, Comparator.comparing((v0) -> {
            return v0.getTimestamp();
        }));
    }

    private static Optional<Instant> findEarliestAnnouncement(@Nullable Policy policy, Instant instant) {
        return findMinValueSubject(policy, getRelevantAnnouncementInstantFunction(instant), Comparator.naturalOrder());
    }

    private static <T> Optional<T> findMinValueSubject(@Nullable Policy policy, Function<Subject, Optional<T>> function, Comparator<T> comparator) {
        return streamAndFlatMapSubjects(policy, function).min(comparator);
    }

    private static <T> Stream<T> streamAndFlatMapSubjects(@Nullable Policy policy, Function<Subject, Optional<T>> function) {
        if (null != policy) {
            Optional lifecycle = policy.getLifecycle();
            PolicyLifecycle policyLifecycle = PolicyLifecycle.DELETED;
            Objects.requireNonNull(policyLifecycle);
            if (!lifecycle.filter((v1) -> {
                return r1.equals(v1);
            }).isPresent()) {
                return StreamSupport.stream(policy.spliterator(), false).map((v0) -> {
                    return v0.getSubjects();
                }).flatMap((v0) -> {
                    return v0.stream();
                }).map(function).flatMap((v0) -> {
                    return v0.stream();
                });
            }
        }
        return Stream.empty();
    }

    private static Function<Subject, Optional<Instant>> getRelevantAnnouncementInstantFunction(Instant instant) {
        return subject -> {
            Optional<Instant> announcementInstant = getAnnouncementInstant(subject);
            Objects.requireNonNull(instant);
            return announcementInstant.filter(instant::isBefore);
        };
    }

    private static Function<Subject, Optional<Subject>> getSubjectAnnouncementInsideWindow(Instant instant, Instant instant2) {
        return subject -> {
            return getAnnouncementInstant(subject).filter(instant3 -> {
                return isBetween(instant, instant3, instant2);
            }).map(instant4 -> {
                return subject;
            });
        };
    }

    private static Optional<Instant> getAnnouncementInstant(Subject subject) {
        return subject.getAnnouncement().flatMap((v0) -> {
            return v0.getBeforeExpiry();
        }).flatMap(dittoDuration -> {
            return subject.getExpiry().map(subjectExpiry -> {
                return subjectExpiry.getTimestamp().minus((TemporalAmount) dittoDuration.getDuration());
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isBetween(Instant instant, Instant instant2, Instant instant3) {
        return instant.isBefore(instant2) && !Duration.between(instant2, instant3).isNegative();
    }

    public /* bridge */ /* synthetic */ void onMutation(Command command, Event event, WithDittoHeaders withDittoHeaders, boolean z, boolean z2) {
        onMutation((Command<?>) command, (PolicyEvent<?>) event, withDittoHeaders, z, z2);
    }
}
