# Install dependencies:
#   pip install apscheduler fastapi pytz uvicorn
import asyncio
import time
import typing
import fastapi.responses
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job
import pytz
import uvicorn
# ASGI application instance; served by uvicorn in the __main__ guard.
app: FastAPI = FastAPI()
# Module-level scheduler handle: assigned in on_startup() and reset to None
# in on_shutdown(). It is None until on_startup() has run.
scheduler: typing.Optional[AsyncIOScheduler] = None
@app.on_event("startup")
async def on_startup() -> None:
    """Create and start the module-level scheduler when the application starts.

    The scheduler keeps jobs in an in-memory job store, runs them with the
    ``asyncio`` executor type, and uses the ``Asia/Shanghai`` timezone. Job
    defaults disable coalescing and cap each job at one concurrent instance.
    """
    global scheduler
    job_stores: typing.Dict[str, typing.Any] = {
        "default": MemoryJobStore(),
    }
    executors: typing.Dict[str, typing.Any] = {
        "default": {
            "type": "asyncio",
        },
    }
    job_defaults: typing.Dict[str, typing.Any] = {
        "coalesce": False,
        "max_instances": 1,
    }
    scheduler = AsyncIOScheduler(
        jobstores=job_stores,
        executors=executors,
        job_defaults=job_defaults,
        timezone=pytz.timezone("Asia/Shanghai"),
    )
    scheduler.start()
@app.on_event("shutdown")
def on_shutdown() -> None:
    """Stop the module-level scheduler, if one exists, and clear the handle."""
    global scheduler
    if scheduler is not None:
        scheduler.shutdown()
        # Drop the reference so later code sees that no scheduler is active.
        scheduler = None
async def test_infinite_background_job():
    """Print an ever-increasing counter forever, pausing 100 seconds per tick.

    The pause intentionally uses the blocking ``time.sleep`` to simulate a
    background job that blocks; this coroutine never returns.
    """
    counter: int = 0
    while True:
        counter += 1
        print(f"counter: {counter}")
        # Simulates a blocking background job on purpose.
        # The non-blocking form would be: await asyncio.sleep(100)
        time.sleep(100)
@app.get("/job")
async def add_job():
    """Schedule ``test_infinite_background_job`` and report the new job's id.

    Returns:
        A JSON response ``{"job_id": <id>}`` with HTTP status 201.

    Raises:
        fastapi.HTTPException: 503 when the module-level scheduler is still
            ``None`` (i.e. it has not been created by ``on_startup``).
    """
    # ``scheduler`` is Optional; fail with a clear 503 instead of an
    # AttributeError if the startup handler has not populated it.
    if scheduler is None:
        raise fastapi.HTTPException(
            status_code=fastapi.status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="scheduler is not running",
        )
    job: Job = scheduler.add_job(func=test_infinite_background_job)
    return fastapi.responses.JSONResponse(
        content={"job_id": job.id},
        status_code=fastapi.status.HTTP_201_CREATED,
    )
@app.get("/ping")
async def ping():
    """Respond with the JSON string ``"pong"`` and HTTP status 200."""
    return fastapi.responses.JSONResponse(
        content="pong",
        status_code=fastapi.status.HTTP_200_OK,
    )
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on host 0.0.0.0, port 9999.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=9999,
    )