Concurrent Execution
进程:每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据,进程间不共享全局变量。
线程:线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境,同一个进程的线程之间共享全局变量。
IPC: 进程/线程之间交换信息叫进程间通信.
python的多线程由于GIL只有并发没有并行,无论有多少cpu,一次只能有一个python解释器(线程)执行.一次只能执行一个线程.一次只能用到一个逻辑cpu.
IO密集型任务消耗IO,但是不消耗CPU,cpu切换消耗少,适合用多线程.
python的多进程可以并行,每个进程启动一个解释器进程.
多进程开销大,消耗内存.
计算密集型消耗cpu,任务个数不超过cpu个数.适合用多进程,把每个cpu跑满.
multiprocessing
多进程就是同时执行多个任务.
python可以通过多进程取代多线程,从而绕过多线程的GIL.
python是静态语言,
import multiprocessing
classes:
# multiprocessing.Process
proc = Process(group=None, target=None, name=None, args=(), kwargs={})
# methods:
run(self)
start(self) # 启动一个进程
join(self, timeout=None) # 父进程等待子进程结束
is_alive()
terminate(self)
# data descriptor:
authkey
daemon # proc.daemon = True 后台运行
exitcode
ident
name
pid
functions:
# 普通函数
active_children()
allow_connection_pickling()
cpu_count() # 获取cpu个数
current_process()
freeze_support()
get_logger()
log_to_stderr(level=None)
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
pool = Pool()
# func只能是顶层函数,不能是方法和内部函数.
# 进程池,可以控制进程数量,processes 默认是cpu个数(cpu_count())
# 非阻塞,维持进程总数,当一个进程结束会添加新的进程到pool,主进程不阻塞,同步运行,pool中的进程并发执行.
apply_async(func, args=(), kwargs={}, callback=None) # 非阻塞,
map_async(func, iterable, chunksize=None, callback=None) # 非阻塞
# 阻塞,维持进程总数,当一个进程结束会添加新的进程到pool,主进程阻塞,pool中的进程一个一个执行.
apply(func, args=(), kwargs={}) # 阻塞
map(func, itreable, chunksize=None) # 阻塞
terminate() # 终止所有任务
close() # 关闭pool,不接受新任务
join() # 等待pool中子进程结束,要在close/terminate之后调用.
# IPC: 管道
Pipe(duplex=True) # duplex=True表示默认是双向pipe.
receiver, sender = Pipe()
sender.send(obj)
receiver.recv()
close()
# IPC: 消息队列
# 来自于Queue.Queue, 具体方法参考Queue.Queue
Queue(maxsize=0) # return a queue object
q = Queue()
# IPC: 共享内存
Manager()
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array
Array(typecode_or_type, size_or_initializer, **kwds)
RawArray(typecode_or_type, size_or_initializer)
Value(typecode_or_type, *args, **kwds)
RawValue(typecode_or_type, *args)
Event()
# 同步: 条件变量
Condition(lock=None)
# 同步:信号量
Semaphore(value=1)
# 同步:有界信号量
BoundedSemaphore(value=1)
# 同步: 锁
Lock()
# 同步: 锁
RLock()
data:
SUBDEBUG = 5
SUBWARNING = 25
threading
多线程就是把单个任务分成不同部分运行.
threading支持守护线程(通过join方法实现).
import threading
classes:
# threading.Thread
t = Thread(group=None, target=None, name=None, args=(), kwargs=None, verbose=None)
threads.append(t)
# methods:
run(self) # 子类重写用来定义线程的功能的函数, 通常通过这种方式来创建线程
start(self) # 开始执行线程
join(self, timeout=None) # 主程序挂起,直到线程结束,再继续运行主程序
is_alive(self) # 表示线程是否还在运行的boolean
getName(self) # 返回线程名字
setName(self, name) # 设置线程名字
isDaemon(self) # 返回线程的daemon标志
setDaemon(self, daemonic) # daemonic=True 使线程在后台运行
functions:
active_count() # 当前活动的线程对象的数量
current_thread() # 返回当前线程对象
enumerate() # 返回当前活动线程列表
settrace(func) # 为所有的线程设置一个跟踪函数
setprofile(func) # 为所有线程设置一个profile函数
stack_size()
Timer(*args, **kwargs)
t = Timer(30.0, f, args=[], kwargs={})
t.start() # 在一个子线程等待,timeout就执行f(*args, **kwaargs).
t.cancel() # 如果还在等待就取消.
Event(*args, **kwargs)
# 同步: 条件变量
Condition(*args, **kwargs)
# 同步: 信号量
Semaphore(value=1, *args, **kwargs)
# 信号量,默认value=1, 内部计数器不能小于0,当计数器==0时,调用acquire会阻塞.
# 同步: 有界信号量
BoundedSemaphore(value=1, *args, **kwargs)
# 有界信号量,默认value=1,内部计数器不能小于0,并且不能大于value。
# 当计数器==0,调用acquire会阻塞,当>value抛出VAlueError异常
# 可用来控制并发运行的线程数量
bs = BoundSemaphore(number) # bs是全局的.
def thread_function(*args, **kwargs):
...
bs.release() # 使计数器+1
for t in threads:
bs.acquire() # 使计数器-1
thread.start()
for t in threads:
t.join()
# 同步: 锁
Lock()
# 使同一变量在多个线程间同步
lock = Lock()
variable = value
def thread_function(*args, **kwargs):
global lock
global variable
lock.acquire() # 加锁,使线程进入同步阻塞状态
variable = new_value
lock.release() # 释放锁
# 同步: 锁
RLock(*args, **kwargs)
subprocess
开启一个子进程来执行外部命令.
import subprocess
classes:
Popen(args,
bufsize=0, executable=None, stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=False, shell=False,cwd=None,
env=None, universal_newlines=False,startupinfo=None, creationflags=0)
# p = Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 如果命令和参数是字符串形式,需要参数shell=True
# p = Popen(command_string, shell=True, ...)
# 非交互执行sudo命令, 或者使用sh/pexpect等第三方库
Popen(['sudo', '-S'] + shlex.split(command), stdin=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ...)
stdout, stderr = child.communicate(password+'\n')
# methods:
poll() # 检查子进程是否结束,返回returncode.
wait() # 等待子进程结束,返回returncode.
communicate(input=None) # 返回(stdout, stderr).
kill() # 发送SIGKILL信号
pipe_cloexec()
send_signal(sig)
terminate()
# Data:
stdin
stdout
stderr
pid
returncode
functions:
call(*popenargs, **kwargs)
# retcode = call(["ls", "-l"])
check_call(*popenargs, **kwargs)
# check_call(["ls", "-l"]) -> 如果返回码为0就返回,否则抛出CalledProcessError.
check_output(*popenargs, **kwargs) # 返回一个字符串
# output = check_output(["ls", "-l", "/dev/null"]) -> 如果返回码为0返回命令结果,否则抛出CalledProcessError.
data:
PIPE = -1
STDOUT = -2
sched
queue
dummy_threading
Interprocess Communication and Networking
socket
socket协议的标准库
import socket
classes:
# socket = class _socketobject(__builtin__.object)
# socket.socket([family[, type[, proto]]])
close() # 关闭socket
shutdown(flag) # 0 关闭读,1关闭写,2全部关闭
# eg: socket.socket(AF_INET, SOCK_STREAM, 0) ipv4+tcp
# eg: socket.socket(AF_INET, SOCK_DGRAM, 0) ipv4+udp
# methods:
bind(address) # 服务器绑定(host, port)到socket
listen(backlog) # 服务器开始监听tcp
accept() # 服务器阻塞等待客户的tcp连接, 返回(socket object, address info)
connect(address) # 客户端主动初始化tcp连接,连接失败抛出异常
connect_ex(address) # 同上,连接失败返回errno
send(data[, flags]) # 发送tcp数据
sendall(data[, flags]) # 发送完整tcp数据
recv(buflen[, flags]) # 接收tcp数据
recv_into(buffer[, nbytes[, flags]])
sendto(data[, flags], addr) # 发送udp数据
recvfrom(buflen[, flags]) # 接收udp数据
recvfrom_into(buffer[, nbytes, [, flags])])
getpeername() # 获取当前socket的远端地址
getsockname() # 获取当前socket的地址
getsockopt(level, option[, buffersize]) # 获取socket参数
setsockopt(level, option, value) # 设置socket参数
setblocking(flag)
gettimeout()
settimeout(timeout)
makefile([mode[, bufsize]])
fileno()
dup()
# data descriptor:
family/type/proto 参考man 2 socket
recv
recv_into
recvfrom
recvfrom_into
send
sendto
class SocketIO:
class SocketIO(io.RawIOBase)
sio = SocketIO(sock, mode)
methods:
close()
readinto(b)
write(b)
functions:
create_connection(address, timeout=<object object>, source_address=None)
fromfd(fd, family, type[, proto]) # 用一个已经打开的文件描述符创建一个socket对象
getaddrinfo(host, port [, family, socktype, proto, flags])
getdefaulttimeout()
getfqdn(name='') # FQDN, 获取完整域的信息
gethostbyaddr(host) -> (name, aliaslist, addresslist)
gethostbyname(host)
gethostbyname_ex(host) -> (name, aliaslist, addresslist)
gethostname()
getnameinfo(sockaddr, flags) --> (host, port)
getprotobyname(name)
getservbyname(servicename[, protocolname])
getservbyport(port[, protocolname])
htonl(integer)
htons(integer)
inet_aton(string)
inet_ntoa(packed_ip)
inet_ntop(af, packed_ip)
inet_pton(af, ip)
ntohl(integer)
ntohs(integer)
setdefaulttimeout(timeout)
socketpair([family[, type[, proto]]]) -> (socket object, socket object)