成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Python的分布式函數(shù)的簡(jiǎn)單介紹

Python高性能分布式執(zhí)行框架-Ray

這是別人說(shuō)的,咱也不敢說(shuō),咱也不敢問 ! 了解大致的邏輯就好.你只需要知道他超級(jí)牛逼,超級(jí)方便

創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括北林網(wǎng)站建設(shè)、北林網(wǎng)站制作、北林網(wǎng)頁(yè)制作以及北林網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,北林網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到北林省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

安裝就是簡(jiǎn)單的 pip install ray , 需要提醒的就是ray現(xiàn)在只有l(wèi)inux編譯版本, windows就別想著用了,為了這,我硬生生把開發(fā)環(huán)境從windows切到了linux.

首先來(lái)看一下最簡(jiǎn)單的Ray程序是如何編寫的。

在Ray里,通過(guò)Python注解@ray.remote定義remote函數(shù)。使用此注解聲明的函數(shù)都會(huì)自帶一個(gè)默認(rèn)的方法remote,通過(guò)此方法發(fā)起的函數(shù)調(diào)用都是以提交分布式任務(wù)的方式異步執(zhí)行的,函數(shù)的返回值是一個(gè)對(duì)象id,使用ray.get內(nèi)置操作可以同步獲取該id對(duì)應(yīng)的對(duì)象

參考:

高性能分布式執(zhí)行框架——Ray

取代 Python 多進(jìn)程!伯克利開源分布式框架 Ray

官方文檔

基于python的高性能實(shí)時(shí)并行機(jī)器學(xué)習(xí)框架之Ray介紹

Python的函數(shù)都有哪些

【常見的內(nèi)置函數(shù)】

1、enumerate(iterable,start=0)

是python的內(nèi)置函數(shù),是枚舉、列舉的意思,對(duì)于一個(gè)可迭代的(iterable)/可遍歷的對(duì)象(如列表、字符串),enumerate將其組成一個(gè)索引序列,利用它可以同時(shí)獲得索引和值。

2、zip(*iterables,strict=False)

用于將可迭代的對(duì)象作為參數(shù),將對(duì)象中對(duì)應(yīng)的元素打包成一個(gè)個(gè)元組,然后返回由這些元組組成的列表。如果各個(gè)迭代器的元素個(gè)數(shù)不一致,則返回列表長(zhǎng)度與最短的對(duì)象相同,利用*號(hào)操作符,可以將元組解壓為列表。

3、filter(function,iterable)

filter是將一個(gè)序列進(jìn)行過(guò)濾,返回迭代器的對(duì)象,去除不滿足條件的序列。

4、isinstance(object,classinfo)

是用來(lái)判斷某一個(gè)變量或者是對(duì)象是不是屬于某種類型的一個(gè)函數(shù),如果參數(shù)object是classinfo的實(shí)例,或者object是classinfo類的子類的一個(gè)實(shí)例,

返回True。如果object不是一個(gè)給定類型的的對(duì)象, 則返回結(jié)果總是False

5、eval(expression[,globals[,locals]])

用來(lái)將字符串str當(dāng)成有效的表達(dá)式來(lái)求值并返回計(jì)算結(jié)果,表達(dá)式解析參數(shù)expression并作為Python表達(dá)式進(jìn)行求值(從技術(shù)上說(shuō)是一個(gè)條件列表),采用globals和locals字典作為全局和局部命名空間。

【常用的句式】

1、format字符串格式化

format把字符串當(dāng)成一個(gè)模板,通過(guò)傳入的參數(shù)進(jìn)行格式化,非常實(shí)用且強(qiáng)大。

2、連接字符串

常使用+連接兩個(gè)字符串。

3、if...else條件語(yǔ)句

Python條件語(yǔ)句是通過(guò)一條或多條語(yǔ)句的執(zhí)行結(jié)果(True或者False)來(lái)決定執(zhí)行的代碼塊。其中if...else語(yǔ)句用來(lái)執(zhí)行需要判斷的情形。

