package geotrellis.spark.io.accumulo;

import geotrellis.spark.util.KryoWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scalaz.concurrent.Strategy$;
import scalaz.concurrent.Task;
import scalaz.concurrent.Task$;
import scalaz.stream.Process$;
import scalaz.stream.channel$;
import scalaz.stream.tee$;

/* compiled from: AccumuloWriteStrategy.scala */
/* loaded from: input_file:geotrellis/spark/io/accumulo/SocketWriteStrategy$$anonfun$write$1.class */
public final class SocketWriteStrategy$$anonfun$write$1 extends AbstractFunction1<Iterator<Tuple2<Key, Value>>, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final AccumuloInstance instance$1;
    private final String table$1;
    private final KryoWrapper serializeWrapper$1;
    private final KryoWrapper kwThreads$1;

    public final void apply(Iterator<Tuple2<Key, Value>> iterator) {
        if (iterator.nonEmpty()) {
            int unboxToInt = BoxesRunTime.unboxToInt(this.kwThreads$1.value());
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(unboxToInt);
            BatchWriter createBatchWriter = this.instance$1.connector().createBatchWriter(this.table$1, (BatchWriterConfig) this.serializeWrapper$1.value());
            try {
                ((Task) scalaz.stream.nondeterminism.package$.MODULE$.njoin(unboxToInt, unboxToInt, Process$.MODULE$.unfold(iterator, new SocketWriteStrategy$$anonfun$write$1$$anonfun$1(this)).tee(channel$.MODULE$.lift(new SocketWriteStrategy$$anonfun$write$1$$anonfun$2(this, newFixedThreadPool, createBatchWriter)), tee$.MODULE$.zipApply()).map(new SocketWriteStrategy$$anonfun$write$1$$anonfun$3(this)), Strategy$.MODULE$.Executor(newFixedThreadPool)).run(Task$.MODULE$.taskInstance(), Task$.MODULE$.taskInstance())).unsafePerformSync();
            } finally {
                createBatchWriter.close();
                newFixedThreadPool.shutdown();
            }
        }
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((Iterator<Tuple2<Key, Value>>) obj);
        return BoxedUnit.UNIT;
    }

    public SocketWriteStrategy$$anonfun$write$1(SocketWriteStrategy socketWriteStrategy, AccumuloInstance accumuloInstance, String str, KryoWrapper kryoWrapper, KryoWrapper kryoWrapper2) {
        this.instance$1 = accumuloInstance;
        this.table$1 = str;
        this.serializeWrapper$1 = kryoWrapper;
        this.kwThreads$1 = kryoWrapper2;
    }
}
