flask - How to check if Celery/Supervisor is running using Python


How can I write a script in Python that outputs whether Celery is running on a machine (Ubuntu)?

My use case: I have a simple Python file with some tasks. I'm not using Django or Flask. I use Supervisor to run the task queue. For example,

tasks.py

    from celery import Celery

    app = Celery('tasks')

    @app.task()
    def add_together(a, b):
        return a + b

supervisor:

    [program:celery_worker]
    directory = /var/app/
    command = celery -A tasks worker --loglevel=info

This works, but I want to have a page that checks whether the Celery/Supervisor process is running, i.e. maybe using Flask to host a page that returns a 200 status, so that I can use it for load balancing.

For example...

check_status.py

    from flask import Flask, render_template

    app = Flask(__name__)

    @app.route('/')
    def status_check():
        # Check whether supervisor is running.
        # `supervisor` is a placeholder: this check is the part being asked about.
        if supervisor:
            return render_template('up.html')
        else:
            return render_template('down.html')

    if __name__ == '__main__':
        app.run()

You can run the celery status command from code by importing the celery.bin.celery package:

    import celery
    import celery.bin.base
    import celery.bin.celery
    import celery.platforms

    app = celery.Celery('tasks', broker='redis://')

    # Note: CeleryCommand belongs to the pre-5.0 command-line classes.
    status = celery.bin.celery.CeleryCommand.commands['status']()
    status.app = status.get_app()

    def celery_is_up():
        try:
            status.run()
            return True
        except celery.bin.base.Error as e:
            # EX_UNAVAILABLE means no worker answered the status request.
            if e.status == celery.platforms.EX_UNAVAILABLE:
                return False
            raise

    if __name__ == '__main__':
        if celery_is_up():
            print('celery up!')
        else:
            print('celery not responding...')
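The snippet above covers Celery itself. For the Supervisor half of the question, one option is to ask supervisord directly over its XML-RPC interface. The following is only a rough sketch using the standard library. It assumes supervisord has its HTTP server enabled (an [inet_http_server] section in supervisord.conf) and that it listens on localhost:9001; that URL is an assumption, not something taken from the question. The helper names worker_is_running and status_code are made up for illustration. The program name celery_worker comes from the Supervisor config in the question.

```python
import xmlrpc.client

# Assumption: supervisord exposes XML-RPC over HTTP at this address.
# This requires an [inet_http_server] section in supervisord.conf.
SUPERVISOR_RPC_URL = "http://localhost:9001/RPC2"

# Matches the [program:celery_worker] section from the question.
PROGRAM_NAME = "celery_worker"


def worker_is_running(proxy, name=PROGRAM_NAME):
    """Return True if Supervisor reports the program in the RUNNING state."""
    try:
        info = proxy.supervisor.getProcessInfo(name)
    except (OSError, xmlrpc.client.Error):
        # Supervisor is unreachable, or it does not know this program name.
        return False
    return info.get("statename") == "RUNNING"


def status_code(proxy, name=PROGRAM_NAME):
    """Map the check to an HTTP status code a load balancer can act on."""
    return 200 if worker_is_running(proxy, name) else 503


# Example use (needs a reachable supervisord, so it is left commented out):
# proxy = xmlrpc.client.ServerProxy(SUPERVISOR_RPC_URL)
# print(status_code(proxy))
```

In the Flask view from the question, status_code(proxy) could be returned as the response status instead of always answering 200, so the load balancer sees 503 when the worker is down. Note that this only tells you Supervisor considers the process running; it does not prove the worker is consuming tasks, which is what the celery status check is for.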
