Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

A modern python scheduling framework with dependency injection and modular integration support. Alternative for Rocketry or apscheduler

License

NotificationsYou must be signed in to change notification settings

ManiMozaffar/aioclock

Repository files navigation

ReleaseBuild statusCommit activityLicense

An asyncio-based scheduling framework designed for execution of periodic task with integrated support for dependency injection, enabling efficient and flexiable task management

Features

Aioclock offers:

  • Async: 100% Async, very light, fast and resource friendly
  • Scheduling: Keep scheduling tasks for you
  • Group: Group your task, for better code maintainability
  • Trigger: Already defined and easily extendable triggers, to trigger your scheduler to be started
  • Easy syntax: Library's syntax is very easy and enjoyable, no confusing hierarchy
  • Pydantic v2 validation: Validate all your trigger on startup using pydantic 2. Fastest to fail possible!
  • Soon: Running the task dispatcher (scheduler) on different process by default, so CPU intensive stuff on task won't delay the scheduling
  • Soon: Backend support, to allow horizontal scalling, by synchronizing, maybe using Redis

Getting started

To Install aioclock, simply do

pip install aioclock

Help

Seedocumentation for more details.

A Sample Example

fromcollections.abcimportAsyncGeneratorfromcontextlibimportasynccontextmanagerimportasynciofromaioclockimportAioClock,At,Depends,Every,Forever,Oncefromaioclock.groupimportGroup# groups.pygroup=Group()defmore_useless_than_me():return"I'm a dependency. I'm more useless than a screen door on a submarine."@group.task(trigger=Every(seconds=10))asyncdefevery():print("Every 10 seconds, I make a quantum leap. Where will I land next?")@group.task(trigger=Every(seconds=5))defeven_sync_works():print("I'm a synchronous task. I work even in async world.")@group.task(trigger=At(tz="UTC",hour=0,minute=0,second=0))asyncdefat():print("When the clock strikes midnight... I turn into a pumpkin. Just kidding, I run this task!"    )@group.task(trigger=Forever())asyncdefforever(val:str=Depends(more_useless_than_me)):awaitasyncio.sleep(2)print("Heartbeat detected. Still not a zombie. Will check again in a bit.")assertval=="I'm a dependency. I'm more useless than a screen door on a submarine."@group.task(trigger=Once())asyncdefonce():print("Just once, I get to say something. Here it goes... I love lamp.")@asynccontextmanagerasyncdeflifespan(aio_clock:AioClock)->AsyncGenerator[AioClock]:# starting upprint("Welcome to the Async Chronicles! Did you know a group of unicorns is called a blessing? Well, now you do!"    )yieldaio_clock# shuting downprint("Going offline. Remember, if your code is running, you better go catch it!")# app.pyapp=AioClock(lifespan=lifespan)app.include_group(group)# main.pyif__name__=="__main__":asyncio.run(app.serve())

[8]ページ先頭

©2009-2025 Movatter.jp