import asyncio
import time
import typing

import fastapi.responses
import pytz
import uvicorn
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI
# ASGI application object; the route and lifecycle decorators below attach to it.
app: FastAPI = FastAPI()
# Module-wide scheduler handle. It is None until the startup hook assigns a
# scheduler, and the shutdown hook resets it to None again.
scheduler: typing.Optional[AsyncIOScheduler] = None
@app.on_event("startup")
async def on_startup():
    """Create the module-level scheduler and start it when the app boots.

    The scheduler is configured with a ``MemoryJobStore`` as its default job
    store, an executor of type ``"asyncio"`` as its default executor, job
    defaults of ``coalesce=False`` and ``max_instances=1``, and the
    Asia/Shanghai timezone.
    """
    global scheduler

    # Keyword arguments are evaluated left to right, so the job store is
    # still created before the timezone object, as in the original order.
    scheduler = AsyncIOScheduler(
        jobstores={"default": MemoryJobStore()},
        executors={"default": {"type": "asyncio"}},
        job_defaults={"coalesce": False, "max_instances": 1},
        timezone=pytz.timezone("Asia/Shanghai"),
    )
    scheduler.start()
@app.on_event("shutdown")
def on_shutdown():
    """Shut the module-level scheduler down, if there is one, and clear it."""
    global scheduler

    # Nothing to stop when startup never created a scheduler.
    if scheduler is None:
        return

    scheduler.shutdown()
    scheduler = None
async def test_infinite_background_job():
    """Print an ever-increasing counter forever, pausing 100 seconds per tick.

    This is a coroutine, so it shares the event loop with everything else
    running on it. The pause is therefore awaited with ``asyncio.sleep``
    rather than performed with a blocking ``time.sleep``, which would stall
    the whole loop (and every request served on it) for the full duration.

    The function never returns on its own; it ends only when the task
    running it is cancelled.
    """
    counter: int = 1
    while True:
        print(f"counter: {counter}")
        counter += 1
        # Yield control to the event loop while waiting for the next tick.
        await asyncio.sleep(100)
@app.get("/job")
async def add_job():
    """Register ``test_infinite_background_job`` with the scheduler.

    Returns:
        A JSON response of the form ``{"job_id": <id>}`` with HTTP 201.

    Raises:
        fastapi.HTTPException: with status 503 when the module-level
            scheduler is ``None`` (not created yet, or already shut down).
    """
    # The global is only read here, so no ``global`` declaration is needed.
    # Guard against None the same way the shutdown hook does, instead of
    # failing with an AttributeError on ``None.add_job``.
    if scheduler is None:
        raise fastapi.HTTPException(
            status_code=fastapi.status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="scheduler is not running",
        )

    # No trigger is passed, so APScheduler's default trigger applies
    # (per its documentation: a one-off run as soon as possible).
    job: Job = scheduler.add_job(func=test_infinite_background_job)
    return fastapi.responses.JSONResponse(
        content={"job_id": job.id},
        status_code=fastapi.status.HTTP_201_CREATED,
    )
@app.get("/ping")
async def ping():
    """Answer a liveness probe with the JSON string "pong" and HTTP 200."""
    ok_status = fastapi.status.HTTP_200_OK
    return fastapi.responses.JSONResponse(status_code=ok_status, content="pong")
if __name__ == "__main__":
    # Run directly as a script: serve the app on every interface, port 9999.
    uvicorn.run(app, port=9999, host="0.0.0.0")