4、for...in、while循環(huán)語(yǔ)句

循環(huán)語(yǔ)句就是遍歷一個(gè)序列,循環(huán)去執(zhí)行某個(gè)操作,Python中的循環(huán)語(yǔ)句有for和while。

5、import導(dǎo)入其他腳本的功能

有時(shí)需要使用另一個(gè)python文件中的腳本,這其實(shí)很簡(jiǎn)單,就像使用import關(guān)鍵字導(dǎo)入任何模塊一樣。

[Django] celery的替代品 funboost

Django開發(fā)web應(yīng)用的過(guò)程中,一個(gè)老大難問題是異步調(diào)度問題。例如用戶傳來(lái)一個(gè)非常耗時(shí)的請(qǐng)求,這時(shí)候最好的處理方式是先把這個(gè)操作請(qǐng)求記錄下來(lái),先響應(yīng)請(qǐng)求,等后面有空的時(shí)候再去計(jì)算,而不是讓用戶干等著著急。

這種優(yōu)化方式就是典型的生產(chǎn)者+消息隊(duì)列+消費(fèi)者設(shè)計(jì)模式,而Django框架本身并沒有直接提供該設(shè)計(jì)模式的實(shí)現(xiàn),大多教程都是利用第三方組件celery+redis來(lái)實(shí)現(xiàn)這個(gè)調(diào)度。

遺憾的是celery和redis官方都不支持windows,而我習(xí)慣的開發(fā)環(huán)境還是win10,所以需要找一個(gè)替代品。經(jīng)過(guò)調(diào)研,發(fā)現(xiàn)了一個(gè)很好的【python分布式函數(shù)調(diào)度框架——funboost】. 它的優(yōu)點(diǎn)很多,對(duì)Django開發(fā)來(lái)說(shuō),最大的亮點(diǎn)是完全無(wú)需啟動(dòng)第三方服務(wù),即可實(shí)現(xiàn)生產(chǎn)消費(fèi)設(shè)計(jì)模式。一個(gè) pip install funboost 即可干活,開箱即用。它可以使用SQLite文件來(lái)做消息隊(duì)列,足以應(yīng)對(duì)小型應(yīng)用開發(fā)。當(dāng)然也可以使用Kafka這種高級(jí)的消息中間件,實(shí)現(xiàn)高可用。

要說(shuō)缺點(diǎn)吧,這個(gè)組件的日志打印太啰嗦,而且沒有提供關(guān)閉選項(xiàng),控制臺(tái)已被它刷屏。

如何用 Python 構(gòu)建一個(gè)簡(jiǎn)單的分布式系統(tǒng)

分布式爬蟲概覽

何謂分布式爬蟲?

通俗的講,分布式爬蟲就是多臺(tái)機(jī)器多個(gè)

spider

對(duì)多個(gè)

url

的同時(shí)處理問題,分布式的方式可以極大提高程序的抓取效率。

構(gòu)建分布式爬蟲通暢需要考慮的問題

(1)如何能保證多臺(tái)機(jī)器同時(shí)抓取同一個(gè)URL?

(2)如果某個(gè)節(jié)點(diǎn)掛掉,會(huì)不會(huì)影響其它節(jié)點(diǎn),任務(wù)如何繼續(xù)?

(3)既然是分布式,如何保證架構(gòu)的可伸縮性和可擴(kuò)展性?不同優(yōu)先級(jí)的抓取任務(wù)如何進(jìn)行資源分配和調(diào)度?

