Background Current tornado-celery project seems doesn’t support celery4.x, no update for this project for long time Research pika AMQP library already have an adpator “TornadoConnection” which can be used with the Tornado IO Loop. Then it has possibility to integrate this into celery to support async connection. Before starting with this target, we need to get familar with how celery send task in client side. Assume we have a celery client use AMQP broker, and redis as result backend.

Read More…

Scenario While implementing wxgigo project, I need create an agent which will receive the POST requests from Wechat server, then relay to AMQP broker. In the appserver side, celery workers fetch messages from AMQP server and save result in result backend, finally agent will retrieve the result for result backend and return to Wechat server. Agent: +----------------+ | Wechat Server | +----------------+ | |http post +-----------------------------------------|---------------------------------+ | Wxgigo Agent Server +-------------------+ | | | nginx | | | +-------------------+ | | |forward http post | | | | | +--------------+ +--------------+ +--------------+ | | | wxgigo agent | | wxgigo agent | | wxgigo agent | | | +--------------+ +--------------+ +--------------+ | +------------------------------------------|--------------------------------+ |AMQP message | +-----------------+ | AMQP Broker | +-----------------+ Consideration By comparing multiple python web framework benchmark, I prefer to choose Tornado as the base framework for it’s async supportability, but to utilize this async feature for performance consideration, it need decouple all the steps to be async ready including connecting to broker, send message, receive message reply, retrieve result from backend…

Read More…

There is a new feature in Celery4.x which introduce a Promise like style async method which can retrieve result asynchronicly. But I nerver make it works with RabbitMQ backend or Redis backend, so I dive into source codes to find out why it doesn’t work. When we call AsyncResult.then(on_result_ready), it will do following in celery_root/result.py def then(self, callback, on_error=None, weak=False): self.backend.add_pending_result(self, weak=weak) return self.on_ready.then(callback, on_error) For self.backend.add_pending_result(self, weak=weak), it add myself(AsyncResult instance) into corresponding backend

Read More…