I’m building a data ingestion pipeline in Python that collects data from a third-party REST API. The API allows a maximum of 100 requests per minute, and I need to fetch data for tens of thousands of items.
Here’s a simplified version of my current approach using asyncio and aiohttp:
```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

urls = [f"https://api.example.com/items/{i}" for i in range(10000)]
data = asyncio.run(main(urls))
```

This works for small sets of URLs but fails at scale — I quickly exceed the rate limit and start getting HTTP 429 errors.
I’ve tried introducing semaphores and sleep intervals:
```python
semaphore = asyncio.Semaphore(10)

async def fetch_limited(session, url):
    async with semaphore:
        async with session.get(url) as resp:
            if resp.status == 429:
                await asyncio.sleep(60)
                return await fetch_limited(session, url)
            return await resp.json()
```

However:
- It’s inefficient — sleeps block all tasks instead of just the rate-limited ones.
- I still occasionally hit bursts of 429s, likely due to concurrency scheduling.
- Retries are inconsistent and can cause starvation of certain tasks.
Question: What’s the most efficient and Pythonic way to:

- Parallelize a large number of API calls asynchronously
- Respect strict rate limits (e.g., 100 requests per minute)
- Handle retries and exponential backoff cleanly
- Avoid blocking the event loop when rate-limited
Would using libraries like `aiolimiter`, `tenacity`, or an `asyncio.Queue`-based architecture be better suited? I’m looking for a robust design pattern or example that scales gracefully without hitting rate limits.
- Since the primary design goal is to respect the API's rate limit, the design parameters depend on the rate-limiting implementation. For example, if the server uses a calendar quota, the manner in which requests are made simply does not matter in this case. 100 requests in 1 minute is easily achievable on an average internet connection. You could make each request synchronously in turn and still have extra time to wait for the quota to reset. – Xavier Pedraza, Oct 30 at 7:12
- You'll also likely find more success by respecting the `Retry-After` header if one is present. You don't want to waste resources making requests that you know will fail, whether or not such requests will impact the quota. – Xavier Pedraza, Oct 30 at 7:17
- There's almost no point in doing this asynchronously. If you know that you're limited to 100 calls per minute, then do batches of up to 100 requests, noting the start time before you start the batch. Then, before initiating subsequent batches, just check the time interval. You could also consider using `requests` or `urllib3` with a Retry strategy to make sure that you "back off" in case of any HTTP 429 responses. – jackal, Oct 30 at 9:24
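The batch approach the comments describe can be sketched synchronously. This is a minimal, hedged example, assuming the API tolerates a burst of up to 100 requests at the start of each minute window; `make_session` and `fetch_in_batches` are hypothetical helper names, and the `clock`/`sleep` parameters exist only to make the pacing logic testable:

```python
import time

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session():
    """Session that backs off automatically on HTTP 429.

    urllib3's Retry honours the Retry-After header for 429 responses
    by default, so the "back off" happens inside the adapter."""
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[429])
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session

def fetch_in_batches(urls, session, batch_size=100, window=60.0,
                     clock=time.monotonic, sleep=time.sleep):
    """Fetch URLs in batches of `batch_size`, starting at most one
    batch per `window` seconds."""
    results = []
    for start in range(0, len(urls), batch_size):
        batch_started = clock()
        for url in urls[start:start + batch_size]:
            resp = session.get(url)
            resp.raise_for_status()
            results.append(resp.json())
        # If the batch finished early, wait out the rest of the window
        # before the next batch begins (skip the wait after the last batch).
        elapsed = clock() - batch_started
        if start + batch_size < len(urls) and elapsed < window:
            sleep(window - elapsed)
    return results
```

As the first comment notes, whether this pacing is even necessary depends on how the server implements its quota; for a strict calendar quota, simply running batches back to back with a timer check is already compliant.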
1 Answer
You’re running into 429 errors because semaphores only limit concurrent requests, not the rate of requests per minute.
To fix this, you need something that spreads out requests evenly over time and retries failed ones smartly.
The easiest way to do this in Python is to use `aiolimiter` for rate limiting and `tenacity` for retries with exponential backoff.
Here’s a clean example that works well even for tens of thousands of requests:
```python
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential

# Allow up to 100 requests per minute
rate_limiter = AsyncLimiter(100, 60)

@retry(wait=wait_exponential(multiplier=1, min=2, max=60), stop=stop_after_attempt(5))
async def fetch(session, url):
    async with rate_limiter:  # limits requests to 100/min
        async with session.get(url) as resp:
            if resp.status == 429:
                raise Exception("Rate limited")  # will trigger retry
            resp.raise_for_status()
            return await resp.json()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

urls = [f"https://api.example.com/items/{i}" for i in range(10000)]
data = asyncio.run(main(urls))
```

If you want to make it even more robust, you can process URLs using a queue and a few worker tasks instead of launching 10,000 coroutines at once:
```python
async def worker(session, queue):
    while True:
        url = await queue.get()
        if url is None:  # sentinel: no more work
            break
        try:
            await fetch(session, url)
        finally:
            queue.task_done()

async def main(urls, n_workers=10):
    queue = asyncio.Queue()
    for url in urls:
        await queue.put(url)
    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(session, queue)) for _ in range(n_workers)]
        await queue.join()
        for _ in range(n_workers):
            await queue.put(None)  # one sentinel per worker
        await asyncio.gather(*workers)
```

This version uses 10 async workers that pull from a queue and still respect the same rate limit.
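As the comments on the question point out, it is also worth honouring the server's `Retry-After` header rather than relying purely on exponential backoff. The sketch below is a hedged variant of the answer's `fetch`; `fetch_respecting_retry_after` is a hypothetical name, and it assumes `Retry-After` carries a number of seconds (the header may also be an HTTP date, which a production version should parse too). The session and limiter are passed in, so it slots into either design above:

```python
import asyncio

async def fetch_respecting_retry_after(session, url, rate_limiter, max_attempts=5):
    """Fetch `url`, sleeping for the server-requested interval on HTTP 429.

    Only the rate-limited task waits; other tasks keep running on the
    event loop. Falls back to exponential backoff (2**attempt seconds)
    when no Retry-After header is present."""
    for attempt in range(max_attempts):
        async with rate_limiter:
            async with session.get(url) as resp:
                if resp.status == 429:
                    delay = float(resp.headers.get("Retry-After", 2 ** attempt))
                    await asyncio.sleep(delay)  # suspends only this coroutine
                    continue
                resp.raise_for_status()
                return await resp.json()
    raise RuntimeError(f"{url}: still rate-limited after {max_attempts} attempts")
```

This avoids the wasted requests the comments warn about: when the server has already told you how long to wait, sleeping for exactly that interval beats guessing with a backoff schedule.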