基于上述問題,我選擇使用celery作為分布式任務(wù)調(diào)度工具,是分布式爬蟲中任務(wù)和資源調(diào)度的核心模塊。它會(huì)把所有任務(wù)都通過(guò)消息隊(duì)列發(fā)送給各個(gè)分布式節(jié)點(diǎn)進(jìn)行執(zhí)行,所以可以很好的保證url不會(huì)被重復(fù)抓??;它在檢測(cè)到worker掛掉的情況下,會(huì)嘗試向其他的worker重新發(fā)送這個(gè)任務(wù)信息,這樣第二個(gè)問題也可以得到解決;celery自帶任務(wù)路由,我們可以根據(jù)實(shí)際情況在不同的節(jié)點(diǎn)上運(yùn)行不同的抓取任務(wù)(在實(shí)戰(zhàn)篇我會(huì)講到)。本文主要就是帶大家了解一下celery的方方面面(有celery相關(guān)經(jīng)驗(yàn)的同學(xué)和大??梢灾苯犹^(guò)了)

Celery知識(shí)儲(chǔ)備

celery基礎(chǔ)講解

按celery官網(wǎng)的介紹來(lái)說(shuō)

Celery

是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。

下面幾個(gè)關(guān)于celery的核心知識(shí)點(diǎn)

broker:翻譯過(guò)來(lái)叫做中間人。它是一個(gè)消息傳輸?shù)闹虚g件,可以理解為一個(gè)郵箱。每當(dāng)應(yīng)用程序調(diào)用celery的異步任務(wù)的時(shí)候,會(huì)向broker傳遞消息,而后celery的worker將會(huì)取到消息,執(zhí)行相應(yīng)程序。這其實(shí)就是消費(fèi)者和生產(chǎn)者之間的橋梁。

backend:

通常程序發(fā)送的消息,發(fā)完就完了,可能都不知道對(duì)方時(shí)候接受了。為此,celery實(shí)現(xiàn)了一個(gè)backend,用于存儲(chǔ)這些消息以及celery執(zhí)行的一些消息和結(jié)果。

worker:

Celery類的實(shí)例,作用就是執(zhí)行各種任務(wù)。注意在celery3.1.25后windows是不支持celery

worker的!

producer:

發(fā)送任務(wù),將其傳遞給broker

beat:

celery實(shí)現(xiàn)的定時(shí)任務(wù)??梢詫⑵淅斫鉃橐粋€(gè)producer,因?yàn)樗彩峭ㄟ^(guò)網(wǎng)絡(luò)調(diào)用定時(shí)將任務(wù)發(fā)送給worker執(zhí)行。注意在windows上celery是不支持定時(shí)任務(wù)的!

下面是關(guān)于celery的架構(gòu)示意圖,結(jié)合上面文字的話應(yīng)該會(huì)更好理解

由于celery只是任務(wù)隊(duì)列,而不是真正意義上的消息隊(duì)列,它自身不具有存儲(chǔ)數(shù)據(jù)的功能,所以broker和backend需要通過(guò)第三方工具來(lái)存儲(chǔ)信息,celery官方推薦的是

RabbitMQ和Redis,另外mongodb等也可以作為broker或者backend,可能不會(huì)很穩(wěn)定,我們這里選擇Redis作為broker兼backend。

實(shí)際例子

先安裝celery

pip

install

celery

我們以官網(wǎng)給出的例子來(lái)做說(shuō)明,并對(duì)其進(jìn)行擴(kuò)展。首先在項(xiàng)目根目錄下,這里我新建一個(gè)項(xiàng)目叫做celerystudy,然后切換到該項(xiàng)目目錄下,新建文件tasks.py,然后在其中輸入下面代碼

這里我詳細(xì)講一下代碼:我們先通過(guò)app=Celery()來(lái)實(shí)例化一個(gè)celery對(duì)象,在這個(gè)過(guò)程中,我們指定了它的broker,是redis的db

2,也指定了它的backend,是redis的db3,

broker和backend的連接形式大概是這樣

redis://:password@hostname:port/db_number

然后定義了一個(gè)add函數(shù),重點(diǎn)是@app.task,它的作用在我看來(lái)就是將add()

