package mutatis;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import journal.Logger;
import journal.Logger$;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicOffsetConsumerConnector;
import kafka.consumer.Whitelist;
import kafka.message.MessageAndMetadata;
import kafka.producer.Partitioner;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.producer.ProducerPool;
import kafka.producer.async.DefaultEventHandler;
import kafka.producer.async.DefaultEventHandler$;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.Utils$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.reflect.ManifestFactory$;
import scala.runtime.BoxedUnit;
import scalaz.concurrent.Task;
import scalaz.concurrent.Task$;
import scalaz.stream.Process;
import scalaz.stream.Process$;
import scalaz.stream.sink$;

/* compiled from: mutatis.scala */
/* loaded from: input_file:mutatis/package$.class */
public final class package$ {
    public static final package$ MODULE$ = null;
    private final Logger log;
    private final ScheduledExecutorService mutatis$package$$pool;

    static {
        new package$();
    }

    public Logger log() {
        return this.log;
    }

    public ScheduledExecutorService mutatis$package$$pool() {
        return this.mutatis$package$$pool;
    }

    private ThreadFactory daemonThreads(final String str) {
        return new ThreadFactory(str) { // from class: mutatis.package$$anon$1
            private final String name$1;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
                newThread.setDaemon(true);
                newThread.setName(this.name$1);
                return newThread;
            }

            {
                this.name$1 = str;
            }
        };
    }

    public <K, V> Init<K, V> mutatis$package$$init(ConsumerConfig consumerConfig, String str, Decoder<K> decoder, Decoder<V> decoder2, int i, Duration duration) {
        Properties props = consumerConfig.props().props();
        props.setProperty("auto.commit.enable", "false");
        TopicOffsetConsumerConnector topicOffsetConsumerConnector = new TopicOffsetConsumerConnector(new ConsumerConfig(props));
        Seq createMessageStreamsByFilter = topicOffsetConsumerConnector.consumerConnector().createMessageStreamsByFilter(new Whitelist(str), i, decoder, decoder2);
        ConcurrentHashMap<TopicAndPartition, Object> concurrentHashMap = new ConcurrentHashMap<>();
        package$$anonfun$3 package__anonfun_3 = new package$$anonfun$3(concurrentHashMap);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ((Task) Process$.MODULE$.eval(mutatis$package$$commit(topicOffsetConsumerConnector, concurrentHashMap)).$plus$plus(new package$$anonfun$mutatis$package$$init$1(duration, topicOffsetConsumerConnector, concurrentHashMap)).run(Task$.MODULE$.taskInstance(), Task$.MODULE$.taskInstance())).runAsyncInterruptibly(new package$$anonfun$mutatis$package$$init$2(), atomicBoolean);
        return new Init<>(createMessageStreamsByFilter, package__anonfun_3, new package$$anonfun$1(topicOffsetConsumerConnector), atomicBoolean);
    }

    private <K, V> Duration init$default$6() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes();
    }

    public <K, V> Process<Task, Process<Task, DecodedEvent<K, V>>> consumer(ConsumerConfig consumerConfig, String str, Decoder<K> decoder, Decoder<V> decoder2, int i, Duration duration) {
        return package$ProcessObjectSyntax$.MODULE$.bracket$extension(ProcessObjectSyntax(Process$.MODULE$), Task$.MODULE$.delay(new package$$anonfun$consumer$1(consumerConfig, str, decoder, decoder2, i, duration)), new package$$anonfun$consumer$2(), new package$$anonfun$consumer$3());
    }

    public <K, V> Duration consumer$default$6() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes();
    }

    public Task<BoxedUnit> mutatis$package$$commit(TopicOffsetConsumerConnector topicOffsetConsumerConnector, ConcurrentHashMap<TopicAndPartition, Object> concurrentHashMap) {
        return Task$.MODULE$.apply(new package$$anonfun$mutatis$package$$commit$1(topicOffsetConsumerConnector, concurrentHashMap), mutatis$package$$pool()).attempt().map(new package$$anonfun$mutatis$package$$commit$3());
    }

    public <K, V> Process<Task, DecodedEvent<K, V>> mutatis$package$$streamConsumer(Function1<MessageAndMetadata<K, V>, BoxedUnit> function1, Function0<BoxedUnit> function0, KafkaStream<K, V> kafkaStream) {
        return package$ProcessObjectSyntax$.MODULE$.bracket$extension(ProcessObjectSyntax(Process$.MODULE$), Task$.MODULE$.delay(new package$$anonfun$mutatis$package$$streamConsumer$1(kafkaStream)), new package$$anonfun$mutatis$package$$streamConsumer$2(function0), new package$$anonfun$mutatis$package$$streamConsumer$3(function1));
    }

    public <K, V> Process<Task, BoxedUnit> mutatis$package$$commit(Function1<MessageAndMetadata<K, V>, BoxedUnit> function1, MessageAndMetadata<K, V> messageAndMetadata) {
        return Process$.MODULE$.eval(Task$.MODULE$.delay(new package$$anonfun$mutatis$package$$commit$2(function1, messageAndMetadata)));
    }

    public <K, V> Process<Task, DecodedEvent<K, V>> mutatis$package$$syncPoll(Function0<DecodedEvent<K, V>> function0) {
        return Process$.MODULE$.repeatEval(Task$.MODULE$.delay(function0));
    }

    public <V> Process<Task, Function1<V, Task<BoxedUnit>>> producer(ProducerConfig producerConfig, String str, Encoder<V> encoder) {
        Producer producer = producer(producerConfig, (Option) None$.MODULE$, (Encoder) encoder);
        return sink$.MODULE$.lift(new package$$anonfun$producer$1(str, producer)).onComplete(new package$$anonfun$producer$2(producer));
    }

    public <K, V> Process<Task, Function1<Tuple2<K, V>, Task<BoxedUnit>>> producer(ProducerConfig producerConfig, String str, Encoder<K> encoder, Encoder<V> encoder2) {
        Producer<K, V> producer = producer(producerConfig, (Option) new Some(encoder), (Encoder) encoder2);
        return sink$.MODULE$.lift(new package$$anonfun$producer$3(str, producer)).onComplete(new package$$anonfun$producer$4(producer));
    }

    private <K, V> Producer<K, V> producer(ProducerConfig producerConfig, Option<Encoder<K>> option, Encoder<V> encoder) {
        return new Producer<>(producerConfig, new DefaultEventHandler(producerConfig, (Partitioner) Utils$.MODULE$.createObject(producerConfig.partitionerClass(), Predef$.MODULE$.wrapRefArray(new Object[]{producerConfig.props()})), encoder, (Encoder) option.getOrElse(new package$$anonfun$producer$5()), new ProducerPool(producerConfig), DefaultEventHandler$.MODULE$.$lessinit$greater$default$6()));
    }

    public Process$ ProcessObjectSyntax(Process$ process$) {
        return process$;
    }

    private package$() {
        MODULE$ = this;
        this.log = Logger$.MODULE$.apply(ManifestFactory$.MODULE$.singleType(this));
        this.mutatis$package$$pool = Executors.newSingleThreadScheduledExecutor(daemonThreads("mutatis-committer"));
    }
}
