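Python Multiprocessing: Multi-Process Parallel Computing
Using the multiprocessing module to run computations in parallel across several processes.
For long-running tasks (for example, a CPU-heavy loop in Python), spreading the work over more CPU cores with multiple processes can shorten the run time considerably. This article records the basic usage of multiprocessing, along with what I understood while learning it.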
Creating and running processes
Process, start, join
Example code:
import multiprocessing
import time
import datetime


def sleep(t):
    time.sleep(t)


if __name__ == "__main__":
    starttime = datetime.datetime.now()
    # creating processes
    p1 = multiprocessing.Process(target=sleep, args=(5,))
    p2 = multiprocessing.Process(target=sleep, args=(5,))
    # starting process 1
    p1.start()
    # starting process 2
    p2.start()
    # wait until process 1 is finished
    p1.join()
    # wait until process 2 is finished
    p2.join()
    endtime = datetime.datetime.now()
    # total_seconds() converts the timedelta into a plain number of seconds
    elapsed = (endtime - starttime).total_seconds()
    print("Total time with multiple processes: {:.2f} seconds".format(elapsed))
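Each of the two processes runs sleep(5), and the total time is roughly 5 seconds. Without multiprocessing, the two calls would run one after the other and take about 10 seconds.
- p1 = multiprocessing.Process(target=sleep, args=(5,)) creates a new process. The function this process will run is sleep(), and its argument is 5.
- args must be a tuple. With a single argument, nothing follows the comma after 5, but the comma itself cannot be omitted: (5,) is a one-element tuple, whereas (5) is just the integer 5.
- With two or more arguments, simply put them all inside the parentheses, separated by commas.
- p1.start() starts running the process.
- p1.join() blocks the main process at that point: the code after the p1.join() line runs only once p1 has finished.
- Without p1.join() and p2.join(), the main process does not wait. It moves on to the following lines immediately, so endtime would be recorded before p1 and p2 have finished and the printed time would be close to 0 seconds. (The interpreter still waits for non-daemon child processes before it exits.) So remember to call p1.join() and p2.join() before any code that depends on the processes being done!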
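When more than two processes are needed, the same start/join pattern can be written with a list and two loops. The following is only a minimal sketch: the helper names nap and make_processes are made up for illustration and are not part of multiprocessing.

```python
import multiprocessing
import time


def nap(seconds, label):
    # Hypothetical worker: takes two positional arguments, passed via args=(seconds, label).
    time.sleep(seconds)
    print("{} finished after {} s".format(label, seconds))


def make_processes(durations):
    # Build one Process per duration; none of them is started yet.
    return [
        multiprocessing.Process(target=nap, args=(d, "worker-{}".format(i)))
        for i, d in enumerate(durations)
    ]


if __name__ == "__main__":
    procs = make_processes([0.3, 0.2, 0.1])
    start = time.perf_counter()
    for p in procs:
        p.start()  # all workers now run concurrently
    for p in procs:
        p.join()  # wait for every worker before reading the clock
    elapsed = time.perf_counter() - start
    print("Elapsed: {:.2f} s".format(elapsed))
```

Starting every process before joining any of them is what makes the workers overlap. The elapsed time should then be close to the longest nap plus some process start-up overhead, rather than the sum of all naps.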
pid, is_alive
Example code:
# importing the multiprocessing module
import multiprocessing
import os


def worker1():
    # printing process id
    print("ID of process running worker1: {}".format(os.getpid()))


def worker2():
    # printing process id
    print("ID of process running worker2: {}".format(os.getpid()))


if __name__ == "__main__":
    # printing main program process id
    print("ID of main process: {}".format(os.getpid()))
    # creating processes
    p1 = multiprocessing.Process(target=worker1)
    p2 = multiprocessing.Process(target=worker2)
    # starting processes
    p1.start()
    p2.start()
    # process IDs
    print("ID of process p1: {}".format(p1.pid))
    print("ID of process p2: {}".format(p2.pid))
    # wait until processes are finished
    p1.join()
    p2.join()
    # both processes finished
    print("Both processes finished execution!")
    # check if processes are alive
    print("Process p1 is alive: {}".format(p1.is_alive()))
    print("Process p2 is alive: {}".format(p2.is_alive()))
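- pid is the process ID, the number the operating system assigns to a process.
- On a multiprocessing Process object, p1.pid gives the process ID of p1 directly. It is None until p1.start() has been called.
- While the program is running, os.getpid() returns the ID of the process that executes that line.
- p1.is_alive() tells whether the process is still running. p1.join() returns only after p1 has terminated, so calling p1.is_alive() afterwards returns False.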
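To watch these attributes change over the life of a process, the small sketch below prints pid, is_alive() and exitcode before start(), after start() and after join(). The worker name idle is made up for illustration, and exitcode is an additional Process attribute not used in the example above.

```python
import multiprocessing
import time


def idle(seconds):
    # Hypothetical worker that just keeps the process alive for a while.
    time.sleep(seconds)


if __name__ == "__main__":
    p = multiprocessing.Process(target=idle, args=(0.5,))
    # Not started yet: no OS process exists for p.
    print("before start:", p.pid, p.is_alive(), p.exitcode)
    p.start()
    # The child is sleeping, so it is alive and has no exit code yet.
    print("after start: ", p.pid, p.is_alive(), p.exitcode)
    p.join()
    # The child has terminated; the Process object itself is still usable.
    print("after join:  ", p.pid, p.is_alive(), p.exitcode)
```

Before start() the pid and exitcode are None and is_alive() is False. After join() the pid is still available, is_alive() is False and exitcode is 0 for a worker that returned normally.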
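Sharing memory and passing data between processes
multiprocessing.Array, multiprocessing.Value: creating shared variables
In the examples so far, the processes run independently of one another and each has its own memory space. A computation done in one process does not affect the memory of any other process. To share memory between several processes, use the Array or Value objects from multiprocessing.
Example code: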
import multiprocessing


def square_list(mylist, result, square_sum):
    """
    function to square a given list
    """
    # append squares of mylist to result array
    for idx, num in enumerate(mylist):
        result[idx] = num * num
    # square_sum value
    square_sum.value = sum(result)
    # print result Array
    print("Result(in process p1): {}".format(result[:]))
    # print square_sum Value
    print("Sum of squares(in process p1): {}".format(square_sum.value))


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4]
    # creating Array of int data type with space for 4 integers
    result = multiprocessing.Array("i", 4)
    # creating Value of int data type
    square_sum = multiprocessing.Value("i")
    # creating new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
    # starting process
    p1.start()
    # wait until the process is finished
    p1.join()
    # print result array
    print("Result(in main program): {}".format(result[:]))
    # print square_sum Value
    print("Sum of squares(in main program): {}".format(square_sum.value))
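- result = multiprocessing.Array('i', 4) creates the variable result, which is shared between the main process and p1. When p1 updates result, the main process sees the update too.
- i stands for integer (a signed C int). Other type codes exist, for example d for a double-precision float. 4 is the length of the array.
- square_sum = multiprocessing.Value('i') creates the variable square_sum. Likewise, i stands for integer.
- An initial value (for example 10) can also be given: square_sum = multiprocessing.Value('i', 10).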
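As a further illustration of the type codes and initial values described above, here is a small sketch that uses the d type code, initialises the Array from a list instead of a length, and gives the Value an explicit starting value. The function name scale_and_sum is made up for this example.

```python
import multiprocessing


def scale_and_sum(values, total, factor):
    # Multiply each element of the shared array in place ...
    for i in range(len(values)):
        values[i] = values[i] * factor
    # ... and store the sum of the scaled elements in the shared Value.
    total.value = sum(values[:])


if __name__ == "__main__":
    # Array initialised from a list of floats; its length is taken from the list.
    values = multiprocessing.Array("d", [1.5, 2.5, 3.0])
    # Value of type double with initial value 0.0.
    total = multiprocessing.Value("d", 0.0)
    p = multiprocessing.Process(target=scale_and_sum, args=(values, total, 2.0))
    p.start()
    p.join()
    # The changes made in the child process are visible here.
    print(values[:], total.value)  # [3.0, 5.0, 6.0] 14.0
```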
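Manager: creating shared variables with support for more data structures
Array and Value support only a limited set of data types. To share richer data structures, create them through a Manager.
Example code: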
import multiprocessing


def print_records(records):
    """
    function to print record(tuples) in records(list)
    """
    for record in records:
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))


def insert_record(record, records):
    """
    function to add a new record to records(list)
    """
    records.append(record)
    print("New record added!\n")


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        # creating a list in server process memory
        records = manager.list([("Sam", 10), ("Adam", 9), ("Kevin", 9)])
        # new record to be inserted in records
        new_record = ("Jeff", 8)
        # creating new processes
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
        p2 = multiprocessing.Process(target=print_records, args=(records,))
        # running process p1 to insert new record
        p1.start()
        p1.join()
        # running process p2 to print records
        p2.start()
        p2.join()
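- with multiprocessing.Manager() as manager: starts a manager, a separate server process that holds the shared objects. The manager is shut down when the with block ends.
- records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)]) creates the variable records, a list that can be shared between processes.
- Similarly, manager.dict() creates a dictionary that can be shared.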
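A shared dictionary works the same way as the shared list. The sketch below is a minimal illustration with a made-up worker, record_length. Each process writes a different key, so the workers can run at the same time without overwriting each other.

```python
import multiprocessing


def record_length(word, lengths):
    # Each call writes its own key, so concurrent workers do not clash.
    lengths[word] = len(word)


if __name__ == "__main__":
    words = ["pool", "queue", "manager"]
    with multiprocessing.Manager() as manager:
        lengths = manager.dict()  # dictionary held by the manager's server process
        procs = [
            multiprocessing.Process(target=record_length, args=(w, lengths))
            for w in words
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy into a plain dict while the manager is still running.
        print(dict(lengths))
```

The order of the keys in the printed dictionary may vary between runs, because it depends on which process writes first.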
Queue: sharing data between processes
Example code:
import multiprocessing


def square_list(mylist, q):
    """
    function to square a given list
    """
    # append squares of mylist to queue
    for num in mylist:
        q.put(num * num)


def print_queue(q):
    """
    function to print queue elements
    """
    print("Queue elements:")
    while not q.empty():
        print(q.get())
    print("Queue is now empty!")


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4]
    # creating multiprocessing Queue
    q = multiprocessing.Queue()
    # creating new processes
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
    p2 = multiprocessing.Process(target=print_queue, args=(q,))
    # running process p1 to square list
    p1.start()
    p1.join()
    # running process p2 to get queue elements
    p2.start()
    p2.join()
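- q = multiprocessing.Queue() creates the variable q.
- First, process p1 squares the elements of mylist one by one and puts them into q.
- Then, process p2 takes the elements out of q one by one and prints them.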
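The example above runs the producer to completion before the consumer starts, and it relies on q.empty(), which the multiprocessing documentation describes as not reliable. When producer and consumer should run at the same time, one common alternative is to send a sentinel value that marks the end of the data. The sketch below assumes None is never a real item; the names SENTINEL, produce_squares and consume are made up for illustration.

```python
import multiprocessing

SENTINEL = None  # assumed end-of-data marker; must never be a real item


def produce_squares(numbers, q):
    for n in numbers:
        q.put(n * n)
    q.put(SENTINEL)  # tell the consumer that nothing more will come


def consume(q):
    items = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=produce_squares, args=([1, 2, 3, 4], q))
    p.start()
    squares = consume(q)  # drain the queue first ...
    p.join()  # ... then wait for the producer to exit
    print(squares)  # [1, 4, 9, 16]
```

Here the main process consumes while the child produces. Draining the queue before calling join() follows the guidance in the multiprocessing documentation, which warns that a process that has put items on a queue may not terminate until those items have been consumed.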
Pipes: passing data between two processes
Example code:
import multiprocessing


def sender(conn, msgs):
    """
    function to send messages to other end of pipe
    """
    for msg in msgs:
        conn.send(msg)
        print("Sent the message: {}".format(msg))
    conn.close()


def receiver(conn):
    """
    function to print the messages received from other
    end of pipe
    """
    while True:
        msg = conn.recv()
        if msg == "END":
            break
        print("Received the message: {}".format(msg))


if __name__ == "__main__":
    # messages to be sent
    msgs = ["hello", "hey", "hru?", "END"]
    # creating a pipe
    parent_conn, child_conn = multiprocessing.Pipe()
    # creating new processes
    p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
    # running processes
    p1.start()
    p2.start()
    # wait until processes finish
    p1.join()
    p2.join()
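- parent_conn, child_conn = multiprocessing.Pipe() creates a pipe and returns its two connection ends.
- conn.send(msg) sends msg to the other end of the pipe.
- msg = conn.recv() receives a message from the pipe and stores it in msg.
- Note that p1 and p2 run at the same time here, so the lines printed by sender and receiver may be interleaved or garbled (for example, a missing line break). This is normal. The order in which messages are sent and received is preserved, however: hello is sent before hey, so it is also received before hey.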
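The pipe in the example above carries messages in one direction only, but a pipe created with multiprocessing.Pipe() is duplex by default, so each end can both send and receive. The following sketch shows a request/reply exchange between the main process and one child. The worker name echo_upper and the use of None as a stop signal are assumptions made for this example.

```python
import multiprocessing


def echo_upper(conn):
    # Reply to every message with its upper-case form until None arrives.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg.upper())


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p = multiprocessing.Process(target=echo_upper, args=(child_conn,))
    p.start()
    for word in ["hello", "hey"]:
        parent_conn.send(word)  # request
        print(parent_conn.recv())  # reply: HELLO, then HEY
    parent_conn.send(None)  # ask the worker to stop
    p.join()
```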
Avoiding race conditions: locking resources with Lock
Shared resources can lead to a race condition between processes.
Example code:
# Python program to illustrate
# the concept of race condition
# in multiprocessing
import multiprocessing


# function to withdraw from account
def withdraw(balance):
    for _ in range(10000):
        balance.value = balance.value - 1


# function to deposit to account
def deposit(balance):
    for _ in range(10000):
        balance.value = balance.value + 1


def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value("i", 100)
    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance,))
    p2 = multiprocessing.Process(target=deposit, args=(balance,))
    # starting processes
    p1.start()
    p2.start()
    # wait until processes are finished
    p1.join()
    p2.join()
    # print final balance
    print("Final balance = {}".format(balance.value))


if __name__ == "__main__":
    for _ in range(10):
        # perform same transaction process 10 times
        perform_transactions()
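The expected final balance is 100, but the printed value often differs from run to run. The statement balance.value = balance.value - 1 is a read followed by a write, not a single atomic step. The two processes run at the same time with no ordering between their accesses, so both may read the same old value, and one update then overwrites the other.
The code with a lock added: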
# Python program to illustrate
# the concept of locks
# in multiprocessing
import multiprocessing


# function to withdraw from account
def withdraw(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value - 1
        lock.release()


# function to deposit to account
def deposit(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value + 1
        lock.release()


def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value("i", 100)
    # creating a lock object
    lock = multiprocessing.Lock()
    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
    p2 = multiprocessing.Process(target=deposit, args=(balance, lock))
    # starting processes
    p1.start()
    p2.start()
    # wait until processes are finished
    p1.join()
    p2.join()
    # print final balance
    print("Final balance = {}".format(balance.value))


if __name__ == "__main__":
    for _ in range(10):
        # perform same transaction process 10 times
        perform_transactions()
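- lock = multiprocessing.Lock() creates a Lock object.
- The variable lock is passed as an argument to the functions of both processes.
- Wrapping the update statement between lock.acquire() and lock.release() locks the resource temporarily: only one process at a time can run that statement, so the other process cannot interfere.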
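The acquire/release pair can also be written as a with block, which releases the lock even if the guarded statement raises an exception. In addition, a Value created with the default settings carries its own lock, available through get_lock(), so a separate Lock object is not strictly required. The sketch below is one possible variant of the example above; the times parameter is added here for illustration only.

```python
import multiprocessing


def withdraw(balance, times):
    for _ in range(times):
        # The lock is acquired on entry and released when the block exits.
        with balance.get_lock():
            balance.value -= 1


def deposit(balance, times):
    for _ in range(times):
        with balance.get_lock():
            balance.value += 1


if __name__ == "__main__":
    balance = multiprocessing.Value("i", 100)  # comes with an internal lock by default
    p1 = multiprocessing.Process(target=withdraw, args=(balance, 10000))
    p2 = multiprocessing.Process(target=deposit, args=(balance, 10000))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Final balance = {}".format(balance.value))  # Final balance = 100
```

Holding the lock around the whole read-modify-write step is what makes the update safe; the lock inside Value only protects each individual read or write.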
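Process pools: Pool
In the first section we created processes by instantiating Process objects. With many processes, would we have to write many lines that create Process objects?
A Pool object conveniently creates a group of worker processes at once and keeps them in a "pool". Whenever a worker finishes a task, it picks up one of the remaining tasks, so the available CPU cores stay busy.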
# Python program to understand
# the concept of pool
import multiprocessing
import os


def square(n):
    print("Worker process id for {0}: {1}".format(n, os.getpid()))
    return n * n


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4, 5]
    # creating a pool object
    p = multiprocessing.Pool()
    # map list to target function
    result = p.map(square, mylist)
    print(result)
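- p = multiprocessing.Pool() creates a Pool object. The number of worker processes can be given in the parentheses (the processes argument); by default it is based on the number of available CPUs.
- result = p.map(square, mylist) hands the task (the square function) to the pool p, with the arguments for the task taken one at a time from mylist.
- The values returned by the workers are collected into the list result, in the same order as mylist.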
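As a variation on the pool example, the sketch below limits the pool to two workers with the processes argument, uses the pool as a context manager so that it is cleaned up when the block ends, and calls starmap so that each task receives two arguments. The task function power is made up for illustration.

```python
import multiprocessing


def power(base, exponent):
    # Hypothetical task that takes two arguments.
    return base ** exponent


if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (10, 0)]
    # processes=2 limits the pool to two worker processes.
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into the arguments of power.
        results = pool.starmap(power, pairs)
    print(results)  # [8, 9, 1]
```

map passes one item per call, while starmap unpacks each tuple into separate arguments. Both block until all results are available, so the results can be used safely after the with block.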