package geotrellis.spark.store.cassandra;

import cats.effect.ContextShift;
import cats.effect.IO;
import cats.effect.IO$;
import cats.syntax.EitherOps$;
import cats.syntax.package$either$;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$Compiler$;
import fs2.Stream$PartiallyAppliedFromIterator$;
import fs2.internal.FreeC;
import geotrellis.spark.store.LayerWriter$;
import geotrellis.spark.util.KryoWrapper;
import geotrellis.spark.util.KryoWrapper$;
import geotrellis.store.LayerId;
import geotrellis.store.avro.AvroEncoder$;
import geotrellis.store.avro.AvroRecordCodec;
import geotrellis.store.avro.codecs.KeyValueRecordCodec;
import geotrellis.store.avro.codecs.KeyValueRecordCodec$;
import geotrellis.store.cassandra.CassandraInstance;
import geotrellis.store.cassandra.package$;
import geotrellis.store.util.BlockingThreadPool$;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.spark.rdd.RDD;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.math.BigInt;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;

/* compiled from: CassandraRDDWriter.scala */
/* loaded from: input_file:geotrellis/spark/store/cassandra/CassandraRDDWriter$.class */
/**
 * Java-level view of the Scala singleton {@code CassandraRDDWriter}: writes the records of a
 * key/value {@link RDD} into a Cassandra table, optionally merging them with the records
 * already stored under the same index.
 *
 * <p>The table used has the columns {@code key} (varint, partition key), {@code name} (text,
 * clustering), {@code zoom} (int, clustering) and {@code value} (blob). Each {@code value}
 * blob is the Avro binary encoding of a vector of key/value records sharing one index.
 *
 * <p>The {@code $anonfun$...} methods are lambda bodies lifted by the Scala compiler; they are
 * public only because of that encoding and are not meant to be called directly.
 */
public final class CassandraRDDWriter$ {
    /** The single instance of this class (Scala {@code object} encoding). */
    public static final CassandraRDDWriter$ MODULE$ = new CassandraRDDWriter$();

    /**
     * Writes {@code rdd} to Cassandra without a writer schema and without a merge function.
     *
     * <p>Delegates to {@link #update} passing {@code None} for both optional arguments.
     *
     * @param rdd               records to write
     * @param cassandraInstance connection provider used to obtain sessions
     * @param layerId           supplies the {@code name} and {@code zoom} column values
     * @param function1         maps a record key to the index stored in the {@code key} column
     * @param str               keyspace name
     * @param str2              table name
     * @param function0         supplier of the execution context the per-row work is shifted to
     * @param avroRecordCodec   Avro codec for keys
     * @param avroRecordCodec2  Avro codec for values
     */
    public <K, V> void write(RDD<Tuple2<K, V>> rdd, CassandraInstance cassandraInstance, LayerId layerId, Function1<K, BigInt> function1, String str, String str2, Function0<ExecutionContext> function0, AvroRecordCodec<K> avroRecordCodec, AvroRecordCodec<V> avroRecordCodec2) {
        update(rdd, cassandraInstance, layerId, function1, str, str2, None$.MODULE$, None$.MODULE$, function0, avroRecordCodec, avroRecordCodec2);
    }

    /**
     * Default value for the seventh parameter of {@link #write} (Scala default-argument
     * encoding).
     *
     * @return the execution context exposed by {@code BlockingThreadPool}
     */
    public <K, V> ExecutionContext write$default$7() {
        return BlockingThreadPool$.MODULE$.executionContext();
    }

