How do you turn a Spark SQL model into an online service? Many newcomers find this unclear, so this article walks through the problem in detail with a complete example; if you have this need, read on.
4Paradigm has deployed tens of thousands of AI applications across many industries, such as anti-fraud in finance, news recommendation in media, and pipeline inspection in energy. In these applications, Spark SQL plays an important role in rapidly implementing feature transformations.
Spark SQL feature transformations mainly fall into the following categories (sketched in code after the list):
- Multi-table scenarios: joining tables together, e.g. joining a transactions table with an accounts table
- Simple per-row transforms with UDFs, e.g. applying an hour function to a timestamp
- Time-series features with time windows and UDAFs, e.g. computing the total amount a person spent in the last day
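To make these categories concrete, here is a minimal PySpark sketch of all three. The table and column names (transactions, accounts, account_id, amount, ts) are illustrative assumptions, not part of the demo that follows.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("feature-transforms").getOrCreate()

# 1. Multi-table join: enrich each transaction with account attributes
spark.sql("""
    SELECT t.*, a.account_level
    FROM transactions t JOIN accounts a ON t.account_id = a.account_id
""")

# 2. Simple UDF transform: extract the hour from a timestamp column
spark.udf.register("hour_of", lambda ts: ts.hour, IntegerType())
spark.sql("SELECT hour_of(ts) AS tx_hour FROM transactions")

# 3. Windowed time-series aggregation: spend over the trailing day
#    (the range frame needs a numeric ORDER BY, hence the cast to epoch seconds)
spark.sql("""
    SELECT account_id, SUM(amount) OVER (
        PARTITION BY account_id ORDER BY CAST(ts AS LONG)
        RANGE BETWEEN 86400 PRECEDING AND CURRENT ROW
    ) AS spend_last_1d
    FROM transactions
""")
```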
So far, Spark SQL has solved the feature-transformation problem for offline model training very well. But as AI applications mature, the expectation for a model is no longer just good offline evaluation results; the model must deliver value in real business scenarios. A real business scenario is a model-serving scenario: it demands high performance and real-time inference, and that is where we run into the following problems:
- How do offline multi-table inputs map to the online environment? Batch training consumes many tables; in what form should those tables exist online? This decision shapes the whole system architecture: done well it improves efficiency, done badly it greatly increases the cost of turning a model into business value.
- Translating SQL into real-time execution is expensive. Online inference demands high performance, and a data scientist may produce thousands of features; hand-translating every one of them massively inflates engineering cost.
- Keeping offline and online features consistent is hard. Manual translation inevitably introduces consistency problems, and in practice full consistency is rarely achieved.
- A model that performs well offline may still fail to meet business requirements online.
In a concrete anti-fraud scenario, model serving must decide whether a transaction is fraudulent within a tp99 of 20 ms, so the performance requirements on serving are extremely high.
A feature-engineering database complements Spark SQL's capabilities:
- In database form it solves the offline-to-online table mapping problem. Our answer to the question raised above: however the tables are distributed offline, they are distributed the same way online.
- The same code executes both offline and online feature transformations, which guarantees the model's online quality.
- Data scientists and business engineering teams collaborate through SQL itself rather than hand-translated code, greatly speeding up model iteration.
- LLVM-accelerated SQL runs 2-3x faster than the Scala implementations in Spark 2.x and 3.x on complex time-series feature workloads, and online, in-memory storage keeps SQL query latency very low.
The demo's training task is to predict how long a taxi trip will take from start to finish. We will use fedb, PySpark, LightGBM, and other tools to build an HTTP model-inference service end to end; this is also a practical example of Spark in a machine-learning scenario.
The whole demo is just over 200 lines of code and took less than half an hour to put together:
- train_sql.py: feature computation and training, 80 lines of code
- predict_server.py: model-inference HTTP service, 129 lines of code
The training data looks like the following sample:
```
id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856
id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198
id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331
id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674
```
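For the offline stage, the CSV first needs to be loaded into Spark. A minimal sketch, assuming the file path used by the import script later in this article and a default Spark session:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi-demo").getOrCreate()

# Read the sample CSV with its header row and let Spark infer column types
df = spark.read.csv("data/taxi_tour_table_train_simple.csv",
                    header=True, inferSchema=True)

# Register it so the feature SQL below can read from a named table
df.createOrReplaceTempView("t1")
```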
The feature-transformation SQL script for this scenario:
```sql
select
    trip_duration,
    passenger_count,
    sum(pickup_latitude) over w as vendor_sum_pl,
    max(pickup_latitude) over w as vendor_max_pl,
    min(pickup_latitude) over w as vendor_min_pl,
    avg(pickup_latitude) over w as vendor_avg_pl,
    sum(pickup_latitude) over w2 as pc_sum_pl,
    max(pickup_latitude) over w2 as pc_max_pl,
    min(pickup_latitude) over w2 as pc_min_pl,
    avg(pickup_latitude) over w2 as pc_avg_pl,
    count(vendor_id) over w2 as pc_cnt,
    count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime
             ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
       w2 as (partition by passenger_count order by pickup_datetime
              ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
```
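Because the online engine executes the very same SQL, this script only needs to be written once. A sketch of how one SQL string drives both sides; `train_sql_template` (the script above, held as a Python string) and the Spark session are assumptions about the surrounding setup, and the offline side assumes the 4Paradigm Spark distribution, since vanilla Spark does not understand the ROWS_RANGE frame:

```python
# Fill the table-name placeholder once; this string is the single source
# of truth for the features.
train_sql = train_sql_template.format("t1")

# Offline: batch feature computation for training (used verbatim below)
train_df = spark.sql(train_sql)

# Online: predict_server.py hands the identical string to the fedb
# driver, so there is no hand-translated second copy of the features.
```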
We chose vendor_id and passenger_count as the two dimensions for the time-series features. The model-training code:
```python
import lightgbm as lgb

train_df = spark.sql(train_sql)

# LightGBM needs a local dataset; pulling the Spark result into pandas
# and splitting off a validation set is an assumed step the original
# snippet omits (it references lgb_train/lgb_eval without defining them).
data = train_df.toPandas()
X, y = data.drop(columns=["trip_duration"]), data["trip_duration"]
cut = int(len(data) * 0.8)
lgb_train = lgb.Dataset(X[:cut], y[:cut])
lgb_eval = lgb.Dataset(X[cut:], y[cut:], reference=lgb_train)

# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}

print('Starting training...')
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')
```

Running the training step produces model.txt.
The data-import code, which loads the training CSV into fedb for the online side:
```python
import datetime

def insert_row(line):
    row = line.split(',')
    # convert the two datetime columns to millisecond timestamps
    row[2] = '%dl' % int(datetime.datetime.strptime(
        row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(
        row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:  # skip the header row
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1
```

Note: train.csv is the CSV-format version of the training data.
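The import script assumes table t1 already exists in db_test. A sketch of the matching DDL, with types inferred from the CSV sample and index clauses matching the two window partitions in the feature SQL; both the exact fedb DDL syntax and the executeDDL call shown here are assumptions, so check the fedb documentation for the real API:

```python
# Assumed DDL for the demo table; column types follow the CSV header,
# and the indexes mirror the (vendor_id, pickup_datetime) and
# (passenger_count, pickup_datetime) windows used by the feature SQL.
ddl = """
create table t1(
    id string,
    vendor_id int,
    pickup_datetime timestamp,
    dropoff_datetime timestamp,
    passenger_count int,
    pickup_longitude double,
    pickup_latitude double,
    dropoff_longitude double,
    dropoff_latitude double,
    store_and_fwd_flag string,
    trip_duration int,
    index(key=vendor_id, ts=pickup_datetime),
    index(key=passenger_count, ts=pickup_datetime)
);
"""
driver.executeDDL('db_test', ddl)  # assumed driver method, see above
```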
The model-inference logic:
```python
# predict.py
def post(self):
    row = json.loads(self.request.body)
    # build a request-mode query against the same feature SQL
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # pre-compute the total string length the request row needs
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # append each input column in schema order, by type
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return
    # execute the feature SQL online and feed the result to the model
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))
```
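With the server running, a request can be sent like this. A minimal sketch: the port and route are assumptions, so check the handler registration in predict_server.py for the real ones, and the row values are taken from the sample data above:

```python
import json
import requests

row = {
    "id": "id0904926",
    "vendor_id": 1,
    "pickup_datetime": 1455995864000,   # millisecond timestamp, as imported
    "dropoff_datetime": 1455996799000,
    "passenger_count": 4,
    "pickup_longitude": -73.97344207763672,
    "pickup_latitude": 40.75189971923828,
    "dropoff_longitude": -73.98480224609375,
    "dropoff_latitude": 40.76243209838867,
    "store_and_fwd_flag": "N",
    "trip_duration": 1,
}
resp = requests.post("http://127.0.0.1:8887/predict", data=json.dumps(row))
print(resp.text)
```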
```
python3 predict.py
----------------ins---------------
[[ 2.       40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
  40.774097 40.774097  1.        1.      ]]
---------------predict trip_duration -------------
859.3298781277192 s
```