App下載

使用 Python 進(jìn)行實(shí)用的線(xiàn)程編程

花式作死冠軍 2021-09-13 14:13:25 瀏覽數(shù) (2702)
反饋

介紹

Python 不乏并發(fā)選項(xiàng),標(biāo)準(zhǔn)庫(kù)包括對(duì)線(xiàn)程、進(jìn)程和異步 I/O 的支持。在許多情況下,Python 通過(guò)創(chuàng)建異步、線(xiàn)程和子進(jìn)程等高級(jí)模塊,消除了使用這些各種并發(fā)方法的困難。在標(biāo)準(zhǔn)庫(kù)之外,還有第三種解決方案,例如twisted、stackless 和處理模塊,僅舉幾例。本文使用實(shí)踐示例專(zhuān)門(mén)關(guān)注 Python 中的線(xiàn)程處理。網(wǎng)上有很多很好的資源來(lái)記錄線(xiàn)程 API,但本文試圖提供常見(jiàn)線(xiàn)程使用模式的實(shí)踐示例。

首先定義進(jìn)程和線(xiàn)程之間的區(qū)別很重要。線(xiàn)程與進(jìn)程的不同之處在于它們共享狀態(tài)、內(nèi)存和資源。這個(gè)簡(jiǎn)單的區(qū)別對(duì)于線(xiàn)程來(lái)說(shuō)既是優(yōu)點(diǎn)也是缺點(diǎn)。一方面,線(xiàn)程是輕量級(jí)的并且易于通信,但另一方面,它們帶來(lái)了一系列問(wèn)題,包括死鎖、競(jìng)爭(zhēng)條件和純粹的復(fù)雜性。幸運(yùn)的是,由于 GIL 和排隊(duì)模塊,Python 中的線(xiàn)程實(shí)現(xiàn)起來(lái)比其他語(yǔ)言要簡(jiǎn)單得多。

你好 Python 線(xiàn)程

接下來(lái),我假設(shè)你已經(jīng)安裝了 Python 2.5 或更高版本,因?yàn)樵S多示例將使用 Python 語(yǔ)言的更新功能,這些功能至少出現(xiàn)在 Python2.5 中。要開(kāi)始使用 Python 中的線(xiàn)程,我們將從一個(gè)簡(jiǎn)單的“Hello World”示例開(kāi)始:

清單 1. hello_threads_example
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()

如果你運(yùn)行這個(gè)例子,你會(huì)得到以下輸出:

      #python hello_threads.py 
      Thread?1 says Hello World at time: 2008?05?13 13:22:50.252069
      Thread?2 says Hello World at time: 2008?05?13 13:22:50.252576

查看此輸出,你可以看到你收到了來(lái)自?xún)蓚€(gè)帶有日期戳的線(xiàn)程的 Hello World 語(yǔ)句。如果你查看實(shí)際代碼,會(huì)發(fā)現(xiàn)有兩個(gè) import 語(yǔ)句;一個(gè)導(dǎo)入 datetime 模塊,另一個(gè)導(dǎo)入 threading 模塊。該類(lèi)ThreadClass繼承自threading.Thread,因此,您需要定義一個(gè) run 方法來(lái)執(zhí)行您在線(xiàn)程內(nèi)運(yùn)行的代碼。在 run 方法中唯一需要注意的重要事項(xiàng)self.getName()是該方法將標(biāo)識(shí)線(xiàn)程的名稱(chēng)。

最后三行代碼實(shí)際上調(diào)用了類(lèi)并啟動(dòng)了線(xiàn)程。如果您注意到,t.start()實(shí)際上是啟動(dòng)線(xiàn)程的。線(xiàn)程模塊在設(shè)計(jì)時(shí)就考慮到了繼承性,實(shí)際上是建立在較低級(jí)別的線(xiàn)程模塊之上的。在大多數(shù)情況下,繼承自 被認(rèn)為是最佳實(shí)踐threading.Thread,因?yàn)樗鼮榫€(xiàn)程編程創(chuàng)建了一個(gè)非常自然的 API。

使用帶線(xiàn)程的隊(duì)列

