Aiorq



Introduction

Aiorq is a distributed task queue built on asyncio and Redis. It is a rewrite of arq that adds improvements and
includes a web interface.

See documentation for more details.

Requirements

  • redis >= 5.0
  • aioredis >= 1.1.0, < 2.0.0

Install

pip install aiorq
pip install "aioredis>=1.1.0,<2.0.0"

Quick Start

Task Definition

# tasks.py
# -*- coding: utf-8 -*-

import asyncio
import os

from aiorq.connections import RedisSettings
from aiorq.cron import cron


async def say_hello(ctx, name) -> None:
    """Wait five seconds, then print ``Hello <name>``.

    ``ctx`` is accepted as the first argument but is not used here.
    """
    delay_seconds = 5
    await asyncio.sleep(delay_seconds)
    greeting = f"Hello {name}"
    print(greeting)


async def say_hi(ctx, name) -> None:
    """Wait three seconds, then print ``Hi <name>``.

    ``ctx`` is accepted as the first argument but is not used here.
    """
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)
    greeting = f"Hi {name}"
    print(greeting)


async def startup(ctx):
    """Start-up hook: print a message saying start-up has completed.

    ``ctx`` is accepted but not used.
    """
    message = "starting... done"
    print(message)


async def shutdown(ctx):
    """Shutdown hook: print a message saying shutdown has completed.

    ``ctx`` is accepted but not used.
    """
    message = "ending... done"
    print(message)


async def run_cron(ctx, time_='2021-11-16 10:26:05'):
    """Print ``time_``; used as the example cron job.

    ``ctx`` is accepted but not used. ``time_`` defaults to a fixed sample
    timestamp string.
    """
    stamp = time_
    print(stamp)


