package io.snappydata.examples;

import com.typesafe.config.Config;
import java.io.File;
import java.io.PrintWriter;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.sql.streaming.SnappyStreamingContext;
import org.apache.spark.sql.streaming.SnappyStreamingJob;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import spark.jobserver.SparkJobValid$;
import spark.jobserver.SparkJobValidation;

/* compiled from: TwitterPopularTagsJob.scala */
/* loaded from: input_file:io/snappydata/examples/TwitterPopularTagsJob$.class */
/**
 * Singleton streaming job (Java rendering of the Scala object {@code TwitterPopularTagsJob})
 * that creates the {@code hashtagtable} and {@code retweettable} stream tables, maintains an
 * approximate top-K structure over hashtags and a row table of retweets, and writes periodic
 * reports to a timestamped {@code TwitterPopularTagsJob-<millis>.out} file.
 *
 * <p>NOTE(review): {@code new PrintWriter(String)}, {@code Thread.sleep} and
 * {@code File.getCanonicalPath} throw checked exceptions that are left undeclared here, exactly
 * as in the code this was derived from, so that the exception types seen by callers do not change.
 */
public final class TwitterPopularTagsJob$ implements SnappyStreamingJob {
    /** Name of the row table that stores the retweets received from the continuous query. */
    private static final String RETWEET_STORE = "retweetStore";

    /** Run time used when the job config has no {@code streamRunTime} entry. */
    private static final long DEFAULT_RUN_TIME_MILLIS = 120000L;

    /** Pause between two top-K reports; also the width of the queried time window. */
    private static final long REPORT_INTERVAL_MILLIS = 2000L;

    /** The single instance of this job. */
    public static final TwitterPopularTagsJob$ MODULE$ = new TwitterPopularTagsJob$();

    /**
     * Sets up the stream tables, starts the streaming context, reports the top hashtags every
     * two seconds until the configured run time elapses, then reports the overall top hashtags
     * and the ten most retweeted tweets.
     *
     * @param snappyStreamingContext streaming context used to create tables and run queries
     * @param config job configuration; optional keys read here are {@code consumerKey},
     *     {@code consumerSecret}, {@code accessToken}, {@code accessTokenSecret} (all four
     *     together select the live twitter stream) and {@code streamRunTime} (seconds)
     * @return a message of the form {@code "See <current directory>/<output file name>"}
     */
    public Object runJob(SnappyStreamingContext snappyStreamingContext, Config config) {
        String outFileName = "TwitterPopularTagsJob-" + System.currentTimeMillis() + ".out";
        // try-with-resources: the report file is closed even when setup fails before start().
        try (PrintWriter printWriter = new PrintWriter(outFileName)) {
            StructType schema = StructType$.MODULE$.apply(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new StructField[]{new StructField("hashtag", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())})));
            snappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS topktable");
            snappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS hashtagtable");
            snappyStreamingContext.snappyContext().sql("DROP TABLE IF EXISTS retweettable");

            createStreamTables(snappyStreamingContext, config, printWriter);