正如我之前提到的,當(dāng)線(xiàn)程需要共享數(shù)據(jù)或資源時(shí),線(xiàn)程處理可能會(huì)很復(fù)雜。線(xiàn)程模塊確實(shí)提供了許多同步原語(yǔ),包括信號(hào)量、條件變量、事件和鎖。雖然存在這些選項(xiàng),但最好的做法是專(zhuān)注于使用隊(duì)列。隊(duì)列更容易處理,并使線(xiàn)程編程更加安全,因?yàn)樗鼈冇行У貙⑺袑?duì)資源的訪(fǎng)問(wèn)集中到單個(gè)線(xiàn)程,并允許更清晰、更易讀的設(shè)計(jì)模式。

在下一個(gè)示例中,你將首先創(chuàng)建一個(gè)程序,該程序?qū)⒁来位蛞粋€(gè)接一個(gè)地獲取網(wǎng)站的 URL,并打印出頁(yè)面的前 1024 個(gè)字節(jié)。這是使用線(xiàn)程可以更快地完成某些事情的經(jīng)典示例。首先,讓我們使用urllib2模塊一次抓取這些頁(yè)面,并對(duì)代碼進(jìn)行計(jì)時(shí):

清單 2. URL 獲取序列
        import urllib2
        import time
        
        hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() ? start)

當(dāng)你運(yùn)行它時(shí),你會(huì)得到大量輸出到標(biāo)準(zhǔn)輸出,因?yàn)轫?yè)面被部分打印。但你會(huì)在最后得到這個(gè):

        Elapsed Time: 2.40353488922

讓我們稍微看一下這段代碼。你只導(dǎo)入兩個(gè)模塊。首先,urllib2模塊是承擔(dān)重任并抓取網(wǎng)頁(yè)的東西。其次,你通過(guò)調(diào)用?time.time()?創(chuàng)建一個(gè)開(kāi)始時(shí)間值,然后再次調(diào)用它并減去初始值以確定程序執(zhí)行所需的時(shí)間。最后,從程序的速度來(lái)看,“兩秒半”的結(jié)果并不可怕,但如果你有數(shù)百個(gè)網(wǎng)頁(yè)要檢索,考慮到當(dāng)前的平均值,大約需要 50 秒??纯磩?chuàng)建線(xiàn)程版本如何加快速度:

清單 3. URL 獲取線(xiàn)程
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def init(self, queue):
              threading.Thread.init(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() ? start)

這個(gè)例子有更多的代碼需要解釋?zhuān)捎谑褂昧伺抨?duì)模塊,它并沒(méi)有比第一個(gè)線(xiàn)程示例復(fù)雜多少。這種模式是在 Python 中使用線(xiàn)程的一種非常常見(jiàn)且推薦的方式。步驟描述如下:

  1. 創(chuàng)建一個(gè)?Queue.Queue()?實(shí)例,然后用數(shù)據(jù)填充它。
  2. 將填充數(shù)據(jù)的實(shí)例傳遞到從?threading.Thread?繼承而創(chuàng)建的?Thread?類(lèi)中。
  3. 產(chǎn)生一個(gè)守護(hù)線(xiàn)程池。
  4. 一次從隊(duì)列中拉出一項(xiàng),并在線(xiàn)程內(nèi)部使用該數(shù)據(jù)(即 run 方法)來(lái)完成這項(xiàng)工作。
  5. 工作完成后,向?queue.task_done()?隊(duì)列發(fā)送任務(wù)已完成的信號(hào)。
  6. 加入隊(duì)列,這實(shí)際上意味著等到隊(duì)列為空,然后退出主程序。

關(guān)于此模式的注意事項(xiàng):通過(guò)將守護(hù)線(xiàn)程設(shè)置為 true,它允許主線(xiàn)程或程序在只有守護(hù)線(xiàn)程處于活動(dòng)狀態(tài)時(shí)退出。這創(chuàng)建了一種控制程序流程的簡(jiǎn)單方法,因?yàn)槟憧梢栽谕顺鲋凹尤腙?duì)列,或等到隊(duì)列為空。確切的過(guò)程在隊(duì)列模塊的文檔中得到了最好的描述,如右側(cè)的資源部分所示:

