package io.basestar.spark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.basestar.util.Nullsafe;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:io/basestar/spark/PartitionedUpsertSink.class */
/**
 * Sink that upserts datasets into a catalog table, one partition at a time.
 *
 * <p>Each entry of the accepted map pairs a partition spec (column name to value) with the rows
 * to upsert into that partition. For every entry the sink:
 * <ol>
 *   <li>looks the table up in the session's external catalog and derives the target directory
 *       {@code <table location>/<col>=<value>/.../<UPSERT_PARTITION>=<upsertId>/};</li>
 *   <li>skips the entry when the catalog partition already points at that directory;</li>
 *   <li>otherwise full-outer-joins the incoming rows with the rows currently stored for the
 *       partition (matching on the id columns) and tags every row with a state;</li>
 *   <li>either appends brand-new rows next to the existing data, or writes the complete merged
 *       partition under the new upsert id and repoints the catalog partition at it.</li>
 * </ol>
 *
 * <p>Defaults applied by the constructor: id columns {@code ["id"]}, upsert id from
 * {@code PartitionedUpsert.defaultUpsertId}, format {@code Format.PARQUET}, deleted-marker
 * column {@code "__deleted"}.
 */
public class PartitionedUpsertSink extends PartitionedUpsert implements Sink<Map<Map<String, String>, Dataset<Row>>> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedUpsertSink.class);

    /** Transient column used to carry the per-row merge state; dropped before writing. */
    private static final String STATE_COLUMN = "__state";
    /** Incoming row with no stored counterpart. */
    private static final String CREATE_STATE = "CREATE";
    /** Incoming row that is not flagged as deleted. */
    private static final String UPDATE_STATE = "UPDATE";
    /** Incoming row whose deleted-marker column evaluated to true. */
    private static final String DELETE_STATE = "DELETE";
    /** Stored row loaded from the current partition location. */
    private static final String IGNORE_STATE = "IGNORE";

    private final String databaseName;
    private final String tableName;
    private final List<String> idColumns;
    private final String upsertId;
    private final Format format;
    private final String deletedColumn;

    /**
     * Fluent builder for {@link PartitionedUpsertSink}. Properties left unset are passed to the
     * sink constructor as {@code null}; the constructor substitutes defaults for id columns,
     * upsert id, format and deleted column.
     */
    public static class Builder {
        private String databaseName;
        private String tableName;
        private List<String> idColumns;
        private String upsertId;
        private Format format;
        private String deletedColumn;

        Builder() {
        }

        /** Sets the catalog database that holds the target table. */
        public Builder databaseName(String str) {
            this.databaseName = str;
            return this;
        }

        /** Sets the name of the target table. */
        public Builder tableName(String str) {
            this.tableName = str;
            return this;
        }

        /** Sets the columns used to match incoming rows against stored rows. */
        public Builder idColumns(List<String> list) {
            this.idColumns = list;
            return this;
        }

        /** Sets the upsert id used as the value of the upsert partition directory. */
        public Builder upsertId(String str) {
            this.upsertId = str;
            return this;
        }

        /** Sets the format used when writing data and registering partitions. */
        public Builder format(Format format) {
            this.format = format;
            return this;
        }

        /** Sets the name of the boolean column that flags a row as deleted. */
        public Builder deletedColumn(String str) {
            this.deletedColumn = str;
            return this;
        }

        /** Creates the sink from the values collected so far. */
        public PartitionedUpsertSink build() {
            return new PartitionedUpsertSink(this.databaseName, this.tableName, this.idColumns, this.upsertId, this.format, this.deletedColumn);
        }

        @Override
        public String toString() {
            return "PartitionedUpsertSink.Builder(databaseName=" + this.databaseName + ", tableName=" + this.tableName + ", idColumns=" + this.idColumns + ", upsertId=" + this.upsertId + ", format=" + this.format + ", deletedColumn=" + this.deletedColumn + ")";
        }
    }

    /**
     * Creates a sink, replacing {@code null} optional arguments with defaults.
     *
     * @param str    database name (stored as given)
     * @param str2   table name (stored as given)
     * @param list   id columns; defaults to {@code ["id"]}
     * @param str3   upsert id; defaults to the value supplied by {@code PartitionedUpsert.defaultUpsertId}
     * @param format output format; defaults to {@code Format.PARQUET}
     * @param str4   deleted-marker column; defaults to {@code "__deleted"}
     */
    PartitionedUpsertSink(String str, String str2, List<String> list, String str3, Format format, String str4) {
        this.databaseName = str;
        this.tableName = str2;
        this.idColumns = Nullsafe.option(list, ImmutableList.of("id"));
        this.upsertId = Nullsafe.option(str3, PartitionedUpsert::defaultUpsertId);
        this.format = Nullsafe.option(format, Format.PARQUET);
        this.deletedColumn = Nullsafe.option(str4, "__deleted");
    }

    /**
     * Normalises an incoming dataset: drops the upsert partition column if present and replaces
     * the deleted-marker column (if present) with a {@code __state} column holding
     * {@code DELETE} or {@code UPDATE}. Without a deleted-marker column every row is
     * {@code UPDATE}.
     */
    private Dataset<Row> clean(Dataset<Row> dataset) {
        List<String> fieldNames = Arrays.asList(dataset.schema().fieldNames());
        Dataset<Row> result = dataset;
        if (fieldNames.contains(PartitionedUpsert.UPSERT_PARTITION)) {
            result = result.drop(PartitionedUpsert.UPSERT_PARTITION);
        }
        if (fieldNames.contains(this.deletedColumn)) {
            Column state = functions.when(result.col(this.deletedColumn), DELETE_STATE).otherwise(UPDATE_STATE);
            return result.withColumn(STATE_COLUMN, state).drop(this.deletedColumn);
        }
        return result.withColumn(STATE_COLUMN, functions.lit(UPDATE_STATE));
    }

    /**
     * Adds the partitioning columns a dataset needs before it is written with
     * {@code partitionBy}: the upsert partition column (set to {@code str}) and any spec column
     * the dataset does not already contain. The result is coalesced to a single partition and
     * then repartitioned by the table partition columns plus the upsert partition column.
     *
     * @param dataset rows to write
     * @param list    partition column names of the table
     * @param map     partition spec supplying literal values for missing columns
     * @param str     upsert id to stamp on every row
     */
    private static Dataset<Row> prepare(Dataset<Row> dataset, List<String> list, Map<String, String> map, String str) {
        // Field names are captured before the upsert column is added, as in the original.
        List<String> fieldNames = Arrays.asList(dataset.schema().fieldNames());
        Dataset<Row> output = dataset.withColumn(PartitionedUpsert.UPSERT_PARTITION, functions.lit(str));
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (!fieldNames.contains(entry.getKey())) {
                output = output.withColumn(entry.getKey(), functions.lit(entry.getValue()));
            }
        }
        Dataset<Row> coalesced = output.coalesce(1);
        Column[] partitionBy = Stream.concat(list.stream(), Stream.of(PartitionedUpsert.UPSERT_PARTITION))
                .map(coalesced::col)
                .toArray(Column[]::new);
        return coalesced.repartition(partitionBy);
    }

    /**
     * Upserts every entry of {@code map} into the target table.
     *
     * @param map partition spec to the rows that should be upserted into that partition
     * @throws IllegalStateException if the sink was configured with an empty id column list
     */
    @Override // io.basestar.spark.Sink
    public void accept(Map<Map<String, String>, Dataset<Row>> map) {
        map.forEach(this::acceptPartition);
    }

    /** Merges one incoming dataset into the partition identified by {@code partition}. */
    private void acceptPartition(Map<String, String> partition, Dataset<Row> input) {
        SparkSession session = input.sparkSession();
        SQLContext sqlContext = input.sqlContext();
        ExternalCatalogWithListener catalog = session.sharedState().externalCatalog();
        CatalogTable table = catalog.getTable(this.databaseName, this.tableName);
        List<String> partitionColumns = ScalaUtils.asJavaList(table.partitionColumnNames());
        String[] outputPartitions = Stream.concat(partitionColumns.stream(), Stream.of(PartitionedUpsert.UPSERT_PARTITION))
                .toArray(String[]::new);
        URI tableLocation = table.location();

        // The catalog partition spec never contains the upsert partition itself.
        // NOTE(review): the decompiled source reused one name for both the incoming spec and this
        // filtered copy; the filtered copy is used from here on - confirm against the original.
        Map<String, String> spec = partition.entrySet().stream()
                .filter(entry -> !PartitionedUpsert.UPSERT_PARTITION.equals(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        String partitionPath = partitionColumns.stream()
                .map(name -> name + "=" + spec.get(name))
                .collect(Collectors.joining("/"));
        URI newLocation = URI.create(joinPaths(tableLocation.toString(),
                partitionPath + "/" + PartitionedUpsert.UPSERT_PARTITION + "=" + this.upsertId + "/"));

        Dataset<Row> changes = clean(input);
        StructType schema = changes.schema();
        scala.collection.immutable.Map<String, String> scalaSpec = ScalaUtils.asScalaMap(spec);
        Option<CatalogTablePartition> current = catalog.getPartitionOption(this.databaseName, this.tableName, scalaSpec);

        final Dataset<Row> existing;
        final boolean createPartition;
        final String existingUpsertId;
        if (current.isDefined()) {
            CatalogTablePartition currentPartition = current.get();
            URI currentLocation = currentPartition.location();
            if (currentLocation.equals(newLocation)) {
                // The catalog already points at the directory this upsert id would produce.
                log.info("Partition for {} seems to already have been written as {}", spec, newLocation);
                return;
            }
            String serde = currentPartition.storage().serde().get();
            existing = sqlContext.read()
                    .format(Format.forHadoopSerde(serde).getSparkFormat())
                    .load(currentLocation.toString())
                    .withColumn(STATE_COLUMN, functions.lit(IGNORE_STATE));
            createPartition = false;
            existingUpsertId = extractUpsertId(currentLocation);
        } else {
            existing = sqlContext.createDataFrame(ImmutableList.<Row>of(), schema);
            createPartition = true;
            existingUpsertId = null;
        }

        log.info("Writing partition {} as {}", spec, newLocation);

        Column joinCondition = this.idColumns.stream()
                .map(id -> changes.col(id).equalTo(existing.col(id)))
                .reduce(Column::and)
                .orElseThrow(IllegalStateException::new);

        // The explicit MapFunction cast selects the Java overload of Dataset.map and keeps the
        // lambda serializable; it captures only the schema, never this sink.
        // NOTE(review): an incoming row with no stored match is tagged CREATE even when it was
        // flagged DELETE, so it is written - confirm that this is intended.
        Dataset<Row> merged = changes.joinWith(existing, joinCondition, "full_outer")
                .map((MapFunction<Tuple2<Row, Row>, Row>) pair -> {
                    if (pair._1() == null) {
                        // Only stored data: keep it, reshaped to the incoming schema.
                        return SparkSchemaUtils.conform(pair._2(), schema);
                    }
                    if (pair._2() == null) {
                        // Only incoming data: this is a new row.
                        return SparkSchemaUtils.with(pair._1(), ImmutableMap.of(STATE_COLUMN, CREATE_STATE));
                    }
                    // Both sides present: the incoming row (UPDATE or DELETE) wins.
                    return pair._1();
                }, RowEncoder.apply(changes.schema()))
                .cache();
        try {
            Set<String> states = merged.select(merged.col(STATE_COLUMN)).distinct().collectAsList().stream()
                    .map(row -> (String) SparkSchemaUtils.get(row, STATE_COLUMN))
                    .collect(Collectors.toSet());

            boolean rewrite = createPartition || states.contains(UPDATE_STATE) || states.contains(DELETE_STATE);
            if (rewrite) {
                // Stored rows changed (or there is no partition yet): write the complete merged
                // partition under the new upsert id and point the catalog at it.
                Dataset<Row> survivors = merged.filter(merged.col(STATE_COLUMN).notEqual(DELETE_STATE)).drop(STATE_COLUMN);
                append(prepare(survivors, partitionColumns, spec, this.upsertId), outputPartitions, tableLocation);
                CatalogTablePartition updated = SparkUtils.partition(ScalaUtils.asJavaMap(scalaSpec), this.format, newLocation);
                if (createPartition) {
                    catalog.createPartitions(this.databaseName, this.tableName, Option.apply(updated).toList(), false);
                } else {
                    catalog.alterPartitions(this.databaseName, this.tableName, Option.apply(updated).toList());
                }
            } else if (states.contains(CREATE_STATE)) {
                // Only new rows: append them beside the data already stored under the current
                // upsert id; the catalog entry stays as it is.
                Dataset<Row> created = merged.filter(merged.col(STATE_COLUMN).equalTo(CREATE_STATE)).drop(STATE_COLUMN);
                append(prepare(created, partitionColumns, spec, existingUpsertId), outputPartitions, tableLocation);
            }
        } finally {
            // Release the cached join result; otherwise one cache entry per partition piles up.
            merged.unpersist();
        }
    }

    /** Appends {@code rows} under the table location, partitioned by {@code outputPartitions}. */
    private void append(Dataset<Row> rows, String[] outputPartitions, URI tableLocation) {
        rows.write()
                .format(this.format.getSparkFormat())
                .mode(SaveMode.Append)
                .partitionBy(outputPartitions)
                .save(tableLocation.toString());
    }

    /** Concatenates two path segments, inserting a {@code /} unless {@code str} already ends with one. */
    private static String joinPaths(String str, String str2) {
        return str.endsWith("/") ? str + str2 : str + "/" + str2;
    }

    /** Returns a new, empty {@link Builder}. */
    public static Builder builder() {
        return new Builder();
    }

    // The decompiled copy of the synthetic $deserializeLambda$ method was removed: javac
    // generates that method itself for the serializable MapFunction lambda in acceptPartition.
}
