package org.eclipse.ditto.services.thingsearch.persistence.write.streaming;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Attributes;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSink;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.base.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.services.thingsearch.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.services.thingsearch.common.config.SearchConfig;
import org.eclipse.ditto.services.thingsearch.common.config.StreamConfig;
import org.eclipse.ditto.services.thingsearch.common.config.StreamStageConfig;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.services.utils.namespaces.BlockedNamespaces;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream.class */
public final class SearchUpdaterStream {
    /** Search configuration; the stream, retrieval and persistence settings are read from it. */
    private final SearchConfig searchConfig;
    /** Produces the flow stage that is placed between the change queue source and the write-model sources. */
    private final EnforcementFlow enforcementFlow;
    /** Produces the flow whose {@code BulkWriteResult} output is logged by the restart sink. */
    private final MongoSearchUpdaterFlow mongoSearchUpdaterFlow;
    /** Actor handed to {@code ChangeQueueActor.createSource} to feed the stream. */
    private final ActorRef changeQueueActor;
    /** Consulted per element to decide whether its namespace is blocked and the element must be dropped. */
    private final BlockedNamespaces blockedNamespaces;

    /**
     * Stores the already-built collaborators. Instances are obtained through the static factory
     * method of this class.
     *
     * @param searchConfig the search configuration.
     * @param enforcementFlow the flow factory used when building the restart source.
     * @param mongoSearchUpdaterFlow the flow factory used when building the restart sink.
     * @param actorRef the actor kept as the change queue actor.
     * @param blockedNamespaces the lookup used to filter out blocked namespaces.
     */
    private SearchUpdaterStream(
            SearchConfig searchConfig,
            EnforcementFlow enforcementFlow,
            MongoSearchUpdaterFlow mongoSearchUpdaterFlow,
            ActorRef actorRef,
            BlockedNamespaces blockedNamespaces) {

        // Plain field wiring; no validation or copying happens here.
        this.blockedNamespaces = blockedNamespaces;
        this.changeQueueActor = actorRef;
        this.mongoSearchUpdaterFlow = mongoSearchUpdaterFlow;
        this.enforcementFlow = enforcementFlow;
        this.searchConfig = searchConfig;
    }

    /**
     * Builds a {@code SearchUpdaterStream} together with its enforcement flow and MongoDB updater flow.
     *
     * @param searchConfig the search configuration; its stream, cache and delete settings are read.
     * @param actorSystem the actor system whose dispatchers are used to look up the cache dispatcher.
     * @param actorRef second argument of {@code EnforcementFlow.of} (presumably a shard region — confirm
     * against {@code EnforcementFlow}).
     * @param actorRef2 third argument of {@code EnforcementFlow.of} (presumably a shard region — confirm
     * against {@code EnforcementFlow}).
     * @param actorRef3 the actor stored as the change queue actor of the new instance.
     * @param mongoDatabase the database handed to {@code MongoSearchUpdaterFlow.of}.
     * @param blockedNamespaces the lookup used to filter out blocked namespaces.
     * @return the new stream holder; nothing is started yet.
     */
    public static SearchUpdaterStream of(SearchConfig searchConfig, ActorSystem actorSystem, ActorRef actorRef, ActorRef actorRef2, ActorRef actorRef3, MongoDatabase mongoDatabase, BlockedNamespaces blockedNamespaces) {
        final StreamConfig streamConfig = searchConfig.getStreamConfig();

        // The dispatcher is resolved by the name configured for the cache.
        final EnforcementFlow enforcement = EnforcementFlow.of(
                streamConfig,
                actorRef,
                actorRef2,
                actorSystem.dispatchers().lookup(streamConfig.getCacheConfig().getDispatcherName()),
                searchConfig.getDeleteConfig().isDeleteEvent());

        final MongoSearchUpdaterFlow mongoUpdater = MongoSearchUpdaterFlow.of(mongoDatabase);

        return new SearchUpdaterStream(searchConfig, enforcement, mongoUpdater, actorRef3, blockedNamespaces);
    }

