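In this article we walk through accessing Hive data from Spark, covering the following points:
1. How to access Spark interactively through the Spark Shell
2. How to read an HDFS file and create an RDD
3. How to analyze a dataset interactively with the Spark API
4. How to create a Hive table in ORC format
5. How to query Hive tables with Spark SQL
6. How to store data in ORC format
Spark SQL uses the Spark engine to run SQL queries over data stored in HDFS or held in existing RDDs, so SQL statements can be used to work with data from inside a Spark program.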
1. Get the dataset
Download the sample data from a terminal on the Linux server:
wget http://hortonassets.s3.amazonaws.com/tutorial/data/yahoo_stocks.csv
Upload the downloaded file to a directory in HDFS:
hdfs dfs -put ./yahoo_stocks.csv /tmp/
2. Start the Spark Shell
spark-shell
The spark-shell started here can interact with Hive because hive-site.xml, hdfs-site.xml and core-site.xml have already been copied into Spark's conf directory.
Import the required libraries:
scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
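3. Create a SparkSession
Spark 2.0 introduces SparkSession, which has built-in support for Hive features, including writing queries in HiveQL, calling Hive UDFs, and reading data from Hive tables.
Import the class: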
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
When we log in with spark-shell, a SparkSession instance named spark has already been created for us by default and can be used directly from here on.
Spark session available as 'spark'.
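Outside of spark-shell no session is created automatically, so an application has to build one itself. The following is a minimal sketch of that, not the setup used in this article: the application name and the local master are assumptions chosen for a quick trial, and enableHiveSupport() only works when the spark-hive module is on the classpath. Inside spark-shell, getOrCreate() simply returns the session that already exists.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the name and the local master are assumptions for a trial run;
// a real job would normally receive its master from spark-submit.
val hiveSpark = SparkSession.builder()
  .appName("HiveAccessSketch")   // hypothetical application name
  .master("local[*]")            // assumption: local trial run
  .enableHiveSupport()           // needs the spark-hive module on the classpath
  .getOrCreate()

// Reports which catalog implementation the session uses.
val catalogImpl = hiveSpark.conf.get("spark.sql.catalogImplementation")
println(s"catalog implementation: $catalogImpl")
```

If the printed value is not hive, queries against Hive tables such as the ones in the later steps would not see the metastore configured in hive-site.xml.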
4. Create a table in ORC format
Create the table in Hive:
scala> spark.sql("create table yahoo_orc_table (date STRING,open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
res0: org.apache.spark.sql.DataFrame = []
5. Load the data file and create an RDD
scala> val yahoo_stocks =sc.textFile("hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv")
yahoo_stocks: org.apache.spark.rdd.RDD[String] = hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv MapPartitionsRDD[2] at textFile at <console>:30
Fetch the first 10 lines:
scala> yahoo_stocks.take(10).foreach(println)
Date,Open,High,Low,Close,Volume,Adj Close
2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34
2016-09-20,44.65,45.10,44.25,44.36,10840900,44.36
2016-09-20,43.73,44.71,43.69,44.52,11267500,44.52
2016-09-20,43.92,44.06,43.58,43.70,14274900,43.70
2016-09-20,44.58,44.85,43.67,43.98,32241200,43.98
2016-09-20,45.15,45.18,44.45,44.49,16103700,44.49
2016-09-20,44.73,44.91,44.41,44.66,10052900,44.66
2016-09-20,45.30,45.44,44.25,44.45,13305700,44.45
2016-09-20,45.82,46.13,45.53,45.78,13800300,45.78
6. The first line of the data holds the field names
scala> val header = yahoo_stocks.first
header: String = Date,Open,High,Low,Close,Volume,Adj Close
Next we create a new RDD that excludes the header line:
scala> val data = yahoo_stocks.mapPartitionsWithIndex { (idx, iter)=> if (idx == 0) iter.drop(1) else iter }
data: org.apache.spark.rdd.RDD[String] =MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:32
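The header removal above relies on the first line of a single input file landing in partition 0, so only that partition drops its first element. The following plain-Scala sketch models the same logic without Spark; the helper dropHeader and the two hand-made "partitions" are hypothetical and exist only for illustration.

```scala
// Same decision as the closure passed to mapPartitionsWithIndex:
// only partition 0 loses its first element.
def dropHeader(idx: Int, iter: Iterator[String]): Iterator[String] =
  if (idx == 0) iter.drop(1) else iter

// Two hand-made "partitions" standing in for the RDD's partitions.
val partitions = Seq(
  Seq("Date,Open,High,Low,Close,Volume,Adj Close",
      "2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34"),
  Seq("2016-09-20,44.65,45.10,44.25,44.36,10840900,44.36")
)

// Apply the helper partition by partition and flatten the result.
val withoutHeader = partitions.zipWithIndex.flatMap {
  case (rows, idx) => dropHeader(idx, rows.iterator).toList
}

withoutHeader.foreach(println)
```

Compared with filtering every line against the header string, this approach touches only one element of one partition, which is presumably why the article uses it.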
7. Create a schema
scala> case class YahooStockPrice(date: String, open: Float, high:Float, low: Float, close: Float, volume: Integer, adjClose: Float)
defined class YahooStockPrice
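8. Bind the schema to the processed data
Map each line to a YahooStockPrice and convert the result to a DataFrame (it is registered as a table in step 9):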
scala> val stockprice = data.map(_.split(",")).map(row=> YahooStockPrice(row(0), row(1).trim.toFloat, row(2).trim.toFloat,row(3).trim.toFloat, row(4).trim.toFloat, row(5).trim.toInt,row(6).trim.toFloat)).toDF()
stockprice: org.apache.spark.sql.DataFrame = [date: string,open: float ... 5 more fields]
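The mapping above throws an exception as soon as one line has a missing field or a non-numeric value. As a hedged alternative, the plain-Scala sketch below wraps the conversion in Try and returns an Option, so malformed lines can be skipped. The names StockRow and parseLine are hypothetical and not part of the article's code.

```scala
import scala.util.Try

// Hypothetical stand-in for the YahooStockPrice case class.
case class StockRow(date: String, open: Float, high: Float, low: Float,
                    close: Float, volume: Int, adjClose: Float)

// Returns None instead of throwing when a line cannot be parsed.
def parseLine(line: String): Option[StockRow] = {
  val f = line.split(",").map(_.trim)
  if (f.length != 7) None
  else Try(
    StockRow(f(0), f(1).toFloat, f(2).toFloat, f(3).toFloat,
             f(4).toFloat, f(5).toInt, f(6).toFloat)
  ).toOption
}

val good = parseLine("2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34")
val bad  = parseLine("Date,Open,High,Low,Close,Volume,Adj Close")
println(good)
println(bad)
```

In the shell this could be applied as data.flatMap(line => parseLine(line)).toDF(), which keeps only the lines that parse successfully.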
View the first row:
scala> stockprice.first
res4: org.apache.spark.sql.Row =[2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34]
View more rows:
scala> stockprice.show
Verify the schema:
scala> stockprice.printSchema
root
|-- date: string (nullable = true)
|-- open: float (nullable = false)
|-- high: float (nullable = false)
|-- low: float (nullable = false)
|-- close: float (nullable = false)
|-- volume: integer (nullable = true)
|-- adjClose: float (nullable = false)
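Steps 5 to 8 build the DataFrame by hand from an RDD of strings. Since Spark 2.0 the DataFrameReader can also read CSV directly with an explicit schema, which may be a shorter route to the same result. The sketch below is an assumption-laden illustration, not the article's method: it runs in local mode and writes a two-row sample to a temporary local file instead of using the HDFS copy of yahoo_stocks.csv.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: a local-mode session; in spark-shell the existing one is returned.
val session = SparkSession.builder()
  .appName("CsvSchemaSketch").master("local[*]").getOrCreate()

// Small local sample with the same layout as yahoo_stocks.csv.
val samplePath = Files.createTempFile("yahoo_stocks_sample", ".csv")
val sampleCsv =
  """Date,Open,High,Low,Close,Volume,Adj Close
    |2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34
    |2016-09-20,44.65,45.10,44.25,44.36,10840900,44.36
    |""".stripMargin
Files.write(samplePath, sampleCsv.getBytes(StandardCharsets.UTF_8))

// Explicit schema using the field names of YahooStockPrice.
val stockSchema = StructType(Seq(
  StructField("date", StringType),
  StructField("open", FloatType),
  StructField("high", FloatType),
  StructField("low", FloatType),
  StructField("close", FloatType),
  StructField("volume", IntegerType),
  StructField("adjClose", FloatType)
))

// header=true skips the first line; column names come from the schema.
val csvDF = session.read
  .option("header", "true")
  .schema(stockSchema)
  .csv(samplePath.toUri.toString)

csvDF.printSchema()
val csvRowCount = csvDF.count()
```

One visible difference from the case-class route is nullability: columns read this way are reported as nullable, whereas the Float fields of the case class are not.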
9. Register a temporary view
scala> stockprice.createOrReplaceTempView("yahoo_stocks_temp")
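10. Query the temporary view
Note that this is not a table in Hive; it is a session-scoped temporary view backed by the DataFrame created above: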
scala> val results = spark.sql("SELECT * FROM yahoo_stocks_temp")
scala> results.map(t => "Stock Entry: " +t.toString).collect().foreach(println)
……
Stock Entry: [2016-09-20,32.50008,32.50008,29.37504,30.12504,8214400,1.25521]
Stock Entry: [2016-09-20,32.25,32.50008,31.24992,31.99992,6116800,1.33333]
Stock Entry: [2016-09-20,31.5,33.25008,31.5,32.87496,9731200,1.36979]
Stock Entry: [2016-09-20,30.25008,31.75008,30.0,31.62504,4881600,1.31771]
Stock Entry: [2016-09-20,31.24992,31.5,29.50008,29.74992,5003200,1.23958]
……
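A temporary view accepts ordinary SQL, not only SELECT *. The sketch below shows a filtered and sorted query against a small view. It is an illustration under assumptions: the session runs in local mode, and the view name stocks_sample, its three columns, and the volume threshold are hypothetical, with rows borrowed from the sample output in step 5.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local-mode session; in spark-shell the existing one is returned.
val session = SparkSession.builder()
  .appName("TempViewSketch").master("local[*]").getOrCreate()
import session.implicits._

// Three sample rows reduced to date, closing price and volume.
val sample = Seq(
  ("2016-09-20", 44.34f, 7188300),
  ("2016-09-20", 44.36f, 10840900),
  ("2016-09-20", 44.52f, 11267500)
).toDF("date", "close_price", "volume")

// The view exists only for the lifetime of this session.
sample.createOrReplaceTempView("stocks_sample")

// Keep the rows with more than 10 million shares traded, cheapest first.
val busy = session.sql(
  """SELECT date, close_price
    |FROM stocks_sample
    |WHERE volume > 10000000
    |ORDER BY close_price""".stripMargin)

busy.show()
val busyCount = busy.count()
```

The same pattern applies to yahoo_stocks_temp in the shell, using the column names of the YahooStockPrice case class.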
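11. Save in ORC file format
We write the data above into a Hive table stored as ORC. Note that saveAsTable creates a new table named yahoo_stocks_orc; the yahoo_orc_table created in step 4 is not touched.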
scala> results.write.format("orc").saveAsTable("yahoo_stocks_orc")
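12. Read the ORC file
Note that load() takes a file system path rather than a table name. A relative path such as the one below is resolved against the current user's home directory on the default file system, while saveAsTable normally places the files of a managed table under the Hive warehouse directory; if the path cannot be found, spark.table("yahoo_stocks_orc") reads the same data by table name.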
scala> val yahoo_stocks_orc= spark.read.format("orc").load("yahoo_stocks_orc")
yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date:string, open: float ... 5 more fields]
Register a temporary in-memory view that maps to this ORC data:
scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")
Query it:
scala> spark.sql("SELECT * from orcTest").collect.foreach(println)
……
[2016-09-20,31.5,31.99992,30.49992,31.00008,5928000,1.29167]
[2016-09-20,31.99992,32.25,31.24992,31.75008,7561600,1.32292]
[2016-09-20,30.0,32.25,28.99992,31.24992,19478400,1.30208]
[2016-09-20,28.5,29.12496,27.75,28.99992,7795200,1.20833]
……
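Steps 11 and 12 mix two ways of addressing ORC data: by table name (saveAsTable) and by path (load). The sketch below keeps the two apart so that each write is read back the same way it was written. It rests on several assumptions: a local-mode session with the spark-hive module available, a temporary local directory as the target path, and a hypothetical table name orc_sketch_table.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local mode with spark-hive on the classpath.
val session = SparkSession.builder()
  .appName("OrcRoundTripSketch").master("local[*]")
  .enableHiveSupport().getOrCreate()
import session.implicits._

val sample = Seq(
  ("2016-09-20", 44.34f, 7188300),
  ("2016-09-20", 44.36f, 10840900)
).toDF("date", "close_price", "volume")

// Path-based round trip: save(path) pairs with load(path).
val orcDir = Files.createTempDirectory("orc_sketch")
  .resolve("stocks").toUri.toString
sample.write.mode("overwrite").format("orc").save(orcDir)
val fromPath = session.read.format("orc").load(orcDir)

// Table-based round trip: saveAsTable(name) pairs with table(name).
sample.write.mode("overwrite").format("orc").saveAsTable("orc_sketch_table")
val fromTable = session.table("orc_sketch_table")

println(s"rows by path: ${fromPath.count()}, rows by table: ${fromTable.count()}")
```

The mode("overwrite") call is there so the sketch can be rerun; without it, a second saveAsTable against an existing table fails under the default save mode.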
13. Query the Hive table data
When we log in with spark-shell, a spark instance is initialized by default:
Spark session available as 'spark'.
We can use spark to access the data in Hive tables.
scala> val tableDF =spark.sql("select * from yahoo_stocks_orc limit 10")
tableDF: org.apache.spark.sql.DataFrame = [date: string,open: float ... 5 more fields]
View the 10 rows:
scala> tableDF.take(10).foreach(println)
[2016-09-20,44.34,44.57,43.94,44.34,7188300,44.34]
[2016-09-20,44.65,45.1,44.25,44.36,10840900,44.36]
[2016-09-20,43.73,44.71,43.69,44.52,11267500,44.52]
[2016-09-20,43.92,44.06,43.58,43.7,14274900,43.7]
[2016-09-20,44.58,44.85,43.67,43.98,32241200,43.98]
[2016-09-20,45.15,45.18,44.45,44.49,16103700,44.49]
[2016-09-20,44.73,44.91,44.41,44.66,10052900,44.66]
[2016-09-20,45.3,45.44,44.25,44.45,13305700,44.45]
[2016-09-20,45.82,46.13,45.53,45.78,13800300,45.78]
[2016-09-20,45.46,45.83,45.23,45.73,15033500,45.73]