案例: 查询某地的人口数量与平均年龄;
分析:使用Spark查询人口的数量和平均年龄,首先我们使用Spark SQL 方式查询,将原始数据读入,将其转化为DATAFRAME,然后是SQL方式计算:
准备实验数据代码如下:
object demo { def main(args: Array[String]): Unit = { val write = new FileWriter("e://data.txt",false) val rand = new Random() for (i <- 1 to 1000){ write.write(i + " " + rand.nextInt(100)) write.write(System.getProperty("line.separator")) } write.flush() write.close() }}
2.创建Spark SQL 查询平均年龄
Counts { main(args: Array[]): Unit = { conf = SparkConf().setAppName().setMaster() sc = SparkContext(conf) sparkSession = SparkSession.().getOrCreate() schema = ( (, IntegerType, ) :: (, IntegerType, ) :: ) people = sc.textFile().map( _.split()).map(p => (p().trim.toInt, p().trim.toInt)) dataFrame = sparkSession.createDataFrame(people, schema) dataFrame.printSchema() dataFrame.createOrReplaceTempView() sparkSession.sql().show() }}
3.创建Spark SQL 查询平均人口总数
Counts { main(args: Array[]): Unit = { conf = SparkConf().setAppName().setMaster() sc = SparkContext(conf) sparkSession = SparkSession.().getOrCreate() schema = ( (, IntegerType, ) :: (, IntegerType, ) :: ) people = sc.textFile().map( _.split()).map(p => (p().trim.toInt, p().trim.toInt)) dataFrame = sparkSession.createDataFrame(people, schema) dataFrame.printSchema() dataFrame.createOrReplaceTempView() sparkSession.sql().show() }}
总结:在读入数据使用Spark SQL 计算是首先需要知道数据的的结构类型,然后创建相应的数据结构。读入数据将其转换为DATAFRAME。在计算之前可以将数据打印一部分出来看看。