    /**
     * Writes {@code rdd} to Cassandra, handing any previously stored records for the same index
     * to {@code LayerWriter.updateRecords} together with the new ones.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>ensure the keyspace exists and create the table if it does not;</li>
     *   <li>build the SELECT and INSERT statement texts for this layer's name and zoom;</li>
     *   <li>group the records by index and, for every non-empty partition, open a session
     *       and read/merge/encode/insert each group.</li>
     * </ol>
     *
     * @param rdd               records to write
     * @param cassandraInstance connection provider used to obtain sessions
     * @param layerId           supplies the {@code name} and {@code zoom} column values
     * @param function1         maps a record key to the index stored in the {@code key} column
     * @param str               keyspace name
     * @param str2              table name
     * @param option            optional Avro schema used to decode existing blobs; when empty the
     *                          codec's own schema is used
     * @param option2           optional function forwarded to {@code LayerWriter.updateRecords}
     *                          (presumably combines an existing and a new value; confirm there)
     * @param function0         supplier of the execution context the per-row work is shifted to
     * @param avroRecordCodec   Avro codec for keys
     * @param avroRecordCodec2  Avro codec for values
     */
    public <K, V> void update(RDD<Tuple2<K, V>> rdd, CassandraInstance cassandraInstance, LayerId layerId, Function1<K, BigInt> function1, String str, String str2, Option<Schema> option, Option<Function2<V, V, V>> option2, Function0<ExecutionContext> function0, AvroRecordCodec<K> avroRecordCodec, AvroRecordCodec<V> avroRecordCodec2) {
        // Result intentionally unused; the call is kept from the original.
        // NOTE(review): presumably a leftover of an implicit SparkContext binding - confirm before removing.
        rdd.sparkContext();

        // Codec used to encode the merged records before they are inserted.
        KeyValueRecordCodec encodeCodec = KeyValueRecordCodec$.MODULE$.apply(avroRecordCodec, avroRecordCodec2);

        cassandraInstance.withSessionDo(session -> {
            cassandraInstance.ensureKeyspaceExists(str, session);
            return session.execute(
                SchemaBuilder.createTable(str, str2)
                    .ifNotExists()
                    .addPartitionKey("key", DataType.varint())
                    .addClusteringColumn("name", DataType.text())
                    .addClusteringColumn("zoom", DataType.cint())
                    .addColumn("value", DataType.blob()));
        });

        // Layer name and zoom are fixed in the statement text; only the index (and, for the
        // insert, the blob) are bind markers filled in per row.
        String selectCql = QueryBuilder.select(new String[]{"value"})
            .from(str, str2)
            .where(QueryBuilder.eq("key", QueryBuilder.bindMarker()))
            .and(QueryBuilder.eq("name", layerId.name()))
            .and(QueryBuilder.eq("zoom", BoxesRunTime.boxToInteger(layerId.zoom())))
            .toString();
        String insertCql = QueryBuilder.insertInto(str, str2)
            .value("name", layerId.name())
            .value("zoom", BoxesRunTime.boxToInteger(layerId.zoom()))
            .value("key", QueryBuilder.bindMarker())
            .value("value", QueryBuilder.bindMarker())
            .toString();

        // Codec used to decode blobs that are already stored.
        KeyValueRecordCodec decodeCodec = KeyValueRecordCodec$.MODULE$.apply(avroRecordCodec, avroRecordCodec2);
        // The optional writer schema is passed to the partition closure inside a KryoWrapper.
        KryoWrapper wrappedSchema = KryoWrapper$.MODULE$.apply(option, ClassTag$.MODULE$.apply(Option.class));

        rdd.groupBy(entry -> {
            return (BigInt) function1.apply(entry._1());
        }, rdd.partitions().length, ClassTag$.MODULE$.apply(BigInt.class)).foreachPartition(partition -> {
            $anonfun$update$3(cassandraInstance, selectCql, insertCql, function0, option2, wrappedSchema, decodeCodec, encodeCodec, partition);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Default value for the ninth parameter of {@link #update} (Scala default-argument
     * encoding).
     *
     * @return the execution context exposed by {@code BlockingThreadPool}
     */
    public <K, V> ExecutionContext update$default$9() {
        return BlockingThreadPool$.MODULE$.executionContext();
    }

    /**
     * Copies exactly the readable bytes of {@code buffer} (from its position to its limit)
     * into a new array, leaving the buffer's own position untouched.
     *
     * <p>{@link ByteBuffer#array()} is deliberately not used: it exposes the whole backing
     * array regardless of position, limit and array offset, and fails for read-only or direct
     * buffers.
     */
    private static byte[] remainingBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Builds a one-element stream that, on {@code executionContext}, pairs the row's index with
     * the result of {@code LayerWriter.updateRecords} for that row.
     *
     * <p>The third argument given to {@code updateRecords} is a thunk that reads the existing
     * row for the index: it yields an empty vector when the query returns nothing, otherwise
     * the Avro-decoded content of the first returned row's {@code value} column.
     *
     * @param tuple2               (index, new records) pair; {@code null} raises {@link MatchError}
     *                             when the effect runs
     * @param executionContext     context the effect is shifted to
     * @param option               optional function forwarded to {@code updateRecords}
     * @param session              session used for the read
     * @param preparedStatement    prepared SELECT taking the index as its only bind value
     * @param kryoWrapper          wrapper around the optional schema used for decoding
     * @param keyValueRecordCodec  codec used for decoding (and as schema fallback)
     */
    private static final FreeC elaborateRow$1(Tuple2 tuple2, ExecutionContext executionContext, Option option, Session session, PreparedStatement preparedStatement, KryoWrapper kryoWrapper, KeyValueRecordCodec keyValueRecordCodec) {
        return Stream$.MODULE$.eval(IO$.MODULE$.shift(executionContext).$times$greater(IO$.MODULE$.apply(() -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            BigInt index = (BigInt) tuple2._1();
            Vector incoming = (Vector) tuple2._2();
            return new Tuple2(index, LayerWriter$.MODULE$.updateRecords(option, incoming, () -> {
                ResultSet existing = session.execute(preparedStatement.bind(new Object[]{package$.MODULE$.bigToBig(index)}));
                if (!((IterableOnceOps) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(existing).asScala()).nonEmpty()) {
                    return scala.package$.MODULE$.Vector().empty();
                }
                Schema schema = (Schema) ((Option) kryoWrapper.value()).getOrElse(() -> {
                    return keyValueRecordCodec.schema();
                });
                // Decode only the bytes that belong to this column value.
                byte[] stored = remainingBytes(existing.one().getBytes("value"));
                return (Vector) AvroEncoder$.MODULE$.fromBinary(schema, stored, keyValueRecordCodec);
            }));
        })));
    }

    /**
     * Builds a one-element stream that, on {@code executionContext}, Avro-encodes the row's
     * records and pairs the index with the encoded bytes wrapped in a {@link ByteBuffer}.
     *
     * @param tuple2               (index, records) pair; {@code null} raises {@link MatchError}
     *                             when the effect runs
     * @param executionContext     context the effect is shifted to
     * @param keyValueRecordCodec  codec used for encoding
     */
    private static final FreeC rowToBytes$1(Tuple2 tuple2, ExecutionContext executionContext, KeyValueRecordCodec keyValueRecordCodec) {
        return Stream$.MODULE$.eval(IO$.MODULE$.shift(executionContext).$times$greater(IO$.MODULE$.apply(() -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            BigInt index = (BigInt) tuple2._1();
            Vector records = (Vector) tuple2._2();
            byte[] encoded = AvroEncoder$.MODULE$.toBinary(records, keyValueRecordCodec);
            return new Tuple2(index, ByteBuffer.wrap(encoded));
        })));
    }

    /**
     * Builds a one-element stream that, on {@code executionContext}, executes the prepared
     * INSERT bound to the row's index and encoded bytes.
     *
     * <p>Unlike the other row helpers, the {@code null} check and the casts happen immediately,
     * when the stream is built, not when the effect runs.
     *
     * @param tuple2             (index, encoded bytes) pair; {@code null} raises {@link MatchError}
     * @param executionContext   context the effect is shifted to
     * @param session            session used for the write
     * @param preparedStatement  prepared INSERT taking the index and the blob as bind values
     */
    private static final FreeC retire$1(Tuple2 tuple2, ExecutionContext executionContext, Session session, PreparedStatement preparedStatement) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        BigInt index = (BigInt) tuple2._1();
        ByteBuffer payload = (ByteBuffer) tuple2._2();
        return Stream$.MODULE$.eval(IO$.MODULE$.shift(executionContext).$times$greater(IO$.MODULE$.apply(() -> {
            return session.execute(preparedStatement.bind(new Object[]{package$.MODULE$.bigToBig(index), payload}));
        })));
    }

