線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位,它被包含在進程之中,是進程中的實際運作單位,一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)。
在同一個進程內(nèi)的線程的數(shù)據(jù)是可以進行互相訪問的。
線程的切換使用過上下文來實現(xiàn)的,比如有一本書,有a和b這兩個人(兩個線程)看,a看完之后記錄當(dāng)前看到那一頁哪一行,然后交給b看,b看完之后記錄當(dāng)前看到了那一頁哪一行,此時a又要看了,那么a就通過上次記錄的值(上下文)直接找到上次看到了哪里,然后繼續(xù)往下看。
一個進程至少要包含一個線程,每個進程在啟動的時候就會自動的啟動一個線程,進程里面的第一個線程就是主線程,每次在進程內(nèi)創(chuàng)建的子線程都是由主線程進程創(chuàng)建和銷毀,子線程也可以由主線程創(chuàng)建出來的線程創(chuàng)建和銷毀線程。
進程是對各種資源管理的集合,比如要調(diào)用內(nèi)存、CPU、網(wǎng)卡、聲卡等,進程要操作上述的硬件之前都必須要創(chuàng)建一個線程,進程里面可以包含多個線程,QQ就是一個進程。
繼續(xù)拿QQ來說,比如我現(xiàn)在打卡了QQ的聊天窗口、個人信息窗口、設(shè)置窗口等,那么每一個打開的窗口都是一個線程,他們都在執(zhí)行不同的任務(wù),比如聊天窗口這個線程可以和好友進行互動,聊天,視頻等,個人信息窗口我可以查看、修改自己的資料。
為了進程安全起見,所以兩個進程之間的數(shù)據(jù)是不能夠互相訪問的(默認情況下),比如自己寫了一個應(yīng)用程序,然后讓別人運行起來,那么我的這個程序就可以訪問用戶啟動的其他應(yīng)用,我可以通過我自己的程序去訪問QQ,然后拿到一些聊天記錄等比較隱秘的信息,那么這個時候就不安全了,所以說進程與進程之間的數(shù)據(jù)是不可以互相訪問的,而且每一個進程的內(nèi)存是獨立的。
線程是執(zhí)行的指令集,進程是資源的集合
線程的啟動速度要比進程的啟動速度要快
兩個線程的執(zhí)行速度是一樣的
進程與線程的運行速度是沒有可比性的
線程共享創(chuàng)建它的進程的內(nèi)存空間,進程的內(nèi)存是獨立的。
兩個線程共享的數(shù)據(jù)都是同一份數(shù)據(jù),兩個子進程的數(shù)據(jù)不是共享的,而且數(shù)據(jù)是獨立的;
同一個進程的線程之間可以直接交流,同一個主進程的多個子進程之間是不可以進行交流,如果兩個進程之間需要通信,就必須要通過一個中間代理來實現(xiàn);
一個新的線程很容易被創(chuàng)建,一個新的進程創(chuàng)建需要對父進程進行一次克隆
一個線程可以控制和操作同一個進程里的其他線程,線程與線程之間沒有隸屬關(guān)系,但是進程只能操作子進程
改變主線程,有可能會影響到其他線程的行為,但是對于父進程的修改是不會影響子進程;
一個多并發(fā)的小腳本
import threading
import time
def Princ(String):
print('task', String)
time.sleep(5)
# target=目標(biāo)函數(shù), args=傳入的參數(shù)
t1 = threading.Thread(target=Princ, args=('t1',))
t1.start()
t2 = threading.Thread(target=Princ, args=('t1',))
t2.start()
t3 = threading.Thread(target=Princ, args=('t1',))
t3.start()
參考文檔
進程與線程的一個簡單解釋
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
Linux進程與線程的區(qū)別
https://my.oschina.net/cnyinlinux/blog/422207
Thread module emulating a subset of Java’s threading model.
調(diào)用threading模塊調(diào)用線程的兩種方式
直接調(diào)用
import threading
import time
def Princ(String):
print('task', String)
time.sleep(5)
# target=目標(biāo)函數(shù), args=傳入的參數(shù)
t1 = threading.Thread(target=Princ, args=('t1',))
t1.start()
t2 = threading.Thread(target=Princ, args=('t1',))
t2.start()
t3 = threading.Thread(target=Princ, args=('t1',))
t3.start()
通過類調(diào)用
import threading
import time
class MyThreading(threading.Thread):
def __init__(self, conn):
super(MyThreading, self).__init__()
self.conn = conn
def run(self):
print('run task', self.conn)
time.sleep(5)
t1 = MyThreading('t1')
t2 = MyThreading('t2')
t1.start()
t2.start()
多線程在Python內(nèi)實則就是一個假象,為什么這么說呢,因為CPU的處理速度是很快的,所以我們看起來以一個線程在執(zhí)行多個任務(wù),每個任務(wù)的執(zhí)行速度是非常之快的,利用上下文切換來快速的切換任務(wù),以至于我們根本感覺不到。
但是頻繁的使用上下文切換也是要耗費一定的資源,因為單線程在每次切換任務(wù)的時候需要保存當(dāng)前任務(wù)的上下文。
什么時候用到多線程?
首先IO操作是不占用CPU的,只有計算的時候才會占用CPU(譬如1+1=2),Python中的多線程不適合CPU密集型的任務(wù),適合IO密集型的任務(wù)(sockt server)。
啟動多個線程
主進程在啟動之后會啟動一個主線程,下面的腳本中讓主線程啟動了多個子線程,然而啟動的子線程是獨立的,所以主線程不會等待子線程執(zhí)行完畢,而是主線程繼續(xù)往下執(zhí)行,并行執(zhí)行。
for i in range(50):
t = threading.Thread(target=Princ, args=('t-%s' % (i),))
t.start()
join()
join()方法可以讓程序等待每一個線程之后完成之后再往下執(zhí)行,又成為串行執(zhí)行。
import threading
import time
def Princ(String):
print('task', String)
time.sleep(1)
for i in range(50):
t = threading.Thread(target=Princ, args=('t-%s' % (i),))
t.start()
# 當(dāng)前線程執(zhí)行完畢之后在執(zhí)行后面的線程
t.join()
讓主線程阻塞,子現(xiàn)在并行執(zhí)行
import threading
import time
def Princ(String):
print('task', String)
time.sleep(2)
# 執(zhí)行子線程的時間
start_time = time.time()
# 存放線程的實例
t_objs = []
for i in range(50):
t = threading.Thread(target=Princ, args=('t-%s' % (i),))
t.start()
# 為了不讓后面的子線程阻塞,把當(dāng)前的子線程放入到一個列表中
t_objs.append(t)
# 循環(huán)所有子線程實例,等待所有子線程執(zhí)行完畢
for t in t_objs:
t.join()
# 當(dāng)前時間減去開始時間就等于執(zhí)行的過程中需要的時間
print(time.time() - start_time)
查看主線程與子線程
import threading
class MyThreading(threading.Thread):
def __init__(self):
super(MyThreading, self).__init__()
def run(self):
print('我是子線程: ', threading.current_thread())
t = MyThreading()
t.start()
print('我是主線程: ', threading.current_thread())
輸出如下:
C:\Python\Python35\python.exe E:/MyCodeProjects/進程與線程/s3.py
我是子線程: <MyThreading(Thread-1, started 7724)>
我是主線程: <_MainThread(MainThread, started 3680)>
Process finished with exit code 0
查看當(dāng)前進程的活動線程個數(shù)
import threading
class MyThreading(threading.Thread):
def __init__(self):
super(MyThreading, self).__init__()
def run(self):
print('www.anshengme.com')
t = MyThreading()
t.start()
print('線程個數(shù): ', threading.active_count())
輸出如下:
C:\Python\Python35\python.exe E:/MyCodeProjects/進程與線程/s3.py
www.anshengme.com
# 一個主線程和一個子線程
線程個數(shù): 2
Process finished with exit code 0
Event
Event是線程間通信最間的機制之一:一個線程發(fā)送一個event信號,其他的線程則等待這個信號。用于主線程控制其他線程的執(zhí)行。 Events 管理一個flag,這個flag可以使用set
()設(shè)置成True或者使用clear()重置為False,wait()則用于阻塞,在flag為True之前。flag默認為False。
選項 | 描述 |
---|---|
Event.wait([timeout]) | 堵塞線程,直到Event對象內(nèi)部標(biāo)識位被設(shè)為True或超時(如果提供了參數(shù)timeout) |
Event.set() | 將標(biāo)識位設(shè)為Ture |
Event.clear() | 將標(biāo)識伴設(shè)為False |
Event.isSet() | 判斷標(biāo)識位是否為Ture |
#!/use/bin/env python
# _*_ coding: utf-8- _*_
import threading
def runthreading(event):
print("Start...")
event.wait()
print("End...")
event_obj = threading.Event()
for n in range(10):
t = threading.Thread(target=runthreading, args=(event_obj,))
t.start()
event_obj.clear()
inp = input("True/False?>> ")
if inp == "True":
event_obj.set()
`
守護進程(守護線程)
一個主進程可以啟動多個守護進程,但是主進程必須要一直運行,如果主進程掛掉了,那么守護進程也會隨之掛掉
程序會等待主線程(進程)執(zhí)行完畢,但是不會等待守護進程(線程)
import threading
import time
def Princ(String):
print('task', String)
time.sleep(2)
for i in range(50):
t = threading.Thread(target=Princ, args=('t-%s' % (i),))
t.setDaemon(True) # 把當(dāng)前線程設(shè)置為守護線程,要在start之前設(shè)置
t.start()
場景預(yù)設(shè): 比如現(xiàn)在有一個FTP服務(wù),每一個用戶連接上去的時候都會創(chuàng)建一個守護線程,現(xiàn)在已經(jīng)有300個用戶連接上去了,就是說已經(jīng)創(chuàng)建了300個守護線程,但是突然之間FTP服務(wù)宕掉了,這個時候就不會等待守護線程執(zhí)行完畢再退出,而是直接退出,如果是普通的線程,那么就會登臺線程執(zhí)行完畢再退出。
#!/use/bin/env python
# _*_ coding:utf-8 _*_
from multiprocessing import Process
import time
def runprocess(arg):
print(arg)
time.sleep(2)
p = Process(target=runprocess, args=(11,))
p.daemon=True
p.start()
print("end")
線程之間的數(shù)據(jù)交互與鎖(互斥鎖)
python2.x需要加鎖,但是在python3.x上面就不需要了
# _*_ coding:utf-8 _*_
import threading
def Princ():
# 獲取鎖
lock.acquire()
# 在函數(shù)內(nèi)可以直接修改全局變量
global number
number += 1
# 為了避免讓程序出現(xiàn)串行,不能加sleep
# time.sleep(1)
# 釋放鎖
lock.release()
# 鎖
lock = threading.Lock()
# 主線程的number
number = 0
t_objs = []
for i in range(100):
t = threading.Thread(target=Princ)
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
print('Number:', number)
遞歸鎖(Lock/RLock)
import threading
def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
t_objs = []
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock() # RLock()類似創(chuàng)建了一個字典,每次退出的時候找到字典的值進行退出
# lock = threading.Lock() # Lock()會阻塞在這兒
for i in range(10):
t = threading.Thread(target=run3)
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
print(num, num2)
信號量(Semaphore)
互斥鎖同時只允許一個線程更改數(shù)據(jù),而Semaphore是同時允許一定數(shù)量的線程更改數(shù)據(jù)
import threading
import time
def run(n):
semaphore.acquire() # 獲取信號,信號可以有多把鎖
time.sleep(1) # 等待一秒鐘
print("run the thread: %s\n" % n)
semaphore.release() # 釋放信號
t_objs = []
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 聲明一個信號量,最多允許5個線程同時運行
for i in range(20): # 運行20個線程
t = threading.Thread(target=run, args=(i,)) # 創(chuàng)建線程
t.start() # 啟動線程
t_objs.append(t)
for t in t_objs:
t.join()
print('>>>>>>>>>>>>>')
以上代碼中,類似與創(chuàng)建了一個隊列,最多放5個任務(wù),每執(zhí)行完成一個任務(wù)就會往后面增加一個任務(wù)。
多進程的資源是獨立的,不可以互相訪問。
啟動一個進程
from multiprocessing import Process
import time
def f(name):
time.sleep(2)
print('hello', name)
if __name__ == '__main__':
# 創(chuàng)建一個進程
p = Process(target=f, args=('bob',))
# 啟動
p.start()
# 等待進程執(zhí)行完畢
p.join(
在進程內(nèi)啟動一個線程
from multiprocessing import Process
import threading
def Thread(String):
print(String)
def Proces(String):
print('hello', String)
t = threading.Thread(target=Thread, args=('Thread %s' % (String),)) # 創(chuàng)建一個線程
t.start() # 啟動它
if __name__ == '__main__':
p = Process(target=Proces, args=('World',)) # 創(chuàng)建一個進程
p.start() # 啟動
p.join() # 等待進程執(zhí)行完畢
啟動一個多進程
from multiprocessing import Process
import time
def f(name):
time.sleep(2)
print('hello', name)
if __name__ == '__main__':
for n in range(10): # 創(chuàng)建一個進程
p = Process(target=f, args=('bob %s' % (n),))
# 啟動
p.start()
# 等待進程執(zhí)行完畢
獲取啟動進程的PID
# _*_ coding:utf-8 _*_
from multiprocessing import Process
import os
def info(String):
print(String)
print('module name:', __name__)
print('父進程的PID:', os.getppid())
print('子進程的PID:', os.getpid())
print("\n")
def ChildProcess():
info('\033[31;1mChildProcess\033[0m')
if __name__ == '__main__':
info('\033[32;1mTheParentProcess\033[0m')
p = Process(target=ChildProcess)
p.start()
輸出結(jié)果
C:\Python\Python35\python.exe E:/MyCodeProjects/多進程/s1.py
TheParentProcess
module name: __main__
# Pycharm的PID
父進程的PID: 6888
# 啟動的腳本PID
子進程的PID: 4660
ChildProcess
module name: __mp_main__
# 腳本的PID
父進程的PID: 4660
# 父進程啟動的子進程PID
子進程的PID: 8452
Process finished with exit code 0
默認情況下進程與進程之間是不可以互相通信的,若要實現(xiàn)互相通信則需要一個中間件,另個進程之間通過中間件來實現(xiàn)通信,下面是進程間通信的幾種方式。
進程Queue
# _*_ coding:utf-8 _*_
from multiprocessing import Process, Queue
def ChildProcess(Q):
Q.put(['Hello', None, 'World']) # 在Queue里面上傳一個列表
if __name__ == '__main__':
q = Queue() # 創(chuàng)建一個Queue
p = Process(target=ChildProcess, args=(q,)) # 創(chuàng)建一個子進程,并把Queue傳給子進程,相當(dāng)于克隆了一份Queue
p.start() # 啟動子進程
print(q.get()) # 獲取q中的數(shù)據(jù)
p.join()
管道(Pipes)
# _*_ coding:utf-8 _*_
from multiprocessing import Process, Pipe
def ChildProcess(conn):
conn.send(['Hello', None, 'World']) # 寫一段數(shù)據(jù)
conn.close() # 關(guān)閉
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 生成一個管道實例,parent_conn, child_conn管道的兩頭
p = Process(target=ChildProcess, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 收取消息
p.join()
數(shù)據(jù)共享(Managers)
# _*_ coding:utf-8 _*_
# _*_ coding:utf-8 _*_
from multiprocessing import Process, Manager
import os
def ChildProcess(Dict, List):
Dict['k1'] = 'v1'
Dict['k2'] = 'v2'
List.append(os.getpid()) # 獲取子進程的PID
print(List) # 輸出列表中的內(nèi)容
if __name__ == '__main__':
manager = Manager() # 生成Manager對象
Dict = manager.dict() # 生成一個可以在多個進程之間傳遞共享的字典
List = manager.list() # 生成一個字典
ProcessList = [] # 創(chuàng)建一個空列表,存放進程的對象,等待子進程執(zhí)行用于
for i in range(10): # 生成是個子進程
p = Process(target=ChildProcess, args=(Dict, List)) # 創(chuàng)建一個子進程
p.start() # 啟動
ProcessList.append(p) # 把子進程添加到p_list列表中
for res in ProcessList: # 循環(huán)所有的子進程
res.join() # 等待執(zhí)行完畢
print('\n')
print(Dict)
print(List)
輸出結(jié)果
C:\Python\Python35\python.exe E:/MyCodeProjects/多進程/s4.py
[5112]
[5112, 3448]
[5112, 3448, 4584]
[5112, 3448, 4584, 2128]
[5112, 3448, 4584, 2128, 11124]
[5112, 3448, 4584, 2128, 11124, 10628]
[5112, 3448, 4584, 2128, 11124, 10628, 5512]
[5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460]
[5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484]
[5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]
{'k1': 'v1', 'k2': 'v2'}
[5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]
Process finished with exit code 0
鎖(Lock)
from multiprocessing import Process, Lock
def ChildProcess(l, i):
l.acquire() # 獲取鎖
print('hello world', i)
l.release() # 釋放鎖
if __name__ == '__main__':
lock = Lock() # 生成Lock對象
for num in range(10):
Process(target=ChildProcess, args=(lock, num)).start() # 創(chuàng)建并啟動一個子進程
同一時間啟動多少個進程
#!/use/bin/env python
# _*_ coding: utf-8 _*_
from multiprocessing import Pool
import time
def myFun(i):
time.sleep(2)
return i+100
def end_call(arg):
print("end_call>>", arg)
p = Pool(5) # 允許進程池內(nèi)同時放入5個進程
for i in range(10):
p.apply_async(func=myFun, args=(i,),callback=end_call) # # 平行執(zhí)行,callback是主進程來調(diào)用
# p.apply(func=Foo) # 串行執(zhí)行
print("end")
p.close()
p.join() # 進程池中進程執(zhí)行完畢后再關(guān)閉,如果注釋,那么程序直接關(guān)閉。
簡單實現(xiàn)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import queue
import time
class MyThread:
def __init__(self,max_num=10):
self.queue = queue.Queue()
for n in range(max_num):
self.queue.put(threading.Thread)
def get_thread(self):
return self.queue.get()
def put_thread(self):
self.queue.put(threading.Thread)
pool = MyThread(5)
def RunThread(arg,pool):
print(arg)
time.sleep(2)
pool.put_thread()
for n in range(30):
thread = pool.get_thread()
t = thread(target=RunThread, args=(n,pool,))
t.start()
復(fù)雜版本
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
"""
線程池執(zhí)行一個任務(wù)
:param func: 任務(wù)函數(shù)
:param args: 任務(wù)函數(shù)所需參數(shù)
:param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認為None,即:不執(zhí)行回調(diào)函數(shù))
:return: 如果線程池已經(jīng)終止,則返回True否則None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
創(chuàng)建一個線程
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
"""
current_thread = threading.currentThread()
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent:
func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None
if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
執(zhí)行完所有的任務(wù)后,所有線程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self):
"""
無論是否還有任務(wù),終止線程
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.queue.clear()
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于記錄線程中正在等待的線程數(shù)
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
# How to use
pool = ThreadPool(5)
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
def action(i):
print(i)
for i in range(30):
ret = pool.run(action, (i,), callback)
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()
IO密集型(I/O bound)
頻繁網(wǎng)絡(luò)傳輸、讀取硬盤及其他IO設(shè)備稱之為IO密集型,最簡單的就是硬盤存取數(shù)據(jù),IO操作并不會涉及到CPU。
計算密集型(CPU bound)
程序大部分在做計算、邏輯判斷、循環(huán)導(dǎo)致cpu占用率很高的情況,稱之為計算密集型,比如說python程序中執(zhí)行了一段代碼1+1,這就是在計算1+1的值
本文出自 “一盞燭光” 博客,謝絕轉(zhuǎn)載!
更多建議: