Concurrent Execution
进程:每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据,进程间不共享全局变量。
线程:线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境,同一个进程的线程之间共享全局变量。
IPC: 进程/线程之间交换信息叫进程间通信.
python的多线程由于GIL只有并发没有并行,无论有多少cpu,一次只能有一个python解释器(线程)执行.一次只能执行一个线程.一次只能用到一个逻辑cpu.
IO密集型任务消耗IO,但是不消耗CPU,cpu切换消耗少,适合用多线程.(线程等待IO会释放GIL)。
python的多进程可以并行,每个进程启动一个解释器进程.
多进程开销大,消耗内存.
计算密集型消耗cpu,任务个数不超过cpu个数.适合用多进程,把每个cpu跑满. 如果用多线程是无效的,会串行执行,浪费多核性能。
io密集: 用异步asyncio(协程)或者多线程, 网络请求,文件读写,数据库查询。 计算密集: 用多进程
绕过GIL的方式有:
- 使用C/C++编写扩展模块(numpy, pandas等)
- 使用Cython编译器
- 使用多进程 (multiprocessing)
multiprocessing
多进程就是同时执行多个任务.
python可以通过多进程取代多线程,从而绕过多线程的GIL.
python是静态语言,
import multiprocessing
classes:
# multiprocessing.Process
Process(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
# methods:
run(self) # 子类重写用来定义进程的功能的函数, 通常通过这种方式来创建进程
start(self) # 启动一个进程
join(self, timeout=None) # 父进程阻塞等待子进程结束,后面的代码不会执行,直到子进程结束。
is_alive() # 返回进程是否还在运行的boolean
exit(self) # 退出进程
terminate(self) # 终止进程
exitcode() # 返回进程的退出码
name() # 返回进程的名字
pid() # 返回进程的pid
functions:
active_children()
allow_connection_pickling()
cpu_count() # 获取cpu个数
current_process()
freeze_support()
get_logger()
log_to_stderr(level=None)
Pool:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) # 返回一个进程池对象, processes默认是cpu个数.
with Pool(processes=4) as pool:
pool.map(func, iterable) # 阻塞,返回所有结果
# func只能是顶层函数,不能是方法和内部函数.
# 进程池,可以控制进程数量,processes 默认是cpu个数(cpu_count())
# 非阻塞,维持进程总数,当一个进程结束会添加新的进程到pool,主进程不阻塞,同步运行,pool中的进程并发执行.
apply_async(func, args=(), kwargs={}, callback=None) # 非阻塞,
map_async(func, iterable, chunksize=None, callback=None) # 非阻塞
# 阻塞,维持进程总数,当一个进程结束会添加新的进程到pool,主进程阻塞,pool中的进程一个一个执行.
apply(func, args=(), kwargs={}) # 阻塞,执行一次函数
map(func, itreable, chunksize=None) # 阻塞, 返回所有结果
terminate() # 终止所有任务
close() # 关闭pool,不接受新任务
join() # 等待pool中子进程结束,要在close/terminate之后调用.
IPC: 管道
Pipe(duplex=True) # duplex=True表示默认是双向pipe.
receiver, sender = Pipe()
sender.send(obj)
receiver.recv()
close()
IPC: 消息队列
# 用于进程间通信,线程和进程安全。
Queue(maxsize=0) # return a queue object
q = Queue()
q.put(obj) # 阻塞,放入一个对象到队列中
q.put_nowait(obj) # 非阻塞,放入一个对象到队列中
q.get() # 阻塞,从队列中获取一个对象
q.get_nowait() # 非阻塞,从队列中获取一个对象
q.empty() # 返回队列是否为空的boolean
q.full() # 返回队列是否已满的boolean
q.qsize() # 返回队列的大小
IPC: 共享内存
Manager()
Event()
# 同步: 条件变量
Condition(lock=None)
# 同步:信号量
Semaphore(value=1)
# 同步:有界信号量
BoundedSemaphore(value=1)
# 同步: 普通锁,一个线程只能acquire一次,否则死锁。
Lock(*args, **kwargs) # 返回一个Lock对象, 用于线程间同步.
with Lock() as lock:
......
# 同步: 重入锁,允许同一线程多次获取锁和释放锁。
RLock(*args, **kwargs) # 返回一个RLock对象, 可重入锁, 允许同一线程多次获取锁.
threading
多线程就是把单个任务分成不同部分运行.
threading支持守护线程(通过join方法实现).
import threading
classes:
# threading.Thread
Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon=None)
# methods:
append(self, *args, **kwargs) # 添加线程到线程组
run(self) # 子类重写用来定义线程的功能的函数, 通常通过这种方式来创建线程
start(self) # 开始执行线程
join(self, timeout=None) # 主程序挂起,阻塞等待子线程结束,再继续运行主程序
getName(self) # 返回线程名字
setName(self, name) # 设置线程名字
is_alive(self) # 表示线程是否还在运行的boolean
isDaemon(self) # 返回线程的daemon标志
setDaemon(self, daemonic) # daemonic=True 使线程在后台运行
functions:
active_count() # 当前活动的线程对象的数量
current_thread() # 返回当前线程对象
enumerate() # 返回当前活动线程列表
settrace(func) # 为所有的线程设置一个跟踪函数
setprofile(func) # 为所有线程设置一个profile函数
stack_size()
计时器Timer:
Timer(*args, **kwargs)
t = Timer(30.0, f, args=[], kwargs={})
t.start() # 在一个子线程等待,timeout就执行f(*args, **kwaargs).
t.cancel() # 如果还在等待就取消.
同步:事件
Event(*args, **kwargs)
同步: 条件变量
Condition(*args, **kwargs)
同步: 信号量
Semaphore(value=1, *args, **kwargs)
# 信号量,默认value=1, 内部计数器不能小于0,当计数器==0时,调用acquire会阻塞.
同步: 有界信号量
BoundedSemaphore(value=1, *args, **kwargs)
# 有界信号量,默认value=1,内部计数器不能小于0,并且不能大于value。
# 当计数器==0,调用acquire会阻塞,当>value抛出VAlueError异常
# 可用来控制并发运行的线程数量
bs = BoundSemaphore(number) # bs是全局的.
def executor(*args, **kwargs):
...
bs.release() # 使计数器+1
def consumer(*args, **kwargs):
...
bs.acquire() # 使计数器-1
同步: 锁
Lock()
# 使同一变量在多个线程间同步
lock = Lock()
variable = value
def thread_function(*args, **kwargs):
global lock
global variable
lock.acquire() # 加锁,使线程进入同步阻塞状态
variable = new_value
lock.release() # 释放锁
同步: 锁
RLock(*args, **kwargs)
queue
queue用于线程间通信,是线程安全的。
import queue
classes:
# queue.Queue
Queue(maxsize=0) # 创建一个队列对象,maxsize=0表示无限制。
# methods:
put(item, block=True, timeout=None) # 向队列中添加一个item,阻塞直到队列有空间。
get(block=True, timeout=None) # 从队列中获取一个item,阻塞直到队列有数据。
qsize() # 返回队列的大小
empty() # 返回队列是否为空的boolean
full() # 返回队列是否满的boolean
task_done() # 标记一个任务已完成,必须在get之后调用。
join() # 阻塞直到队列中的所有任务都完成,必须在put之后调用。
subprocess
开启一个子进程来执行外部命令.
import subprocess
classes:
Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
# 创建一个子进程,args可以是字符串或者列表形式.
# args: 命令和参数列表, 可以是字符串或者列表形式.
# 如果是字符串形式,shell=True, 否则是列表形式.
# 如果是列表形式,shell=False, args[0]是可执行文件的路径,args[1:]是参数列表.
# 非交互执行sudo命令, 或者使用sh/pexpect等第三方库
Popen(['sudo', '-S'] + shlex.split(command), stdin=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ...)
stdout, stderr = child.communicate(password+'\n')
# methods:
poll() # 检查子进程是否结束,返回returncode.
wait() # 等待子进程结束,返回returncode.
communicate(input=None) # 返回(stdout, stderr).
kill() # 发送SIGKILL信号
pipe_cloexec()
send_signal(sig)
terminate()
# Data:
stdin
stdout
stderr
pid
returncode
functions:
call(*popenargs, **kwargs)
# retcode = call(["ls", "-l"])
check_call(*popenargs, **kwargs)
# check_call(["ls", "-l"]) -> 如果返回码为0就返回,否则抛出CalledProcessError.
check_output(*popenargs, **kwargs) # 返回一个字符串
# output = check_output(["ls", "-l", "/dev/null"]) -> 如果返回码为0返回命令结果,否则抛出CalledProcessError.
data:
PIPE = -1
STDOUT = -2
concurrent
concurrent模块提供了多线程和多进程的高级接口.
import concurrent.futures
sched
contextvars
Inter Process Communication and Networking
asyncio
asyncio是基于event loop(事件循环)的异步编程框架,并发运行协程、执行网络IO和IPC,控制子进程,通过队列实现分布式任务,同步并发代码。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
coroutine(协程)是一个特殊的函数,可以暂停和恢复执行,允许在等待IO操作时不阻塞主线程。在单线程内运行,通过事件循环调度,非抢占式,内存消耗小,切换快,不受GIL限制。
coroutine function: “async def” 定义协程函数,返回协程对象。 coroutine object: 表示一个没有执行的协程,需要变成一个task,并且进入event loop才能执行。
import asyncio
await后如果是一个coroutine:
- 隐士创建task(不会调用create_task),只运行当前协程。
- 顺序执行
await后如果是一个task:
- 直接注册到event loop。
- 并发执行
协程对象,或者awaitable对象必须调用await。
classes:
# asyncio.Runner
runner = Runner(loop=None, debug=False) # 创建一个Runner对象,用于运行协程。
with asyncio.Runner() as runner:
runner.run(coro) # 运行一个协程,返回协程的结果。
# asyncio.Task
Task(coro, *, loop=None) # 将coroutine变成一个task,并且注册到event loop,返回Task对象。
# methods:
cancel() # 取消任务,抛出CancelledError异常。
done() # 返回任务是否完成的boolean。
result() # 返回任务的结果,如果任务未完成则抛出异常。
exception() # 返回任务的异常,如果任务未完成则抛出异常。
add_done_callback(callback) # 添加一个回调函数,当任务完成时调用。
# asyncio.TaskGroup, 批量创建task。
task_group = TaskGroup(*, loop=None)
async with TaskGroup() as tg:
tg1 = tg.create_task(coro1)
tg2 = tg.create_task(coro2)
functions:
run(coro, *, debug=False) # 是运行协程的唯一入口函数,调用顶层协程函数,返回协程的结果。
asyncio.run(main())
create_task(coro, *, loop=None) # 将coroutine变成一个task,并且注册到event loop,返回Task对象
tasks = [
asyncio.create_task(coro(...)),
for i in range(10)
]
gather(*aws, return_exceptions=False) # 并发执行aws中的可等待对象。返回一个列表,顺序与aws中一致。gather会自动把coroutine变成task。无需手动调用create_task。
# return_exceptions=False时,如果有异常发生,抛出异常。默认值。
# return_exceptions=True时,如果有异常发生,返回异常对象而不是抛出异常。
await asyncio.gather(*tasks)
sleep(delay, result=None, *, loop=None) # 异步睡眠,返回一个Future对象
await asyncio.sleep(1) # 等待1秒钟
timeout(delay, *, loop=None) # 返回一个Future对象,延迟指定时间后完成
async with asyncio.timeout(10):
await my_task()
wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED) # 等待一组Future对象完成,返回一个包含已完成和未完成Future对象的元组。
done, pending = await asyncio.wait(fs, timeout=10, return_when=asyncio.ALL_COMPLETED)
wait_for(aws, timeout=None, *, loop=None) # 等待一个可等待对象完成,返回结果。超时后抛出asyncio.TimeoutError异常。
await asyncio.wait_for(coro(), timeout=10) # 等待coro在10秒内完成。
shield(fut, *, loop=None) # 保护一个Future对象,使其不会被取消,返回一个Future对象。
as_completed(fs, *, loop=None) # 返回一个迭代器,迭代器会按完成的顺序返回Future对象。
async for fut in as_completed(fs):
result = await fut # 等待每个Future对象完成,按完成顺序返回结果。
current_task(loop=None) # 返回当前正在运行的Task对象,如果没有正在运行的Task,则返回None。
all_tasks(loop=None) # 返回当前事件循环中的所有Task对象。
iscoroutine(obj) # 判断obj是否是一个协程对象
iscoroutinefunction(obj) # 判断obj是否是一个协程函数。
isawaitable(obj) # 判断obj是否是一个可等待对象。
isfuture(obj) # 判断obj是否是一个Future对象。
Queue用于协程间通信, 不是线程安全的:
# asyncio.Queue
queue = asyncio.Queue(maxsize=0, *, loop=None) # 创建一个异步队列,maxsize=0表示无限制。
await queue.put(item) # 向队列中添加一个item,阻塞直到队列有空间。
item = await queue.get() # 从队列中获取一个item,阻塞直到队列有数据。
item = queue.get_nowait() # 非阻塞获取队列中的一个item,如果队列为空则抛出asyncio.QueueEmpty异常。
queue.task_done() # 标记一个任务已完成,通常在get之后调用。
await queue.join() # 阻塞直到所有任务都完成。
同步原语相关:
Lock
Event
Condition
Semaphore
BoundedSemaphore
Barrier
socket
socket协议的标准库
import socket
classes:
# socket = class _socketobject(__builtin__.object)
# socket.socket([family[, type[, proto]]])
close() # 关闭socket
shutdown(flag) # 0 关闭读,1关闭写,2全部关闭
# eg: socket.socket(AF_INET, SOCK_STREAM, 0) ipv4+tcp
# eg: socket.socket(AF_INET, SOCK_DGRAM, 0) ipv4+udp
# methods:
bind(address) # 服务器绑定(host, port)到socket
listen(backlog) # 服务器开始监听tcp
accept() # 服务器阻塞等待客户的tcp连接, 返回(socket object, address info)
connect(address) # 客户端主动初始化tcp连接,连接失败抛出异常
connect_ex(address) # 同上,连接失败返回errno
send(data[, flags]) # 发送tcp数据
sendall(data[, flags]) # 发送完整tcp数据
recv(buflen[, flags]) # 接收tcp数据
recv_into(buffer[, nbytes[, flags]])
sendto(data[, flags], addr) # 发送udp数据
recvfrom(buflen[, flags]) # 接收udp数据
recvfrom_into(buffer[, nbytes, [, flags])])
getpeername() # 获取当前socket的远端地址
getsockname() # 获取当前socket的地址
getsockopt(level, option[, buffersize]) # 获取socket参数
setsockopt(level, option, value) # 设置socket参数
setblocking(flag)
gettimeout()
settimeout(timeout)
makefile([mode[, bufsize]])
fileno()
dup()
# data descriptor:
family/type/proto 参考man 2 socket
recv
recv_into
recvfrom
recvfrom_into
send
sendto
class SocketIO:
class SocketIO(io.RawIOBase)
sio = SocketIO(sock, mode)
methods:
close()
readinto(b)
write(b)
functions:
create_connection(address, timeout=<object object>, source_address=None)
fromfd(fd, family, type[, proto]) # 用一个已经打开的文件描述符创建一个socket对象
getaddrinfo(host, port [, family, socktype, proto, flags])
getdefaulttimeout()
getfqdn(name='') # FQDN, 获取完整域的信息
gethostbyaddr(host) -> (name, aliaslist, addresslist)
gethostbyname(host)
gethostbyname_ex(host) -> (name, aliaslist, addresslist)
gethostname()
getnameinfo(sockaddr, flags) --> (host, port)
getprotobyname(name)
getservbyname(servicename[, protocolname])
getservbyport(port[, protocolname])
htonl(integer)
htons(integer)
inet_aton(string)
inet_ntoa(packed_ip)
inet_ntop(af, packed_ip)
inet_pton(af, ip)
ntohl(integer)
ntohs(integer)
setdefaulttimeout(timeout)
socketpair([family[, type[, proto]]]) -> (socket object, socket object)
select
等待IO事件发生,返回就绪的文件描述符列表。
import select
selectors
selectors模块提供了更高级的IO多路复用接口。
import selectors