            SchemaDStream retweetStream = snappyStreamingContext.registerCQ("SELECT * FROM retweettable WINDOW (DURATION '2' SECONDS, SLIDE '2' SECONDS)");
            snappyStreamingContext.snappyContext().createApproxTSTopK(
                    "topktable",
                    "hashtag",
                    new Some(schema),
                    Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                            new Tuple2<String, String>("epoch", Long.toString(System.currentTimeMillis())),
                            new Tuple2<String, String>("timeInterval", "2000ms"),
                            new Tuple2<String, String>("size", "10"),
                            new Tuple2<String, String>("basetable", "hashtagtable")})),
                    snappyStreamingContext.snappyContext().createApproxTSTopK$default$5());

            snappyStreamingContext.snappyContext().dropTable(RETWEET_STORE, true);
            snappyStreamingContext.snappyContext().sql("CREATE TABLE " + RETWEET_STORE + " (retweetId BIGINT PRIMARY KEY, " + "retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()");
            retweetStream.foreachDataFrame(new TwitterPopularTagsJob$$anonfun$runJob$1(RETWEET_STORE));

            snappyStreamingContext.start();
            try {
                // Multiply as long: an int product overflows for large streamRunTime values.
                long runTimeMillis = config.hasPath("streamRunTime")
                        ? Integer.parseInt(config.getString("streamRunTime")) * 1000L
                        : DEFAULT_RUN_TIME_MILLIS;
                long deadline = System.currentTimeMillis() + runTimeMillis;
                while (deadline > System.currentTimeMillis()) {
                    Thread.sleep(REPORT_INTERVAL_MILLIS);
                    printWriter.println("\n******** Top 10 hash tags of last two seconds *******\n");
                    Predef$.MODULE$.refArrayOps(snappyStreamingContext.snappyContext().queryApproxTSTopK("topktable", System.currentTimeMillis() - REPORT_INTERVAL_MILLIS, System.currentTimeMillis()).collect()).foreach(new TwitterPopularTagsJob$$anonfun$runJob$2(printWriter));
                }
                printWriter.println("\n************ Top 10 hash tags until now ***************\n");
                Predef$.MODULE$.refArrayOps(snappyStreamingContext.sql("SELECT * FROM topktable").collect()).foreach(new TwitterPopularTagsJob$$anonfun$runJob$3(printWriter));
                printWriter.println("\n####### Top 10 popular tweets - Query Row table #######\n");
                Predef$.MODULE$.refArrayOps(snappyStreamingContext.snappyContext().sql("SELECT retweetId AS RetweetId, " + "retweetCnt AS RetweetsCount, retweetTxt AS Text FROM " + RETWEET_STORE + " ORDER BY RetweetsCount DESC LIMIT 10").collect()).foreach(new TwitterPopularTagsJob$$anonfun$runJob$4(printWriter));
                printWriter.println("\n#######################################################");
                return "See " + getCurrentDirectory$1() + "/" + outFileName;
            } finally {
                // Close the report before stopping the context, keeping the original order;
                // the second close() issued by try-with-resources is a no-op.
                printWriter.close();
                snappyStreamingContext.stop(false, true);
            }
        }
    }

    /**
     * Accepts every job configuration.
     *
     * @return always {@code SparkJobValid$.MODULE$}
     */
    public SparkJobValidation validate(SnappyStreamingContext snappyStreamingContext, Config config) {
        return SparkJobValid$.MODULE$;
    }

    /**
     * Creates {@code hashtagtable} and {@code retweettable}, backed by the live twitter stream
     * when all four credentials are configured and by the files under
     * {@code /tmp/copiedtwitterdata} otherwise. The chosen mode is written to the report.
     */
    private static void createStreamTables(SnappyStreamingContext snappyStreamingContext, Config config, PrintWriter printWriter) {
        if (hasTwitterCredentials(config)) {
            printWriter.println("##### Running example with live twitter stream #####");
            snappyStreamingContext.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING twitter_stream OPTIONS (" + twitterStreamOptions(config, "org.apache.spark.sql.streaming.TweetToHashtagRow"));
            snappyStreamingContext.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING twitter_stream OPTIONS (" + twitterStreamOptions(config, "org.apache.spark.sql.streaming.TweetToRetweetRow"));
        } else {
            printWriter.println("##### Running example with stored tweet data #####");
            snappyStreamingContext.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow',directory '/tmp/copiedtwitterdata')");
            snappyStreamingContext.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow',directory '/tmp/copiedtwitterdata')");
        }
    }

    /**
     * Tells whether every credential read by {@link #twitterStreamOptions} is present.
     * The previous check tested {@code consumerKey} twice and never {@code consumerSecret}.
     */
    private static boolean hasTwitterCredentials(Config config) {
        return config.hasPath("consumerKey")
                && config.hasPath("consumerSecret")
                && config.hasPath("accessToken")
                && config.hasPath("accessTokenSecret");
    }

    /**
     * Builds the tail of a {@code twitter_stream} OPTIONS clause (credentials, row converter and
     * the closing parenthesis) from the job configuration.
     */
    private static String twitterStreamOptions(Config config, String rowConverter) {
        return "consumerKey '" + config.getString("consumerKey") + "', "
                + "consumerSecret '" + config.getString("consumerSecret") + "', "
                + "accessToken '" + config.getString("accessToken") + "', "
                + "accessTokenSecret '" + config.getString("accessTokenSecret") + "', "
                + "rowConverter '" + rowConverter + "')";
    }

    /** Returns the canonical path of the process working directory ({@code "."}). */
    private final String getCurrentDirectory$1() {
        return new File(".").getCanonicalPath();
    }

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private TwitterPopularTagsJob$() {
    }
}