    /**
     * Connects the restart source to the restart sink through a single kill switch and runs the
     * resulting graph.
     *
     * @param actorRefFactory the factory from which the materializer running the stream is created.
     * @return the kill switch materialized by the running stream.
     */
    public KillSwitch start(ActorRefFactory actorRefFactory) {
        final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSource = createRestartSource();
        final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSink = createRestartSink();

        // Keep.right picks the kill switch over the source's NotUsed; Keep.left retains it over the sink's.
        return (KillSwitch) restartSource
                .viaMat(KillSwitches.single(), Keep.right())
                .toMat(restartSink, Keep.left())
                .run(ActorMaterializer.create(actorRefFactory));
    }

    /**
     * Builds the upstream half of the updater: the change queue source, filtered by blocked namespaces,
     * passed through the enforcement flow, with every emitted write-model source filtered by blocked
     * namespaces again. The result is wrapped in a restart-with-back-off source configured from the
     * retrieval stage settings.
     *
     * @return the restarting source of write-model sources.
     */
    private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource() {
        final StreamConfig streamConfig = this.searchConfig.getStreamConfig();
        final StreamStageConfig retrievalConfig = streamConfig.getRetrievalConfig();

        final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> writeModelSources =
                ChangeQueueActor.createSource(this.changeQueueActor, streamConfig.getWriteInterval())
                        .via(filterMapKeysByBlockedNamespaces())
                        .via(this.enforcementFlow.create(retrievalConfig.getParallelism())
                                .map(writeModels -> writeModels.via(
                                        blockNamespaceFlow(SearchUpdaterStream::namespaceOfWriteModel))));

        final ExponentialBackOffConfig backOff = retrievalConfig.getExponentialBackOffConfig();
        // The same source blueprint is handed out on every (re)start.
        return RestartSource.withBackoff(
                backOff.getMin(),
                backOff.getMax(),
                backOff.getRandomFactor(),
                () -> writeModelSources);
    }

    /**
     * Builds the downstream half of the updater: the MongoDB updater flow, whose bulk write results are
     * rendered by {@code logResult} and logged under the name {@code SearchUpdaterStream/BulkWriteResult},
     * ending in an ignoring sink. The result is wrapped in a restart-with-back-off sink configured from
     * the persistence stage settings.
     *
     * @return the restarting sink consuming write-model sources.
     */
    private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
        final StreamConfig streamConfig = this.searchConfig.getStreamConfig();
        final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

        final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> sink =
                this.mongoSearchUpdaterFlow
                        .start(persistenceConfig.getParallelism(), persistenceConfig.getMaxBulkSize(),
                                streamConfig.getWriteInterval())
                        .map(SearchUpdaterStream::logResult)
                        .log("SearchUpdaterStream/BulkWriteResult")
                        // Elements at INFO, stream completion at WARNING, stream failure at ERROR.
                        .withAttributes(Attributes.logLevels(
                                Attributes.logLevelInfo(),
                                Attributes.logLevelWarning(),
                                Attributes.logLevelError()))
                        .to(Sink.ignore());

