介紹
Python 不乏并發(fā)選項(xiàng),標(biāo)準(zhǔn)庫(kù)包括對(duì)線程、進(jìn)程和異步 I/O 的支持。在許多情況下,Python 通過創(chuàng)建異步、線程和子進(jìn)程等高級(jí)模塊,消除了使用這些各種并發(fā)方法的困難。在標(biāo)準(zhǔn)庫(kù)之外,還有第三種解決方案,例如twisted、stackless 和處理模塊,僅舉幾例。本文使用實(shí)踐示例專門關(guān)注 Python 中的線程處理。網(wǎng)上有很多很好的資源來記錄線程 API,但本文試圖提供常見線程使用模式的實(shí)踐示例。
首先定義進(jìn)程和線程之間的區(qū)別很重要。線程與進(jìn)程的不同之處在于它們共享狀態(tài)、內(nèi)存和資源。這個(gè)簡(jiǎn)單的區(qū)別對(duì)于線程來說既是優(yōu)點(diǎn)也是缺點(diǎn)。一方面,線程是輕量級(jí)的并且易于通信,但另一方面,它們帶來了一系列問題,包括死鎖、競(jìng)爭(zhēng)條件和純粹的復(fù)雜性。幸運(yùn)的是,由于 GIL 和排隊(duì)模塊,Python 中的線程實(shí)現(xiàn)起來比其他語(yǔ)言要簡(jiǎn)單得多。
你好 Python 線程
接下來,我假設(shè)你已經(jīng)安裝了 Python 2.5 或更高版本,因?yàn)樵S多示例將使用 Python 語(yǔ)言的更新功能,這些功能至少出現(xiàn)在 Python2.5 中。要開始使用 Python 中的線程,我們將從一個(gè)簡(jiǎn)單的“Hello World”示例開始:
清單 1. hello_threads_example
import threading
import datetime
class ThreadClass(threading.Thread):
def run(self):
now = datetime.datetime.now()
print "%s says Hello World at time: %s" %
(self.getName(), now)
for i in range(2):
t = ThreadClass()
t.start()
如果你運(yùn)行這個(gè)例子,你會(huì)得到以下輸出:
#python hello_threads.py
Thread?1 says Hello World at time: 2008?05?13 13:22:50.252069
Thread?2 says Hello World at time: 2008?05?13 13:22:50.252576
查看此輸出,你可以看到你收到了來自兩個(gè)帶有日期戳的線程的 Hello World 語(yǔ)句。如果你查看實(shí)際代碼,會(huì)發(fā)現(xiàn)有兩個(gè) import 語(yǔ)句;一個(gè)導(dǎo)入 datetime 模塊,另一個(gè)導(dǎo)入 threading 模塊。該類ThreadClass繼承自threading.Thread,因此,您需要定義一個(gè) run 方法來執(zhí)行您在線程內(nèi)運(yùn)行的代碼。在 run 方法中唯一需要注意的重要事項(xiàng)self.getName()是該方法將標(biāo)識(shí)線程的名稱。
最后三行代碼實(shí)際上調(diào)用了類并啟動(dòng)了線程。如果您注意到,t.start()實(shí)際上是啟動(dòng)線程的。線程模塊在設(shè)計(jì)時(shí)就考慮到了繼承性,實(shí)際上是建立在較低級(jí)別的線程模塊之上的。在大多數(shù)情況下,繼承自 被認(rèn)為是最佳實(shí)踐threading.Thread,因?yàn)樗鼮榫€程編程創(chuàng)建了一個(gè)非常自然的 API。
使用帶線程的隊(duì)列
正如我之前提到的,當(dāng)線程需要共享數(shù)據(jù)或資源時(shí),線程處理可能會(huì)很復(fù)雜。線程模塊確實(shí)提供了許多同步原語(yǔ),包括信號(hào)量、條件變量、事件和鎖。雖然存在這些選項(xiàng),但最好的做法是專注于使用隊(duì)列。隊(duì)列更容易處理,并使線程編程更加安全,因?yàn)樗鼈冇行У貙⑺袑?duì)資源的訪問集中到單個(gè)線程,并允許更清晰、更易讀的設(shè)計(jì)模式。
在下一個(gè)示例中,你將首先創(chuàng)建一個(gè)程序,該程序?qū)⒁来位蛞粋€(gè)接一個(gè)地獲取網(wǎng)站的 URL,并打印出頁(yè)面的前 1024 個(gè)字節(jié)。這是使用線程可以更快地完成某些事情的經(jīng)典示例。首先,讓我們使用urllib2模塊一次抓取這些頁(yè)面,并對(duì)代碼進(jìn)行計(jì)時(shí):
清單 2. URL 獲取序列
import urllib2
import time
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
start = time.time()
#grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
url = urllib2.urlopen(host)
print url.read(1024)
print "Elapsed Time: %s" % (time.time() ? start)
當(dāng)你運(yùn)行它時(shí),你會(huì)得到大量輸出到標(biāo)準(zhǔn)輸出,因?yàn)轫?yè)面被部分打印。但你會(huì)在最后得到這個(gè):
Elapsed Time: 2.40353488922
讓我們稍微看一下這段代碼。你只導(dǎo)入兩個(gè)模塊。首先,urllib2模塊是承擔(dān)重任并抓取網(wǎng)頁(yè)的東西。其次,你通過調(diào)用?time.time()
?創(chuàng)建一個(gè)開始時(shí)間值,然后再次調(diào)用它并減去初始值以確定程序執(zhí)行所需的時(shí)間。最后,從程序的速度來看,“兩秒半”的結(jié)果并不可怕,但如果你有數(shù)百個(gè)網(wǎng)頁(yè)要檢索,考慮到當(dāng)前的平均值,大約需要 50 秒。看看創(chuàng)建線程版本如何加快速度:
清單 3. URL 獲取線程
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def init(self, queue):
threading.Thread.init(self)
self.queue = queue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and prints first 1024 bytes of page
url = urllib2.urlopen(host)
print url.read(1024)
#signals to queue job is done
self.queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
#wait on the queue until everything has been processed
queue.join()
main()
print "Elapsed Time: %s" % (time.time() ? start)
這個(gè)例子有更多的代碼需要解釋,但由于使用了排隊(duì)模塊,它并沒有比第一個(gè)線程示例復(fù)雜多少。這種模式是在 Python 中使用線程的一種非常常見且推薦的方式。步驟描述如下:
- 創(chuàng)建一個(gè)?
Queue.Queue()
?實(shí)例,然后用數(shù)據(jù)填充它。 - 將填充數(shù)據(jù)的實(shí)例傳遞到從?
threading.Thread
?繼承而創(chuàng)建的?Thread
?類中。 - 產(chǎn)生一個(gè)守護(hù)線程池。
- 一次從隊(duì)列中拉出一項(xiàng),并在線程內(nèi)部使用該數(shù)據(jù)(即 run 方法)來完成這項(xiàng)工作。
- 工作完成后,向?
queue.task_done()
?隊(duì)列發(fā)送任務(wù)已完成的信號(hào)。 - 加入隊(duì)列,這實(shí)際上意味著等到隊(duì)列為空,然后退出主程序。
關(guān)于此模式的注意事項(xiàng):通過將守護(hù)線程設(shè)置為 true,它允許主線程或程序在只有守護(hù)線程處于活動(dòng)狀態(tài)時(shí)退出。這創(chuàng)建了一種控制程序流程的簡(jiǎn)單方法,因?yàn)槟憧梢栽谕顺鲋凹尤腙?duì)列,或等到隊(duì)列為空。確切的過程在隊(duì)列模塊的文檔中得到了最好的描述,如右側(cè)的資源部分所示:
join()
阻塞,直到隊(duì)列中的所有項(xiàng)目都被獲取和處理。每當(dāng)將項(xiàng)目添加到隊(duì)列時(shí),未完成任務(wù)的計(jì)數(shù)就會(huì)增加。每當(dāng)使用者線程調(diào)用 task_done() 以指示該項(xiàng)目已被檢索并且其上的所有工作已完成時(shí),未完成任務(wù)的計(jì)數(shù)就會(huì)下降。當(dāng)未完成任務(wù)的數(shù)量降至零時(shí), join()解鎖。
使用多個(gè)隊(duì)列
因?yàn)樯厦嫜菔镜哪J椒浅S行?,所以通過將額外的線程池與隊(duì)列鏈接來擴(kuò)展它是相對(duì)簡(jiǎn)單的。在上面的示例中,你只是打印出網(wǎng)頁(yè)的第一部分。下一個(gè)示例返回每個(gè)線程抓取的整個(gè)網(wǎng)頁(yè),然后將其放入另一個(gè)隊(duì)列。然后設(shè)置另一個(gè)加入第二個(gè)隊(duì)列的線程池,然后在網(wǎng)頁(yè)上工作。本示例中執(zhí)行的工作涉及使用名為 Beautiful Soup 的第三方 Python 模塊解析網(wǎng)頁(yè)。僅使用幾行代碼,使用此模塊,你將提取標(biāo)題標(biāo)簽并為你訪問的每個(gè)頁(yè)面打印出來。
清單 4. 多隊(duì)列數(shù)據(jù)挖掘網(wǎng)站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def init(self, queue, outqueue):
threading.Thread.init(self)
self.queue = queue
self.outqueue = outqueue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and then grabs chunk of webpage
url = urllib2.urlopen(host)
chunk = url.read()
#place chunk into out queue
self.out_queue.put(chunk)
#signals to queue job is done
self.queue.task_done()
class DatamineThread(threading.Thread):
"""Threaded Url Grab"""
def __init(self, out_queue):
threading.Thread.__init(self)
self.out_queue = out_queue
def run(self):
while True:
#grabs host from queue
chunk = self.out_queue.get()
#parse the chunk
soup = BeautifulSoup(chunk)
print soup.findAll(['title'])
#signals to queue job is done
self.out_queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue, out_queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
for i in range(5):
dt = DatamineThread(out_queue)
dt.setDaemon(True)
dt.start()
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
main()
print "Elapsed Time: %s" % (time.time() ? start)
如果你運(yùn)行此版本的腳本,你將獲得以下輸出:
#python url_fetch_threaded_part2.py
<title>Google</title> <title>Yahoo!</title> <title>Apple</title> <title>IBM United States</title> <title>Amazon.com: Online Shopping for Electronics, Apparel,
Computers, Books, DVDs & more</title> Elapsed Time: 3.75387597084
在查看代碼時(shí),你可以看到我們添加了另一個(gè)隊(duì)列實(shí)例,然后將該隊(duì)列傳遞給第一個(gè)線程池類ThreadURL. 接下來,你幾乎為下一個(gè)線程池類復(fù)制了完全相同的結(jié)構(gòu)DatamineThread。在這個(gè)類的run方法中,從每個(gè)線程的隊(duì)列中抓取網(wǎng)頁(yè),chunk,然后用Beautiful Soup處理這個(gè)chunk。在這種情況下, 你可以使用 Beautiful Soup 來簡(jiǎn)單地從每個(gè)頁(yè)面中提取標(biāo)題標(biāo)簽并打印出來。這個(gè)例子可以很容易地變成更有用的東西,因?yàn)槟銚碛谢舅阉饕婊驍?shù)據(jù)挖掘工具的核心。一個(gè)想法是使用 Beautiful Soup 從每個(gè)頁(yè)面中提取鏈接,然后關(guān)注它們。
總結(jié)
本文探討了 Python 中的線程,并展示了使用隊(duì)列來減輕復(fù)雜性和細(xì)微錯(cuò)誤以及提高可讀代碼的最佳實(shí)踐。雖然這個(gè)基本模式相對(duì)簡(jiǎn)單,但它可以通過將隊(duì)列和線程池鏈接在一起來解決大量問題。在最后一部分,您開始探索創(chuàng)建一個(gè)更復(fù)雜的處理管道,作為未來項(xiàng)目的模型。在資源部分有很多關(guān)于并發(fā)和線程的優(yōu)秀資源。
最后,重要的是要指出線程并不是所有問題的解決方案,而且進(jìn)程可以非常適合許多情況。如果你只需要分叉多個(gè)進(jìn)程并監(jiān)聽響應(yīng),那么標(biāo)準(zhǔn)庫(kù) ??subprocess 模塊尤其可以更簡(jiǎn)單地處理。