使用pyspark怎么對Mysql數(shù)據(jù)庫進行讀寫操作?針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
成都創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設,鷹手營子企業(yè)網(wǎng)站建設,鷹手營子品牌網(wǎng)站建設,網(wǎng)站定制,鷹手營子網(wǎng)站建設報價,網(wǎng)絡營銷,網(wǎng)絡優(yōu)化,鷹手營子網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學習、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。pyspark是Spark對Python的api接口,可以在Python環(huán)境中通過調用pyspark模塊來操作spark,完成大數(shù)據(jù)框架下的數(shù)據(jù)分析與挖掘。其中,數(shù)據(jù)的讀寫是基礎操作,pyspark的子模塊pyspark.sql 可以完成大部分類型的數(shù)據(jù)讀寫。文本介紹在pyspark中讀寫Mysql數(shù)據(jù)庫。
在Python中使用Spark,需要安裝配置Spark,這里跳過配置的過程,給出運行環(huán)境和相關程序版本信息。
win10 64bit
java 13.0.1
spark 3.0
python 3.8
pyspark 3.0
pycharm 2019.3.4
pyspark連接Mysql是通過java實現(xiàn)的,所以需要下載連接Mysql的jar包。
下載地址
選擇下載Connector/J
,然后選擇操作系統(tǒng)為Platform Independent
,下載壓縮包到本地。
然后解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar
放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars
。
環(huán)境配置完成!
腳本如下:
from pyspark.sql import SQLContext, SparkSession if __name__ == '__main__': # spark 初始化 spark = SparkSession. \ Builder(). \ appName('sql'). \ master('local'). \ getOrCreate() # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 讀取表 data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop) # 打印data數(shù)據(jù)類型 print(type(data)) # 展示數(shù)據(jù) data.show() # 關閉spark會話 spark.stop()
注意點:
prop
參數(shù)需要根據(jù)實際情況修改,文中用戶名和密碼用xxx代替了,driver
參數(shù)也可以不需要;
url
參數(shù)需要根據(jù)實際情況修改,格式為jdbc:mysql://主機:端口/數(shù)據(jù)庫
;
通過調用方法read.jdbc
進行讀取,返回的數(shù)據(jù)類型為spark DataFrame;
運行腳本,輸出如下:
腳本如下:
import pandas as pd from pyspark import SparkContext from pyspark.sql import SQLContext, Row if __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local', appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 創(chuàng)建spark DataFrame # 方式1:list轉spark DataFrame l = [(1, 12), (2, 22)] # 創(chuàng)建并指定列名 list_df = spark.createDataFrame(l, schema=['id', 'value']) # 方式2:rdd轉spark DataFrame rdd = sc.parallelize(l) # rdd col_names = Row('id', 'value') # 列名 tmp = rdd.map(lambda x: col_names(*x)) # 設置列名 rdd_df = spark.createDataFrame(tmp) # 方式3:pandas dataFrame 轉spark DataFrame df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]}) pd_df = spark.createDataFrame(df) # 寫入數(shù)據(jù)庫 pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop) # 關閉spark會話 sc.stop()
注意點:
prop
和url
參數(shù)同樣需要根據(jù)實際情況修改;
寫入數(shù)據(jù)庫要求的對象類型是spark DataFrame,提供了三種常見數(shù)據(jù)類型轉spark DataFrame的方法;
通過調用write.jdbc
方法進行寫入,其中的model
參數(shù)控制寫入數(shù)據(jù)的行為。
model | 參數(shù)解釋 |
---|---|
error | 默認值,原表存在則報錯 |
ignore | 原表存在,不報錯且不寫入數(shù)據(jù) |
append | 新數(shù)據(jù)在原表行末追加 |
overwrite | 覆蓋原表 |
Access denied for user …
原因:mysql配置參數(shù)出錯
解決辦法:檢查user,password拼寫,檢查賬號密碼是否正確,用其他工具測試mysql是否能正常連接,做對比檢查。
No suitable driver
原因:沒有配置運行環(huán)境
解決辦法:下載jar包進行配置,具體過程參考本文的2 環(huán)境配置。
關于使用pyspark怎么對Mysql數(shù)據(jù)庫進行讀寫操作問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關知識。
本文題目:使用pyspark怎么對Mysql數(shù)據(jù)庫進行讀寫操作-創(chuàng)新互聯(lián)
網(wǎng)站路徑:http://jinyejixie.com/article22/dcpjcc.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、品牌網(wǎng)站建設、虛擬主機、用戶體驗、外貿網(wǎng)站建設、網(wǎng)站設計公司
聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)