    /** Lifted lambda: forwards to {@code elaborateRow$1} with the row as last argument. */
    public static final /* synthetic */ FreeC $anonfun$update$11(ExecutionContext executionContext, Option option, Session session, PreparedStatement preparedStatement, KryoWrapper kryoWrapper, KeyValueRecordCodec keyValueRecordCodec, Tuple2 tuple2) {
        return elaborateRow$1(tuple2, executionContext, option, session, preparedStatement, kryoWrapper, keyValueRecordCodec);
    }

    /** Lifted lambda: forwards to {@code rowToBytes$1} with the row as last argument. */
    public static final /* synthetic */ FreeC $anonfun$update$12(ExecutionContext executionContext, KeyValueRecordCodec keyValueRecordCodec, Tuple2 tuple2) {
        return rowToBytes$1(tuple2, executionContext, keyValueRecordCodec);
    }

    /** Lifted lambda: forwards to {@code retire$1} with the row as last argument. */
    public static final /* synthetic */ FreeC $anonfun$update$13(ExecutionContext executionContext, Session session, PreparedStatement preparedStatement, Tuple2 tuple2) {
        return retire$1(tuple2, executionContext, session, preparedStatement);
    }

    /**
     * Lifted lambda: one-element stream that, on {@code executionContext}, starts an
     * asynchronous close of the session and then of its cluster.
     *
     * <p>The future returned by the session close is discarded; the one returned by the
     * cluster close becomes the effect's result. Neither is awaited here.
     */
    public static final /* synthetic */ FreeC $anonfun$update$14(ExecutionContext executionContext, Session session) {
        return Stream$.MODULE$.eval(IO$.MODULE$.shift(executionContext).$times$greater(IO$.MODULE$.apply(() -> {
            session.closeAsync();
            return session.getCluster().closeAsync();
        })));
    }

