在python中使用pyspark讀寫Hive數(shù)據(jù)操作
1、讀Hive表數(shù)據(jù)
pyspark讀取hive數(shù)據(jù)非常簡(jiǎn)單,因?yàn)樗袑iT的接口來讀取,完全不需要像hbase那樣,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL語(yǔ)句從hive里面查詢需要的數(shù)據(jù),代碼如下:
from pyspark.sql import HiveContext,SparkSession _SPARK_HOST = 'spark://spark-master:7077'_APP_NAME = 'test'spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() hive_context= HiveContext(spark_session ) # 生成查詢的SQL語(yǔ)句,這個(gè)跟hive的查詢語(yǔ)句一樣,所以也可以加where等條件語(yǔ)句hive_database = 'database1'hive_table = 'test'hive_read = 'select * from {}.{}'.format(hive_database, hive_table) # 通過SQL語(yǔ)句在hive中查詢的數(shù)據(jù)直接是dataframe的形式read_df = hive_context.sql(hive_read)
2 、將數(shù)據(jù)寫入hive表
pyspark寫hive表有兩種方式:
(1)通過SQL語(yǔ)句生成表
from pyspark.sql import SparkSession, HiveContext _SPARK_HOST = 'spark://spark-master:7077'_APP_NAME = 'test' spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() data = [ (1,'3','145'), (1,'4','146'), (1,'5','25'), (1,'6','26'), (2,'32','32'), (2,'8','134'), (2,'8','134'), (2,'9','137')]df = spark.createDataFrame(data, [’id’, 'test_id', ’camera_id’]) # method one,default是默認(rèn)數(shù)據(jù)庫(kù)的名字,write_test 是要寫到default中數(shù)據(jù)表的名字df.registerTempTable(’test_hive’)sqlContext.sql('create table default.write_test select * from test_hive')
(2)saveastable的方式
# method two # 'overwrite'是重寫表的模式,如果表存在,就覆蓋掉原始數(shù)據(jù),如果不存在就重新生成一張表# mode('append')是在原有表的基礎(chǔ)上進(jìn)行添加數(shù)據(jù)df.write.format('hive').mode('overwrite').saveAsTable(’default.write_test’)
tips:
spark用上面幾種方式讀寫hive時(shí),需要在提交任務(wù)時(shí)加上相應(yīng)的配置,不然會(huì)報(bào)錯(cuò):
spark-submit --conf spark.sql.catalogImplementation=hive test.py
補(bǔ)充知識(shí):PySpark基于SHC框架讀取HBase數(shù)據(jù)并轉(zhuǎn)成DataFrame
一、首先需要將HBase目錄lib下的jar包以及SHC的jar包復(fù)制到所有節(jié)點(diǎn)的Spark目錄lib下
二、修改spark-defaults.conf 在spark.driver.extraClassPath和spark.executor.extraClassPath把上述jar包所在路徑加進(jìn)去
三、重啟集群
四、代碼
#/usr/bin/python#-*- coding:utf-8 ?*- from pyspark import SparkContextfrom pyspark.sql import SQLContext,HiveContext,SparkSessionfrom pyspark.sql.types import Row,StringType,StructField,StringType,IntegerTypefrom pyspark.sql.dataframe import DataFrame sc = SparkContext(appName='pyspark_hbase')sql_sc = SQLContext(sc) dep = 'org.apache.spark.sql.execution.datasources.hbase'#定義schemacatalog = '''{ 'table':{'namespace':'default', 'name':'teacher'}, 'rowkey':'key', 'columns':{ 'id':{'cf':'rowkey', 'col':'key', 'type':'string'}, 'name':{'cf':'teacherInfo', 'col':'name', 'type':'string'}, 'age':{'cf':'teacherInfo', 'col':'age', 'type':'string'}, 'gender':{'cf':'teacherInfo', 'col':'gender','type':'string'}, 'cat':{'cf':'teacherInfo', 'col':'cat','type':'string'}, 'tag':{'cf':'teacherInfo', 'col':'tag', 'type':'string'}, 'level':{'cf':'teacherInfo', 'col':'level','type':'string'} } }''' df = sql_sc.read.options(catalog = catalog).format(dep).load() print (’***************************************************************’)print (’***************************************************************’)print (’***************************************************************’)df.show()print (’***************************************************************’)print (’***************************************************************’)print (’***************************************************************’)sc.stop()
五、解釋
數(shù)據(jù)來源參考請(qǐng)本人之前的文章,在此不做贅述
schema定義參考如圖:
六、結(jié)果
以上這篇在python中使用pyspark讀寫Hive數(shù)據(jù)操作就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持好吧啦網(wǎng)。
相關(guān)文章:
1. 利用promise及參數(shù)解構(gòu)封裝ajax請(qǐng)求的方法2. JSP數(shù)據(jù)交互實(shí)現(xiàn)過程解析3. windows服務(wù)器使用IIS時(shí)thinkphp搜索中文無效問題4. .NET中l(wèi)ambda表達(dá)式合并問題及解決方法5. Nginx+php配置文件及原理解析6. 淺談python出錯(cuò)時(shí)traceback的解讀7. ASP 信息提示函數(shù)并作返回或者轉(zhuǎn)向8. Ajax實(shí)現(xiàn)表格中信息不刷新頁(yè)面進(jìn)行更新數(shù)據(jù)9. Python importlib動(dòng)態(tài)導(dǎo)入模塊實(shí)現(xiàn)代碼10. python matplotlib:plt.scatter() 大小和顏色參數(shù)詳解