class WorkerSettings:
    """Worker configuration passed to the CLI as ``aiorq tasks.WorkerSettings``.

    Bundles the Redis connection settings, the task functions the worker
    serves, the start-up/shutdown hooks and the cron jobs.
    """

    # os.getenv() returns a str whenever the variable is set, so the numeric
    # settings are converted explicitly. Without int(), ``port`` and
    # ``database`` would be ints by default but strings as soon as
    # REDIS_PORT / REDIS_DATABASE are exported.
    redis_settings = RedisSettings(
        host=os.getenv("REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        database=int(os.getenv("REDIS_DATABASE", "0")),
        password=os.getenv("REDIS_PASSWORD", None),
    )

    # Task coroutines this worker can execute.
    functions = [say_hello, say_hi]

    on_startup = startup

    on_shutdown = shutdown

    # NOTE(review): presumably fires at minute 40, second 50 of every hour
    # (arq-style cron semantics) - confirm against the aiorq.cron docs.
    cron_jobs = [
        cron(coroutine=run_cron, name="x100", minute=40, second=50, keep_result_forever=True)
    ]

    # Optional settings, left disabled in this example:
    # allow_abort_jobs = True

    # worker_name = "ohuo"
    # queue_name = "ohuo"

Run aiorq worker

> aiorq tasks.WorkerSettings
15:08:50: Starting Queue: ohuo
15:08:50: Starting Worker: ohuo@04dce85c-1798-43eb-89d8-7c6d78919feb
15:08:50: Starting Functions: say_hello, EnHeng
15:08:50: redis_version=5.0.10 mem_usage=731.12K clients_connected=2 db_keys=9
starting...

Integration in FastAPI

# -*- coding: utf-8 -*-
import json
import os

from fastapi import FastAPI
from starlette.requests import Request

from aiorq.connections import RedisSettings, create_pool
from aiorq.jobs import Job

# Application instance; the route and event decorators below register on it.
app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    """Create the Redis pool at application start-up.

    The pool is stored on ``app.state.redis`` so that request handlers can
    reach it through ``request.app.state.redis``.
    """
    # os.getenv() returns a str whenever the variable is set, so the numeric
    # settings are converted explicitly. Without int(), ``port`` and
    # ``database`` would be ints by default but strings as soon as
    # REDIS_PORT / REDIS_DATABASE are exported.
    settings = RedisSettings(
        host=os.getenv("REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        database=int(os.getenv("REDIS_DATABASE", "0")),
        password=os.getenv("REDIS_PASSWORD", None),
    )
    app.state.redis = await create_pool(settings)


@app.get("/get_health_check")
async def get_health_check(request: Request, worker_name):
    """Return the decoded health-check record for ``worker_name``.

    The raw value comes from the pool's ``_get_health_check`` and is parsed
    as JSON before being returned under the ``"result"`` key.
    """
    redis = request.app.state.redis
    raw_health = await redis._get_health_check(worker_name=worker_name)
    decoded = json.loads(raw_health)
    return {"result": decoded}


@app.get("/enqueue_job_")
async def enqueue_job_(request: Request):
    """Enqueue the ``qy_spider_`` task on ``comment_queue`` and return its info.

    The response carries the result of the new job's ``info()`` call under
    the ``"job_"`` key.
    """
    redis = request.app.state.redis
    new_job = await redis.enqueue_job(
        'qy_spider_',
        _queue_name="comment_queue",
        _job_try=4,
    )
    job_info = await new_job.info()
    return {"job_": job_info}


@app.get("/index")
async def index(request: Request):
    """Return how many functions, workers and job results are known.

    The task list is JSON-decoded before it is counted; workers and job
    results are counted as returned.
    """
    redis = request.app.state.redis
    raw_functions = await redis.all_tasks()
    workers = await redis.all_workers()
    job_results = await redis.all_job_results()
    summary = {
        "functions_num": len(json.loads(raw_functions)),
        "workers_num": len(workers),
        "results_num": len(job_results),
    }
    return {"results": summary}


@app.get("/get_all_workers")
async def get_all_workers(request: Request):
    """Return every worker record, each one JSON-decoded."""
    raw_workers = await request.app.state.redis.all_workers()
    decoded_workers = list(map(json.loads, raw_workers))
    return {"results": decoded_workers}


@app.get("/get_all_functions")
async def get_all_functions(request: Request):
    """Return the JSON-decoded task list."""
    redis = request.app.state.redis
    raw_functions = await redis.all_tasks()
    decoded_functions = json.loads(raw_functions)
    return {"results": decoded_functions}


@app.get("/get_all_result")
async def get_all_result(request: Request, worker=None, task=None, job_id=None):
    """Return job results, optionally narrowed by worker, task and job id.

    A filter is applied only when its argument is truthy; filters that are
    given are combined, so a result must match all of them.
    """
    matches = await request.app.state.redis.all_job_results()

    # (key in the result record, requested value), applied in this order.
    criteria = (
        ("worker_name", worker),
        ("function", task),
        ("job_id", job_id),
    )
    for field, wanted in criteria:
        if not wanted:
            continue
        matches = [entry for entry in matches if entry.get(field) == wanted]

    return {"results_": matches}

@app.get("/queued_jobs")
async def queued_jobs(request: Request, queue_name="aiorq:queue"):
    """Return the jobs queued on ``queue_name``, each tagged with its status.

    The status is looked up per job and stored as a ``state`` entry in that
    job object's ``__dict__``.
    """
    redis = request.app.state.redis
    annotated = []
    for job_def in await redis.queued_jobs(queue_name=queue_name):
        job = Job(
            job_id=job_def.__dict__.get("job_id"),
            redis=redis,
            _queue_name=queue_name,
        )
        job_def.__dict__["state"] = await job.status()
        annotated.append(job_def)
    return {"queued_jobs": annotated}


# job status
@app.get("/job_status")
async def job_status(request: Request, job_id="12673208ee3b417192b7cce06844adda", _queue_name="aiorq:queue"):
    """Return the ``info()`` of the job ``job_id`` on ``_queue_name``.

    The default ``job_id`` is a fixed sample id.
    """
    job = Job(
        job_id=job_id,
        redis=request.app.state.redis,
        _queue_name=_queue_name,
    )
    details = await job.info()
    return {"job_status_": details}


if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Serve the app referenced as "main:app" on all interfaces, port 9999,
    # with reload enabled.
    uvicorn.run(
        app="main:app",
        host="0.0.0.0",
        port=9999,
        reload=True,
    )

Thanks

License

MIT

GitHub

View GitHub