package com.iheart.thomas.kafka;

import cats.effect.Concurrent;
import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Resource;
import cats.effect.Timer;
import cats.implicits$;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.iheart.thomas.abtest.model.Abtest;
import com.iheart.thomas.abtest.model.Feature;
import com.iheart.thomas.analysis.Conversions;
import com.iheart.thomas.analysis.KPIDistribution;
import com.iheart.thomas.bandit.bayesian.BayesianMABAlg;
import com.iheart.thomas.bandit.tracking.Event$BanditKPIUpdateStreamStarted$;
import com.iheart.thomas.bandit.tracking.EventLogger;
import com.iheart.thomas.kafka.BanditUpdater;
import com.iheart.thomas.stream.ConversionBanditKPITracker;
import com.iheart.thomas.stream.RestartableStream$;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.SignallingRef;
import fs2.internal.FreeC;
import fs2.kafka.AutoOffsetReset$;
import fs2.kafka.ConsumerSettings;
import fs2.kafka.ConsumerSettings$;
import fs2.kafka.ConsumerStream$;
import fs2.kafka.Deserializer;
import fs2.kafka.Deserializer$;
import fs2.kafka.Deserializer$Record$;
import fs2.kafka.package$;
import lihua.EntityDAO;
import play.api.libs.json.JsObject;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple3;
import scala.concurrent.ExecutionContext;

/* compiled from: BanditUpdater.scala */
/* loaded from: input_file:com/iheart/thomas/kafka/BanditUpdater$.class */
public final class BanditUpdater$ {
    public static BanditUpdater$ MODULE$;

    static {
        new BanditUpdater$();
    }

    public <F, Message> Resource<F, BanditUpdater<F>> resource(BanditUpdater.KafkaConfig kafkaConfig, Function2<String, String, F> function2, Timer<F> timer, ContextShift<F> contextShift, ConcurrentEffect<F> concurrentEffect, Tuple3<EntityDAO<F, Abtest, JsObject>, EntityDAO<F, Feature, JsObject>, EntityDAO<F, KPIDistribution, JsObject>> tuple3, EventLogger<F> eventLogger, ExecutionContext executionContext, AmazonDynamoDBAsync amazonDynamoDBAsync, Deserializer<F, Message> deserializer) {
        return resource(kafkaConfig, timer, contextShift, concurrentEffect, tuple3, MessageProcessor$.MODULE$.apply(function2, Deserializer$Record$.MODULE$.lift(concurrentEffect, deserializer)), eventLogger, executionContext, amazonDynamoDBAsync);
    }

    public <F> Resource<F, BanditUpdater<F>> resource(BanditUpdater.KafkaConfig kafkaConfig, Timer<F> timer, ContextShift<F> contextShift, ConcurrentEffect<F> concurrentEffect, Tuple3<EntityDAO<F, Abtest, JsObject>, EntityDAO<F, Feature, JsObject>, EntityDAO<F, KPIDistribution, JsObject>> tuple3, MessageProcessor<F> messageProcessor, EventLogger<F> eventLogger, ExecutionContext executionContext, AmazonDynamoDBAsync amazonDynamoDBAsync) {
        return ConversionBMABAlgResource$.MODULE$.apply((Timer) timer, (EventLogger) eventLogger, executionContext, (Concurrent) concurrentEffect, (Tuple3) tuple3, amazonDynamoDBAsync).evalMap(bayesianMABAlg -> {
            return MODULE$.create(kafkaConfig, timer, concurrentEffect, contextShift, messageProcessor, bayesianMABAlg, eventLogger);
        }, concurrentEffect);
    }

    public <F> F create(BanditUpdater.KafkaConfig kafkaConfig, Timer<F> timer, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, MessageProcessor<F> messageProcessor, BayesianMABAlg<F, Conversions> bayesianMABAlg, EventLogger<F> eventLogger) {
        return (F) implicits$.MODULE$.toFunctorOps(RestartableStream$.MODULE$.restartable(() -> {
            return new Stream(runningStream$1(concurrentEffect, messageProcessor, kafkaConfig, timer, bayesianMABAlg, eventLogger, contextShift));
        }, concurrentEffect), concurrentEffect).map(tuple2 -> {
            if (tuple2 != null) {
                return new BanditUpdater$$anon$1(bayesianMABAlg, ((Stream) tuple2._1()).fs2$Stream$$free(), (SignallingRef) tuple2._2(), concurrentEffect, eventLogger);
            }
            throw new MatchError(tuple2);
        });
    }

    public static final /* synthetic */ FreeC $anonfun$create$2(ConcurrentEffect concurrentEffect, ConsumerSettings consumerSettings, ContextShift contextShift, Timer timer, BanditUpdater.KafkaConfig kafkaConfig, MessageProcessor messageProcessor, Function1 function1) {
        return Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.map$extension(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.evalTap$extension(ConsumerStream$.MODULE$.using$extension(package$.MODULE$.consumerStream(concurrentEffect), consumerSettings, contextShift, timer), kafkaConsumer -> {
            return kafkaConsumer.subscribeTo(kafkaConfig.topic(), Predef$.MODULE$.wrapRefArray(new String[0]));
        }, concurrentEffect), kafkaConsumer2 -> {
            return new Stream(kafkaConsumer2.stream());
        }), committableConsumerRecord -> {
            return committableConsumerRecord.record().value();
        }), messageProcessor.preprocessor()), function1);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final FreeC runningStream$1(ConcurrentEffect concurrentEffect, MessageProcessor messageProcessor, BanditUpdater.KafkaConfig kafkaConfig, Timer timer, BayesianMABAlg bayesianMABAlg, EventLogger eventLogger, ContextShift contextShift) {
        ConsumerSettings withGroupId = ConsumerSettings$.MODULE$.apply(concurrentEffect, Deserializer$Record$.MODULE$.lift(concurrentEffect, Deserializer$.MODULE$.unit(concurrentEffect)), messageProcessor.deserializer()).withEnableAutoCommit(true).withAutoOffsetReset(AutoOffsetReset$.MODULE$.Earliest()).withBootstrapServers(kafkaConfig.kafkaServers()).withGroupId("thomas-kpi-monitor");
        Function1 updateAllConversions = new ConversionBanditKPITracker(timer, bayesianMABAlg, eventLogger, concurrentEffect).updateAllConversions(kafkaConfig.chunkSize(), (str, str2) -> {
            return messageProcessor.toConversionEvent(str, str2);
        });
        return Stream$.MODULE$.$plus$plus$extension(Stream$.MODULE$.eval(eventLogger.apply(Event$BanditKPIUpdateStreamStarted$.MODULE$)), () -> {
            return new Stream($anonfun$create$2(concurrentEffect, withGroupId, contextShift, timer, kafkaConfig, messageProcessor, updateAllConversions));
        });
    }

    private BanditUpdater$() {
        MODULE$ = this;
    }
}