join()
阻塞,直到隊(duì)列中的所有項(xiàng)目都被獲取和處理。每當(dāng)將項(xiàng)目添加到隊(duì)列時(shí),未完成任務(wù)的計(jì)數(shù)就會(huì)增加。每當(dāng)使用者線(xiàn)程調(diào)用 task_done() 以指示該項(xiàng)目已被檢索并且其上的所有工作已完成時(shí),未完成任務(wù)的計(jì)數(shù)就會(huì)下降。當(dāng)未完成任務(wù)的數(shù)量降至零時(shí), join()解鎖。

使用多個(gè)隊(duì)列

因?yàn)樯厦嫜菔镜哪J椒浅S行?,所以通過(guò)將額外的線(xiàn)程池與隊(duì)列鏈接來(lái)擴(kuò)展它是相對(duì)簡(jiǎn)單的。在上面的示例中,你只是打印出網(wǎng)頁(yè)的第一部分。下一個(gè)示例返回每個(gè)線(xiàn)程抓取的整個(gè)網(wǎng)頁(yè),然后將其放入另一個(gè)隊(duì)列。然后設(shè)置另一個(gè)加入第二個(gè)隊(duì)列的線(xiàn)程池,然后在網(wǎng)頁(yè)上工作。本示例中執(zhí)行的工作涉及使用名為 Beautiful Soup 的第三方 Python 模塊解析網(wǎng)頁(yè)。僅使用幾行代碼,使用此模塊,你將提取標(biāo)題標(biāo)簽并為你訪(fǎng)問(wèn)的每個(gè)頁(yè)面打印出來(lái)。

清單 4. 多隊(duì)列數(shù)據(jù)挖掘網(wǎng)站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def init(self, queue, outqueue):
        threading.Thread.init(self)
        self.queue = queue
        self.outqueue = outqueue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init(self, out_queue):
        threading.Thread.__init(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() ? start)

如果你運(yùn)行此版本的腳本,你將獲得以下輸出:

  #python url_fetch_threaded_part2.py 

  <title>Google</title>  <title>Yahoo!</title>  <title>Apple</title>  <title>IBM United States</title>  <title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>  Elapsed Time: 3.75387597084

在查看代碼時(shí),你可以看到我們添加了另一個(gè)隊(duì)列實(shí)例,然后將該隊(duì)列傳遞給第一個(gè)線(xiàn)程池類(lèi)ThreadURL. 接下來(lái),你幾乎為下一個(gè)線(xiàn)程池類(lèi)復(fù)制了完全相同的結(jié)構(gòu)DatamineThread。在這個(gè)類(lèi)的run方法中,從每個(gè)線(xiàn)程的隊(duì)列中抓取網(wǎng)頁(yè),chunk,然后用Beautiful Soup處理這個(gè)chunk。在這種情況下, 你可以使用 Beautiful Soup 來(lái)簡(jiǎn)單地從每個(gè)頁(yè)面中提取標(biāo)題標(biāo)簽并打印出來(lái)。這個(gè)例子可以很容易地變成更有用的東西,因?yàn)槟銚碛谢舅阉饕婊驍?shù)據(jù)挖掘工具的核心。一個(gè)想法是使用 Beautiful Soup 從每個(gè)頁(yè)面中提取鏈接,然后關(guān)注它們。

總結(jié)

本文探討了 Python 中的線(xiàn)程,并展示了使用隊(duì)列來(lái)減輕復(fù)雜性和細(xì)微錯(cuò)誤以及提高可讀代碼的最佳實(shí)踐。雖然這個(gè)基本模式相對(duì)簡(jiǎn)單,但它可以通過(guò)將隊(duì)列和線(xiàn)程池鏈接在一起來(lái)解決大量問(wèn)題。在最后一部分,您開(kāi)始探索創(chuàng)建一個(gè)更復(fù)雜的處理管道,作為未來(lái)項(xiàng)目的模型。在資源部分有很多關(guān)于并發(fā)和線(xiàn)程的優(yōu)秀資源。

最后,重要的是要指出線(xiàn)程并不是所有問(wèn)題的解決方案,而且進(jìn)程可以非常適合許多情況。如果你只需要分叉多個(gè)進(jìn)程并監(jiān)聽(tīng)響應(yīng),那么標(biāo)準(zhǔn)庫(kù) ??subprocess 模塊尤其可以更簡(jiǎn)單地處理。


0 人點(diǎn)贊