明白了。IMO,这里发生的情况是订阅者无法按照消息发布的速度使用消息。结果,输出缓冲区被填满,最终,当达到限制时连接被关闭。
以下是该现象的最小再现(不包括 golang 和 EdgeX):
import redis
def s_handler(msg):
print(len(msg['data']))
s = redis.Redis()
sp = s.pubsub()
sp.subscribe(**{'channel': s_handler})
t = sp.run_in_thread(sleep_time=0.001)
p = redis.Redis()
while True:
p.publish('channel', '0' * 30 * pow(10,6))
经过几次迭代后,显然以这个日志行结束:
44407:M 14 Dec 2022 17:52:11.785 # Client id=25 addr=[::1]:62956 laddr=[::1]:6379 fd=10 name= age=2 idle=0 flags=P db=0 sub=1 psub=0 ssub=0 multi=-1 qbuf=0 qbuf-free=16890 argv-mem=0 multi-mem=0 rbs=16384 rbp=16384 obl=0 oll=37 omem=1080006008 tot-mem=1080040112 events=rw cmd=subscribe user=default redir=-1 resp=2 scheduled to be closed ASAP for overcoming of output buffer limits.
我不相信您的用例(实时视频流)和所选的实现非常匹配。AFAIK,Redis 的 Pub/Sub 通常用于广播明显较小的有效负载,而这感觉像是滥用该功能:)
增加输出缓冲区没有帮助。您需要订阅者跟上发布者的步伐,因此如果可能,请尝试扩展/优化它。但请注意,即使单个订阅者可以足够快地消费,拥有多个订阅者最终也会使服务器的网络接口饱和(假设有足够的 RAM 用于许多较大的输出缓冲区)。
很抱歉我无法提供更多帮助。我正在将此问题转移到核心存储库,以防有人有我缺少的答案。