注冊(cè)為一個(gè)類似服務(wù)的東西,本來(lái)只能通過(guò)本地調(diào)用的函數(shù)被它裝飾后,就可以通過(guò)網(wǎng)絡(luò)來(lái)調(diào)用。這個(gè)tasks.py中的app就是一個(gè)worker。它可以有很多任務(wù),比如這里的任務(wù)函數(shù)add。我們?cè)偻ㄟ^(guò)在命令行切換到項(xiàng)目根目錄,執(zhí)行

celery

-A

tasks

worker

-l

info

啟動(dòng)成功后就是下圖所示的樣子

這里我說(shuō)一下各個(gè)參數(shù)的意思,-A指定的是app(即Celery實(shí)例)所在的文件模塊,我們的app是放在tasks.py中,所以這里是

tasks;worker表示當(dāng)前以worker的方式運(yùn)行,難道還有別的方式?對(duì)的,比如運(yùn)行定時(shí)任務(wù)就不用指定worker這個(gè)關(guān)鍵字;

-l

info表示該worker節(jié)點(diǎn)的日志等級(jí)是info,更多關(guān)于啟動(dòng)worker的參數(shù)(比如-c、-Q等常用的)請(qǐng)使用

celery

worker

--help

進(jìn)行查看

將worker啟動(dòng)起來(lái)后,我們就可以通過(guò)網(wǎng)絡(luò)來(lái)調(diào)用add函數(shù)了。我們?cè)诤竺娴姆植际脚老x構(gòu)建中也是采用這種方式分發(fā)和消費(fèi)url的。在命令行先切換到項(xiàng)目根目錄,然后打開python交互端

from

tasks

import

addrs

=

add.delay(2,

2)

這里的add.delay就是通過(guò)網(wǎng)絡(luò)調(diào)用將任務(wù)發(fā)送給add所在的worker執(zhí)行,這個(gè)時(shí)候我們可以在worker的界面看到接收的任務(wù)和計(jì)算的結(jié)果。

這里是異步調(diào)用,如果我們需要返回的結(jié)果,那么要等rs的ready狀態(tài)true才行。這里add看不出效果,不過(guò)試想一下,如果我們是調(diào)用的比較占時(shí)間的io任務(wù),那么異步任務(wù)就比較有價(jià)值了

上面講的是從Python交互終端中調(diào)用add函數(shù),如果我們要從另外一個(gè)py文件調(diào)用呢?除了通過(guò)import然后add.delay()這種方式,我們還可以通過(guò)send_task()這種方式,我們?cè)陧?xiàng)目根目錄另外新建一個(gè)py文件叫做

excute_tasks.py,在其中寫下如下的代碼

from

tasks

import

addif

__name__

==

'__main__':

add.delay(5,

10)

這時(shí)候可以在celery的worker界面看到執(zhí)行的結(jié)果

此外,我們還可以通過(guò)send_task()來(lái)調(diào)用,將excute_tasks.py改成這樣

這種方式也是可以的。send_task()還可能接收到為注冊(cè)(即通過(guò)@app.task裝飾)的任務(wù),這個(gè)時(shí)候worker會(huì)忽略這個(gè)消息

定時(shí)任務(wù)

上面部分講了怎么啟動(dòng)worker和調(diào)用worker的相關(guān)函數(shù),這里再講一下celery的定時(shí)任務(wù)。

爬蟲由于其特殊性,可能需要定時(shí)做增量抓取,也可能需要定時(shí)做模擬登陸,以防止cookie過(guò)期,而celery恰恰就實(shí)現(xiàn)了定時(shí)任務(wù)的功能。在上述基礎(chǔ)上,我們將tasks.py文件改成如下內(nèi)容

然后先通過(guò)ctrl+c停掉前一個(gè)worker,因?yàn)槲覀兇a改了,需要重啟worker才會(huì)生效。我們?cè)俅我詂elery

-A

tasks

worker

-l

info這個(gè)命令開啟worker。

這個(gè)時(shí)候我們只是開啟了worker,如果要讓worker執(zhí)行任務(wù),那么還需要通過(guò)beat給它定時(shí)發(fā)送,我們?cè)匍_一個(gè)命令行,切換到項(xiàng)目根目錄,通過(guò)

