flask - How to check if Celery/Supervisor is running using Python -
how write script in python outputs if celery running on machine (ubuntu)?
my use-case. have simple python file tasks. i'm not using django or flask. use supervisor run task queue. example,
tasks.py
from celery import celery, task app = celery('tasks') @app.task() def add_together(a, b): return + b
supervisor:
[program:celery_worker] directory = /var/app/ command=celery -a tasks worker info
this works, want have page checks if celery/supervisor process running. i.e. maybe using flask allowing me host page giving 200 status allowing me load balance.
for example...
check_status.py
from flask import flask app = flask(__name__) @app.route('/') def status_check(): #check supervisor running if supervisor: return render_template('up.html') else: return render_template('down.html') if __name__ == '__main__': app.run()
you can run celery status
command via code importing celery.bin.celery
package:
import celery import celery.bin.base import celery.bin.celery import celery.platforms app = celery.celery('tasks', broker='redis://') status = celery.bin.celery.celerycommand.commands['status']() status.app = status.get_app() def celery_is_up(): try: status.run() return true except celery.bin.base.error e: if e.status == celery.platforms.ex_unavailable: return false raise e if __name__ == '__main__': if celery_is_up(): print('celery up!') else: print('celery not responding...')
Comments
Post a Comment