首页>Program>source

我在同一网络中有3台服务器.在每个服务器上,redis服务和某种生产者正在运行.生产者将作业排队到名为 tasks的本地rq队列中 . 所以每个服务器都有自己的 tasks 队列。

此外,还有另外一台服务器正在运行rq worker.可以让那个工人检查 tasks吗 在3台服务器中的每台上都排队?

我尝试创建连接列表

import redis
from rq import Queue, Worker
from rq import push_connection
# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))

然后我用它来创建队列列表。

queues = list(map(lambda c: Queue('tasks', connection=c), connections))

然后我推动所有连接

for connection in connections:
    push_connection(connection)

并将队列传递给 Worker

Worker(queues=queues).work()

这导致工作人员仅在 tasks上收听 不管最后按下什么连接。

我一直在研究rq上的代码,我想我可以写一个自定义工作程序 可以做到这一点的类,但是在我这样做之前,我想问一下是否还有另一种方法.也许甚至还有另一个排队框架?

最新回答
  • 27天前
    1 #

    好,我解决了这个问题.我仍然不确定我是否有权在此处发布实际的源代码,因此我将概述我的解决方案。

    我不得不超越 register_birth(self)register_death(self)dequeue_job_and_maintain_ttl(self, timeout) .这些功能的原始实现可以在此处找到。

    register_birth

    基本上,您必须遍历所有连接, push_connection(connection) ,完成注册过程,然后 pop_connection()

    小心只在 mapping中列出与该连接相对应的队列 变量.原始实现使用 queue_names(self) 获取队列名称列表.您必须做同样的事情 queue_names(self) 但仅适用于相关队列。

    register_death

    register_birth基本相同 .遍历所有连接, push_connection(connection) ,完成与原始实施相同的步骤,然后 pop_connection()

    dequeue_job_and_maintain_ttl

    让我们看一下该功能的原始实现.我们要保持一切不变,直到到达 try 块.在这里,我们想无限循环地遍历所有连接.您可以使用itertools.cycle来做到这一点。

    内部循环 push_connection(connection) ,并设置 self.connection 到当前连接.如果 self.connection = connection 丢失,可能无法正确返回作业结果。

    现在,我们将继续致电 self.queue_class.dequeue_any 与原始实施类似.但是我们将超时设置为 1 因此我们可以继续检查另一连接,如果当前连接没有该工人的任何工作。

    确保 self.queue_class.dequeue_any 使用与当前连接相对应的队列列表进行调用.在这种情况下, 仅包含相关队列。

    queues
    

    之后 result = self.queue_class.dequeue_any( queues, 1, connection=connection, job_class=self.job_class) ,并对 pop_connection()进行相同的检查 作为原始实现.如果 result 不是 result 我们找到工作要做,需要 跳出循环。

    保留原始实施中的所有其他内容.别忘了 Nonebreak的尽头 块.它突兀了 循环。

    另一件事

    队列包含对其连接的引用.您可以使用它来创建 break的列表 哪里 try 包含所有具有连接 while True的队列

    如果将结果列表传递给itertools.cycle,则会获得无限的迭代器,该迭代器在覆盖 (connection, queues)

    queues

  • 基于接受标头的nginx错误页面不适用于json请求
  • neo4j:使用APOCexport时是否可能不输出关系列?