這樣就表示定時(shí)任務(wù)已經(jīng)開始運(yùn)行了。

眼尖的同學(xué)可能看到我這里celery的版本是3.1.25,這是因?yàn)閏elery支持的windows最高版本是3.1.25。由于我的分布式微博爬蟲的worker也同時(shí)部署在了windows上,所以我選擇了使用

3.1.25。如果全是linux系統(tǒng),建議使用celery4。

此外,還有一點(diǎn)需要注意,在celery4后,定時(shí)任務(wù)(通過(guò)schedule調(diào)度的會(huì)這樣,通過(guò)crontab調(diào)度的會(huì)馬上執(zhí)行)會(huì)在當(dāng)前時(shí)間再過(guò)定時(shí)間隔執(zhí)行第一次任務(wù),比如我這里設(shè)置的是60秒的間隔,那么第一次執(zhí)行add會(huì)在我們通過(guò)celery

beat

-A

tasks

-l

info啟動(dòng)定時(shí)任務(wù)后60秒才執(zhí)行;celery3.1.25則會(huì)馬上執(zhí)行該任務(wù)

Python中的多進(jìn)程與多線程/分布式該如何使用

Python提供了非常好用的多進(jìn)程包multiprocessing,你只需要定義一個(gè)函數(shù),Python會(huì)替你完成其他所有事情。

借助這個(gè)包,可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。

1、新建單一進(jìn)程

如果我們新建少量進(jìn)程,可以如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

p = multiprocessing.Process(target=func, args=("hello", ))

p.start()

p.join()

print "Sub-process done."12345678910111213

2、使用進(jìn)程池

是的,你沒有看錯(cuò),不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡(jiǎn)單。

注意要用apply_async,如果落下async,就變成阻塞版本了。

processes=4是最多并發(fā)進(jìn)程數(shù)量。

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

for i in xrange(10):

msg = "hello %d" %(i)

pool.apply_async(func, (msg, ))

pool.close()

pool.join()

print "Sub-process(es) done."12345678910111213141516

3、使用Pool,并需要關(guān)注結(jié)果

更多的時(shí)候,我們不僅需要多進(jìn)程執(zhí)行,還需要關(guān)注每個(gè)進(jìn)程的執(zhí)行結(jié)果,如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

return "done " + msg

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

result = []

for i in xrange(10):

msg = "hello %d" %(i)

result.append(pool.apply_async(func, (msg, )))

pool.close()

pool.join()

for res in result:

print res.get()

print "Sub-process(es) done."1234567891011121314151617181920

2014.12.25更新

根據(jù)網(wǎng)友評(píng)論中的反饋,在Windows下運(yùn)行有可能崩潰(開啟了一大堆新窗口、進(jìn)程),可以通過(guò)如下調(diào)用來(lái)解決:

multiprocessing.freeze_support()1

附錄(自己的腳本):

#!/usr/bin/python

import threading

import subprocess

import datetime

import multiprocessing

def dd_test(round, th):

test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)

command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg

print command

subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)

def mds_stat(round):

p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)

out = p.stdout.readlines()

if out[0].find('active') != -1:

command = "echo '0205pm %s round mds status OK, %s' /round_record" %(round, datetime.datetime.now())

command_2 = "time (ls /zbkc/test_mds_crash/) 2/round_record"

command_3 = "ls /zbkc/test_mds_crash | wc -l /round_record"

subprocess.call(command,shell=True)

subprocess.call(command_2,shell=True)

subprocess.call(command_3,shell=True)

return 1

else:

command = "echo '0205 %s round mds status abnormal, %s, %s' /round_record" %(round, out[0], datetime.datetime.now())

subprocess.call(command,shell=True)

return 0

#threads = []

for round in range(1, 1600):

pool = multiprocessing.Pool(processes = 10) #使用進(jìn)程池

for th in range(10):

# th_name = "thread-" + str(th)

# threads.append(th_name) #添加線程到線程列表

# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創(chuàng)建多線程任務(wù)

pool.apply_async(dd_test, (round, th))

pool.close()

pool.join()

#等待線程完成

# for t in threads:

# t.join()

if mds_stat(round) == 0:

subprocess.call("zbkc -s",shell=True)

break

Python分布式進(jìn)程中你會(huì)遇到的坑

寫在前面

小驚大怪

你是不是在用Python3或者在windows系統(tǒng)上編程?最重要的是你對(duì)進(jìn)程和線程不是很清楚?那么恭喜你,在python分布式進(jìn)程中,會(huì)有坑等著你去挖。。。(hahahaha,此處允許我嚇唬一下你)開玩笑的啦,不過(guò),如果你知道序列中不支持匿名函數(shù),那這個(gè)坑就和你say byebye了。好了話不多數(shù),直接進(jìn)入正題。

分布式進(jìn)程

正如大家所知道的Process比Thread更穩(wěn)定,而且Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。由于managers模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫分布式多進(jìn)程程序。

代碼記錄

舉個(gè)例子

如果我們已經(jīng)有一個(gè)通過(guò)Queue通信的多進(jìn)程程序在同一臺(tái)機(jī)器上運(yùn)行,現(xiàn)在,由于處理任務(wù)的進(jìn)程任務(wù)繁重,希望把發(fā)送任務(wù)的進(jìn)程和處理任務(wù)的進(jìn)程分布到兩臺(tái)機(jī)器上,這應(yīng)該怎么用分布式進(jìn)程來(lái)實(shí)現(xiàn)呢?你已經(jīng)知道了原有的Queue可以繼續(xù)使用,而且通過(guò)managers模塊把Queue通過(guò)網(wǎng)絡(luò)暴露出去,就可以讓其他機(jī)器的進(jìn)程來(lái)訪問Queue了。好,那我們就這么干!

寫個(gè)task_master.py

我們先看服務(wù)進(jìn)程。服務(wù)進(jìn)程負(fù)責(zé)啟動(dòng)Queue,把Queue注冊(cè)到網(wǎng)絡(luò)上,然后往Queue里面寫入任務(wù)。

請(qǐng)注意,當(dāng)我們?cè)谝慌_(tái)機(jī)器上寫多進(jìn)程程序時(shí),創(chuàng)建的Queue可以直接拿來(lái)用,但是,在分布式多進(jìn)程環(huán)境下,添加任務(wù)到Queue不可以直接對(duì)原始的task_queue進(jìn)行操作,那樣就繞過(guò)了QueueManager的封裝,必須通過(guò)manager.get_task_queue()獲得的Queue接口添加。然后,在另一臺(tái)機(jī)器上啟動(dòng)任務(wù)進(jìn)程(本機(jī)上啟動(dòng)也可以)

寫個(gè)task_worker.py

任務(wù)進(jìn)程要通過(guò)網(wǎng)絡(luò)連接到服務(wù)進(jìn)程,所以要指定服務(wù)進(jìn)程的IP。

運(yùn)行結(jié)果

現(xiàn)在,可以試試分布式進(jìn)程的工作效果了。先啟動(dòng)task_master.py服務(wù)進(jìn)程:

task_master.py進(jìn)程發(fā)送完任務(wù)后,開始等待result隊(duì)列的結(jié)果?,F(xiàn)在啟動(dòng)task_worker.py進(jìn)程:

看到?jīng)],結(jié)果都出錯(cuò)了,我們好好分析一下到底哪出錯(cuò)了。。。

錯(cuò)誤分析

在task_master.py的報(bào)錯(cuò)提示中,我們知道它說(shuō)lambda錯(cuò)誤,這是因?yàn)樾蛄谢恢С帜涿瘮?shù),所以我們得修改代碼,重新對(duì)queue用QueueManager進(jìn)行封裝放到網(wǎng)絡(luò)中。