    /**
     * Lifted lambda: processes one partition with an open session.
     *
     * <p>Prepares both statements, then streams the partition's (index, records) groups through
     * read/merge ({@code $anonfun$update$11}), encode ({@code $anonfun$update$12}) and insert
     * ({@code $anonfun$update$13}); the insert streams are joined with
     * {@code parJoinUnbounded}, and the close stream ({@code $anonfun$update$14}) is attached
     * with {@code onComplete}. The whole stream is run synchronously and any failure captured
     * by {@code attempt} is rethrown.
     *
     * @param str                   SELECT statement text
     * @param str2                  INSERT statement text
     * @param iterator              the partition's (index, records) groups
     * @param function0             supplier of the execution context
     * @param option                optional function forwarded to {@code updateRecords}
     * @param kryoWrapper           wrapper around the optional schema used for decoding
     * @param keyValueRecordCodec   codec used when decoding existing rows
     * @param keyValueRecordCodec2  codec used when encoding rows to insert
     * @param session               open session for this partition
     */
    public static final /* synthetic */ void $anonfun$update$4(String str, String str2, Iterator iterator, Function0 function0, Option option, KryoWrapper kryoWrapper, KeyValueRecordCodec keyValueRecordCodec, KeyValueRecordCodec keyValueRecordCodec2, Session session) {
        PreparedStatement prepare = session.prepare(str);
        PreparedStatement prepare2 = session.prepare(str2);

        // Source stream: every group's records are materialised as a Vector.
        FreeC source = Stream$PartiallyAppliedFromIterator$.MODULE$.apply$extension(Stream$.MODULE$.fromIterator(), iterator.map(tuple2 -> {
            if (tuple2 != null) {
                return new Tuple2((BigInt) tuple2._1(), ((Iterable) tuple2._2()).toVector());
            }
            throw new MatchError(tuple2);
        }), IO$.MODULE$.ioEffect());

        ExecutionContext executionContext = (ExecutionContext) function0.apply();
        ContextShift contextShift = IO$.MODULE$.contextShift(executionContext);

        // Read existing rows and merge them with the new records.
        FreeC merged = Stream$.MODULE$.flatMap$extension(source, tuple22 -> {
            return new Stream($anonfun$update$11(executionContext, option, session, prepare, kryoWrapper, keyValueRecordCodec, tuple22));
        });
        // Avro-encode the merged records.
        FreeC encoded = Stream$.MODULE$.flatMap$extension(merged, tuple23 -> {
            return new Stream($anonfun$update$12(executionContext, keyValueRecordCodec2, tuple23));
        });
        // One insert stream per row, joined below.
        FreeC inserts = Stream$.MODULE$.map$extension(encoded, tuple24 -> {
            return new Stream($anonfun$update$13(executionContext, session, prepare2, tuple24));
        });
        FreeC joined = Stream$.MODULE$.parJoinUnbounded$extension(inserts, $less$colon$less$.MODULE$.refl(), $less$colon$less$.MODULE$.refl(), IO$.MODULE$.ioConcurrentEffect(contextShift));
        FreeC withClose = Stream$.MODULE$.onComplete$extension(joined, () -> {
            return new Stream($anonfun$update$14(executionContext, session));
        });

        IO drained = (IO) Stream$.MODULE$.compile$extension(withClose, Stream$Compiler$.MODULE$.syncInstance(IO$.MODULE$.ioConcurrentEffect(contextShift))).drain();
        Either outcome = (Either) drained.attempt().unsafeRunSync();
        // Surface a captured failure to the caller instead of dropping it.
        EitherOps$.MODULE$.valueOr$extension(package$either$.MODULE$.catsSyntaxEither(outcome), th -> {
            throw th;
        });
    }

    /**
     * Lifted lambda: body run for every partition of the grouped RDD.
     *
     * <p>Does nothing for an empty partition, so no session is requested for it; otherwise
     * runs {@code $anonfun$update$4} inside {@code cassandraInstance.withSession}.
     */
    public static final /* synthetic */ void $anonfun$update$3(CassandraInstance cassandraInstance, String str, String str2, Function0 function0, Option option, KryoWrapper kryoWrapper, KeyValueRecordCodec keyValueRecordCodec, KeyValueRecordCodec keyValueRecordCodec2, Iterator iterator) {
        if (!iterator.nonEmpty()) {
            return;
        }
        cassandraInstance.withSession(session -> {
            $anonfun$update$4(str, str2, iterator, function0, option, kryoWrapper, keyValueRecordCodec, keyValueRecordCodec2, session);
            return BoxedUnit.UNIT;
        });
    }

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private CassandraRDDWriter$() {
    }
}