        final ExponentialBackOffConfig backOff = persistenceConfig.getExponentialBackOffConfig();
        // The first argument is the MINIMUM back-off. Passing the maximum twice made the very first
        // restart wait the full maximum delay and disabled exponential growth, unlike the restart source.
        return RestartSink.withBackoff(
                backOff.getMin(),
                backOff.getMax(),
                backOff.getRandomFactor(),
                () -> sink);
    }

    /**
     * Creates a flow that rebuilds each incoming map without the entries whose key belongs to a blocked
     * namespace. For every incoming map exactly one new {@link HashMap} is emitted, possibly empty; the
     * incoming map itself is not modified.
     *
     * @param <T> the value type of the maps.
     * @return the filtering flow.
     */
    private <T> Flow<Map<ThingId, T>, Map<ThingId, T>, NotUsed> filterMapKeysByBlockedNamespaces() {
        // The explicit type witnesses keep the element type through the chain; without them the
        // lambda parameters degrade to Object and the body does not type-check.
        return Flow.<Map<ThingId, T>>create()
                .flatMapConcat(thingMap -> Source.fromIterator(thingMap.entrySet()::iterator)
                        .via(blockNamespaceFlow(entry -> entry.getKey().getNamespace()))
                        // A fresh accumulator per incoming map; surviving entries are copied into it.
                        .<Map<ThingId, T>>fold(new HashMap<>(), (accepted, entry) -> {
                            accepted.put(entry.getKey(), entry.getValue());
                            return accepted;
                        }));
    }

    /**
     * Creates a flow that drops every element whose namespace is reported as blocked.
     *
     * <p>An element passes when the lookup answers {@code false}, and also when the lookup yields no
     * result (a {@code null} result, which is what {@code handle} supplies when the lookup completed
     * exceptionally). Lookup failures therefore never fail the stream.
     *
     * @param function extracts the namespace from an element.
     * @param <T> the element type.
     * @return the filtering flow.
     */
    private <T> Flow<T, T, NotUsed> blockNamespaceFlow(Function<T, String> function) {
        // Flow.<T>create() pins the element type so the extractor can be applied without casts.
        return Flow.<T>create()
                .flatMapConcat(element -> {
                    final String namespace = function.apply(element);
                    return Source.fromCompletionStage(
                                    this.blockedNamespaces.contains(namespace)
                                            // true = let the element through
                                            .handle((blocked, error) -> blocked == null || !blocked))
                            .filter(Boolean::booleanValue)
                            .map(passed -> element);
                });
    }

    /**
     * Extracts the namespace of the thing a write model refers to.
     *
     * @param abstractWriteModel the write model.
     * @return the namespace of the thing ID found in the write model's metadata.
     */
    private static String namespaceOfWriteModel(AbstractWriteModel abstractWriteModel) {
        final ThingId thingId = abstractWriteModel.getMetadata().getThingId();
        return thingId.getNamespace();
    }

    /**
     * Renders the counters of a bulk write result as a single log-friendly line.
     *
     * @param bulkWriteResult the result to describe.
     * @return a string listing the matched, upserted, inserted, modified and deleted counts.
     */
    private static String logResult(BulkWriteResult bulkWriteResult) {
        final int matched = bulkWriteResult.getMatchedCount();
        final int upserts = bulkWriteResult.getUpserts().size();
        final int inserted = bulkWriteResult.getInsertedCount();
        final int modified = bulkWriteResult.getModifiedCount();
        final int deleted = bulkWriteResult.getDeletedCount();
        return String.format(
                "BulkWriteResult[matched=%d,upserts=%d,inserted=%d,modified=%d,deleted=%d]",
                matched, upserts, inserted, modified, deleted);
    }

    /**
     * Synthetic hook for deserializing the serializable lambdas and method references of this class.
     *
     * <p>The implementation method name of the serialized lambda is first mapped to an index (switch on
     * the name's hash code, confirmed with {@code equals}). The second switch then verifies the method
     * kind, functional interface, its method name and signature, the implementing class and the
     * implementation signature, and returns an equivalent lambda built from the captured arguments.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated method and is not valid
     * Java as written: {@code z} is declared {@code boolean} but assigned {@code -1} and integers, the
     * second switch repeats {@code case true}, and {@code this} is referenced from this static method.
     * Presumably it should be dropped from hand-maintained source and left to the compiler to
     * regenerate — confirm before relying on it.
     *
     * @param serializedLambda the serialized form of the lambda to recreate.
     * @return the recreated lambda.
     * @throws IllegalArgumentException if the serialized lambda matches none of the known lambdas.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: translate the implementation method name into a branch index.
        switch (implMethodName.hashCode()) {
            case -1927519135:
                if (implMethodName.equals("lambda$null$6d23cf57$1")) {
                    z = 7;
                    break;
                }
                break;
            case -623320356:
                if (implMethodName.equals("lambda$null$c6f929ac$1")) {
                    z = 2;
                    break;
                }
                break;
            case -10157301:
                if (implMethodName.equals("lambda$createRestartSource$3ced6112$1")) {
                    z = 8;
                    break;
                }
                break;
            case 110321071:
                if (implMethodName.equals("lambda$createRestartSource$49e4dbfc$1")) {
                    z = 6;
                    break;
                }
                break;
            case 231605032:
                if (implMethodName.equals("valueOf")) {
                    z = 5;
                    break;
                }
                break;
            case 1088561623:
                if (implMethodName.equals("lambda$filterMapKeysByBlockedNamespaces$4105340e$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1182533742:
                if (implMethodName.equals("iterator")) {
                    z = false;
                    break;
                }
                break;
            case 1195666331:
                if (implMethodName.equals("lambda$createRestartSink$1c1eaf9a$1")) {
                    z = 9;
                    break;
                }
                break;
            case 1909838977:
                if (implMethodName.equals("logResult")) {
                    z = true;
                    break;
                }
                break;
            case 1913163660:
                if (implMethodName.equals("lambda$blockNamespaceFlow$6ec87a17$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Step 2: verify the full lambda signature for the selected branch and rebuild the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Set") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Iterator;")) {
                    Set set = (Set) serializedLambda.getCapturedArg(0);
                    return set::iterator;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/bulk/BulkWriteResult;)Ljava/lang/String;")) {
                    return SearchUpdaterStream::logResult;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;Ljava/util/Map$Entry;)Ljava/util/Map;")) {
                    return (map, entry2) -> {
                        map.put(entry2.getKey(), entry2.getValue());
                        return map;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Function;Ljava/lang/Object;)Lakka/stream/Graph;")) {
                    SearchUpdaterStream searchUpdaterStream = (SearchUpdaterStream) serializedLambda.getCapturedArg(0);
                    Function function = (Function) serializedLambda.getCapturedArg(1);
                    return obj -> {
                        return Source.fromCompletionStage(this.blockedNamespaces.contains((String) function.apply(obj)).handle((bool, th) -> {
                            return Boolean.valueOf(bool == null || !bool.booleanValue());
                        })).filter((v0) -> {
                            return Boolean.valueOf(v0);
                        }).map(bool2 -> {
                            return obj;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)Lakka/stream/Graph;")) {
                    SearchUpdaterStream searchUpdaterStream2 = (SearchUpdaterStream) serializedLambda.getCapturedArg(0);
                    return map2 -> {
                        Set entrySet = map2.entrySet();
                        entrySet.getClass();
                        return Source.fromIterator(entrySet::iterator).via(blockNamespaceFlow(entry -> {
                            return ((ThingId) entry.getKey()).getNamespace();
                        })).fold(new HashMap(), (map2, entry22) -> {
                            map2.put(entry22.getKey(), entry22.getValue());
                            return map2;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("java/lang/Boolean") && serializedLambda.getImplMethodSignature().equals("(Z)Ljava/lang/Boolean;")) {
                    return (v0) -> {
                        return Boolean.valueOf(v0);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/javadsl/Source;")) {
                    SearchUpdaterStream searchUpdaterStream3 = (SearchUpdaterStream) serializedLambda.getCapturedArg(0);
                    return source -> {
                        return source.via(blockNamespaceFlow(SearchUpdaterStream::namespaceOfWriteModel));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Boolean;)Ljava/lang/Object;")) {
                    Object capturedArg = serializedLambda.getCapturedArg(0);
                    return bool2 -> {
                        return capturedArg;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/javadsl/Source;")) {
                    Source source2 = (Source) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return source2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/SearchUpdaterStream") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Sink;)Lakka/stream/javadsl/Sink;")) {
                    Sink sink = (Sink) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return sink;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