其中task_queue和result_queue是兩個(gè)隊(duì)列,分別存放任務(wù)和結(jié)果。它們用來(lái)進(jìn)行進(jìn)程間通信,交換對(duì)象。

因?yàn)槭欠植际降沫h(huán)境,放入queue中的數(shù)據(jù)需要等待Workers機(jī)器運(yùn)算處理后再進(jìn)行讀取,這樣就需要對(duì)queue用QueueManager進(jìn)行封裝放到網(wǎng)絡(luò)中,這是通過(guò)上面的2行代碼來(lái)實(shí)現(xiàn)的。我們給return_task_queue的網(wǎng)絡(luò)調(diào)用接口取了一個(gè)名get_task_queue,而return_result_queue的名字是get_result_queue,方便區(qū)分對(duì)哪個(gè)queue進(jìn)行操作。task.put(n)即是對(duì)task_queue進(jìn)行寫入數(shù)據(jù),相當(dāng)于分配任務(wù)。而result.get()即是等待workers機(jī)器處理后返回的結(jié)果。

值得注意 在windows系統(tǒng)中你必須要寫IP地址,而其他操作系統(tǒng)比如linux操作系統(tǒng)則就不要了。

修改后的代碼

在task_master.py中修改如下:

在task_worker.py中修改如下:

先運(yùn)行task_master.py,然后再運(yùn)行task_worker.py

(1)task_master.py運(yùn)行結(jié)果如下

(2)task_worker.py運(yùn)行結(jié)果如下

知識(shí)補(bǔ)充

這個(gè)簡(jiǎn)單的Master/Worker模型有什么用?其實(shí)這就是一個(gè)簡(jiǎn)單但真正的分布式計(jì)算,把代碼稍加改造,啟動(dòng)多個(gè)worker,就可以把任務(wù)分布到幾臺(tái)甚至幾十臺(tái)機(jī)器上,比如把計(jì)算n*n的代碼換成發(fā)送郵件,就實(shí)現(xiàn)了郵件隊(duì)列的異步發(fā)送。

Queue對(duì)象存儲(chǔ)在哪?注意到task_worker.py中根本沒有創(chuàng)建Queue的代碼,所以,Queue對(duì)象存儲(chǔ)在task_master.py進(jìn)程中:

而Queue之所以能通過(guò)網(wǎng)絡(luò)訪問,就是通過(guò)QueueManager實(shí)現(xiàn)的。由于QueueManager管理的不止一個(gè)Queue,所以,要給每個(gè)Queue的網(wǎng)絡(luò)調(diào)用接口起個(gè)名字,比如get_task_queue。task_worker這里的QueueManager注冊(cè)的名字必須和task_manager中的一樣。對(duì)比上面的例子,可以看出Queue對(duì)象從另一個(gè)進(jìn)程通過(guò)網(wǎng)絡(luò)傳遞了過(guò)來(lái)。只不過(guò)這里的傳遞和網(wǎng)絡(luò)通信由QueueManager完成。

authkey有什么用?這是為了保證兩臺(tái)機(jī)器正常通信,不被其他機(jī)器惡意干擾。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定連接不上。

當(dāng)前名稱:Python的分布式函數(shù)的簡(jiǎn)單介紹
鏈接地址:http://jinyejixie.com/article46/hsephg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、域名注冊(cè)、網(wǎng)站建設(shè)、網(wǎng)站收錄企業(yè)建站、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

小程序開發(fā)
深水埗区| 恩施市| 明光市| 社旗县| 井冈山市| 库伦旗| 武功县| 剑阁县| 内丘县| 高雄市| 曲水县| 乌苏市| 广州市| 山东省| 平定县| 怀远县| 确山县| 河北省| 宜川县| 柞水县| 江西省| 天峨县| 乌拉特后旗| 平陆县| 蓬莱市| 株洲市| 绥德县| 阳谷县| 原平市| 金山区| 栾川县| 乐山市| 茶陵县| 五华县| 揭东县| 安仁县| 横峰县| 瑞昌市| 德州市| 南丹县| 镇赉县|