Multiprocessing Pipe和Queue性能测试

我的微信公众号:pyquant

背景

开发股票行情推送的引擎时遇到一个问题,在9:30开盘后的一段时间内行情消息总是堆积,尤其是开头15-20分钟,堆积的数据量会越来越多,经过debug发现是内部消息传输使用Queue性能问题导致了消息延迟,在stackoverflow上找到一个帖子对Queue的性能进行了测试和解释说明,下面先来介绍下Multiprocessing下的Queue和Pipe

介绍

当使用多个进程时,通常使用消息传递来进行进程之间的通信,为了不损耗性能也会尽量避免使用同步机制。对于消息传递:

* Pipe适用于两个进程间的消息传递。
* Queue适用于多个进程间的消息传递,适用于多生产者和消费者的模式。

Pipe VS Queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()  # We are only reading
    while True:
        msg = p_output.recv()  # Read from the output pipe and do nothing
        if msg == 'DONE':
            break

def writer(count, p_input):
    for ii in range(0, count):
        p_input.send(ii)  # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__ == '__main__':
    for count in [10 ** 4, 10 ** 5, 10 ** 6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()  # Launch the reader process

        p_output.close()  # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input)  # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
                                                                      (time.time() - _start)))

Pipe输出结果

1
2
3
Sending 10000 numbers to Pipe() took 0.0744009017944336 seconds
Sending 100000 numbers to Pipe() took 0.7794349193572998 seconds
Sending 1000000 numbers to Pipe() took 7.425454139709473 seconds
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)  # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__ == '__main__':
    pqueue = Queue()  # writer() writes to pqueue from _this_ process
    for count in [10 ** 4, 10 ** 5, 10 ** 6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)  # Send a lot of stuff to reader()
        reader_p.join()  # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count,
                                                                       (time.time() - _start)))

Queue 输出结果

1
2
3
Sending 10000 numbers to Queue() took 0.2558887004852295 seconds
Sending 100000 numbers to Queue() took 2.4320709705352783 seconds
Sending 1000000 numbers to Queue() took 23.602338075637817 seconds

让我们把结果整理成表格方便对比查看:

循环次数PipeQueue
100000.07440.2558
1000000.77942.4320
10000007.425423.6023

通过对比测试可以发现,Pipe性能大约为Queue的3倍,所以在仅有两端通信的情况下应该优先使用Pipe。

源码分析

通过阅读Queue的源码,我们可以发现,其实在Queue内部是用Lock来实现对Pipe的安全读写操作的。所以相比于Pipe会有额外的锁的开销。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False) # 这里初始化了Pipe对象
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._after_fork()
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

参考

https://stackoverflow.com/questions/8463008/multiprocessing-pipe-vs-queue

技术博客 - 记录学习与思考
使用 Hugo 构建
主题 StackJimmy 设计