Router - Load Balancing
LiteLLM manages:
- Load-balance across multiple deployments (e.g. Azure/OpenAI)
- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
- Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.
In production, LiteLLM supports using Redis to track cooled-down servers and usage (managing tpm/rpm limits).
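For orientation, here is a minimal sketch that ties these pieces together - retries, timeouts, cooldowns, fallbacks, and Redis-backed usage tracking. The parameter values are illustrative, and model_list is defined as in the Quick Start below; each option is covered in detail later in this doc.
import os
from litellm import Router

router = Router(
    model_list=model_list,                      # your deployments (see Quick Start below)
    num_retries=3,                              # retries (exponential backoff for rate-limit errors)
    timeout=30,                                 # request timeout in seconds
    allowed_fails=1,                            # failures/minute before a deployment is cooled down
    cooldown_time=60,                           # cooldown duration in seconds
    fallbacks=[{"gpt-3.5-turbo": ["gpt-4"]}],   # fallback model group(s)
    redis_host=os.getenv("REDIS_HOST"),         # Redis for cross-instance tpm/rpm tracking
    redis_password=os.getenv("REDIS_PASSWORD"),
    redis_port=os.getenv("REDIS_PORT"),
)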
If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server
Load Balancing
(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code
Quick Start
Load balance across multiple azure/bedrock/provider deployments. LiteLLM will handle retrying in different regions if a call fails.
- SDK
- PROXY
import os
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/gpt-4",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "api_version": os.getenv("AZURE_API_VERSION"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
},
]

router = Router(model_list=model_list)

# openai.ChatCompletion.create replacement
# requests with model="gpt-3.5-turbo" will pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)

# openai.ChatCompletion.create replacement
# requests with model="gpt-4" will pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
See detailed proxy loadbalancing/fallback docs here
- Set up model_list with multiple deployments
model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/<your-deployment-name>
      api_base: <your-azure-endpoint>
      api_key: <your-azure-api-key>
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/gpt-turbo-small-ca
      api_base: https://my-endpoint-canada-berri992.openai.azure.com/
      api_key: <your-azure-api-key>
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/gpt-turbo-large
      api_base: https://openai-france-1234.openai.azure.com/
      api_key: <your-azure-api-key>
- Start proxy
litellm --config /path/to/config.yaml
- Test it!
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hi there!"}
],
"mock_testing_rate_limit_error": true
}'
Available Endpoints
- router.completion() - chat completions endpoint to call 100+ LLMs
- router.acompletion() - async chat completion calls
- router.embedding() - embedding endpoint for Azure, OpenAI, Huggingface endpoints
- router.aembedding() - async embeddings calls
- router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
- router.atext_completion() - async text completion calls
- router.image_generation() - completion calls in OpenAI /v1/images/generations endpoint format
- router.aimage_generation() - async image generation calls
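The non-chat endpoints follow the same pattern as completion - you pass the model alias from your model_list. A quick sketch (the embedding call assumes you have added an embedding deployment, e.g. model_name="text-embedding-ada-002", to your model_list):
# assumes an embedding deployment with model_name="text-embedding-ada-002" exists in model_list
embedding_response = router.embedding(
    model="text-embedding-ada-002",
    input=["good morning from litellm"]
)

# text completion against the gpt-3.5-turbo deployments defined above
text_response = router.text_completion(
    model="gpt-3.5-turbo",
    prompt="Say this is a test"
)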
Advanced - Routing Strategies ⭐️
Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based, Cost Based
Router provides multiple strategies for routing your calls across multiple deployments. We recommend using simple-shuffle (default) for best performance in production.
- (Default) Weighted Pick - RECOMMENDED
- Rate-Limit Aware v2 (ASYNC)
- Latency-Based
- Rate-Limit Aware
- Least-Busy
- Custom Routing Strategy
- Lowest Cost Routing (Async)
Default and Recommended for Production - Best performance with minimal latency overhead.
Picks a deployment based on the provided Requests per minute (rpm) or Tokens per minute (tpm).
If rpm or tpm is not provided, it randomly picks a deployment.
You can also set a weight param to control how often each deployment gets picked.
- RPM-based shuffling
- Weight-based shuffling
LiteLLM Proxy Config.yaml
model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-v-2
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      rpm: 900
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-functioncalling
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      rpm: 10
Python SDK
import os
import asyncio
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 900, # requests per minute for this API
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 10,
    }
},]

# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())
LiteLLM Proxy Config.yaml
model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-v-2
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      weight: 9
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-functioncalling
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      weight: 1
Python SDK
import os
import asyncio
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": {
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "weight": 9, # pick this 90% of the time
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "weight": 1,
    }
}]

# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())
[!WARNING]
Usage-based routing is not recommended for production due to performance impacts. Use simple-shuffle (default) for optimal performance in high-traffic scenarios. Usage-based routing adds significant latency due to Redis operations for tracking usage across deployments.
🎉 NEW - This is an async implementation of usage-based routing.
Filters out deployments whose tpm/rpm limits are exceeded - if you pass in the deployment's tpm/rpm limits.
Routes to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments. This implementation uses async redis calls (redis.incr and redis.mget).
For Azure, you get 6 RPM per 1000 TPM (e.g. 100,000 TPM = 600 RPM).
- sdk
- proxy
import os
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 10000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 1000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "tpm": 100000,
        "rpm": 1000,
    },
}]

router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="simple-shuffle",  # 👈 RECOMMENDED - best performance
                enable_pre_call_checks=True,  # enables router rate limits for concurrent calls
                )

response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
1. Set strategy in config
model_list:
  - model_name: gpt-3.5-turbo # model alias
    litellm_params: # params for litellm completion/embedding call
      model: azure/chatgpt-v-2 # actual model name
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      tpm: 100000
      rpm: 10000
  - model_name: gpt-3.5-turbo
    litellm_params: # params for litellm completion/embedding call
      model: gpt-3.5-turbo
      api_key: os.environ/OPENAI_API_KEY
      tpm: 100000
      rpm: 1000

router_settings:
  routing_strategy: simple-shuffle # 👈 RECOMMENDED - best performance
  redis_host: <your-redis-host>
  redis_password: <your-redis-password>
  redis_port: <your-redis-port>
  enable_pre_call_checks: true

general_settings:
  master_key: sk-1234
2. Start proxy
litellm --config /path/to/config.yaml
3. Test it!
curl --location 'http://localhost:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
    "model": "gpt-3.5-turbo",
    "messages": [{"role": "user", "content": "Hey, how is it going?"}]
}'
Picks the deployment with the lowest response time.
It caches and updates response times for deployments based on when a request was sent to and received from a deployment.
import asyncio
from litellm import Router

model_list = [{...}]

# init router
router = Router(model_list=model_list,
                routing_strategy="latency-based-routing",  # 👈 set routing strategy
                enable_pre_call_checks=True,  # enables router rate limits for concurrent calls
                )

## CALL 1+2
tasks = []
response = None
final_response = None
for _ in range(2):
    tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)

if response is not None:
    ## CALL 3
    await asyncio.sleep(1)  # let the cache update happen
    picked_deployment = router.lowestlatency_logger.get_available_deployments(
        model_group=model, healthy_deployments=router.healthy_deployments
    )
    final_response = await router.acompletion(model=model, messages=messages)
    print(f"min deployment id: {picked_deployment}")
    print(f"model id: {final_response._hidden_params['model_id']}")
    assert (
        final_response._hidden_params["model_id"]
        == picked_deployment["model_info"]["id"]
    )
Set Time Window
Set time window for how far back to consider when averaging latency for a deployment.
In Router
router = Router(..., routing_strategy_args={"ttl": 10})
In Proxy
router_settings:
  routing_strategy_args: {"ttl": 10}
Set Lowest Latency Buffer
Set a buffer around the lowest latency; any deployment whose latency falls within that buffer is a candidate for the call.
E.g.
if you have 5 deployments
https://litellm-prod-1.openai.azure.com/: 0.07s
https://litellm-prod-2.openai.azure.com/: 0.1s
https://litellm-prod-3.openai.azure.com/: 0.1s
https://litellm-prod-4.openai.azure.com/: 0.1s
https://litellm-prod-5.openai.azure.com/: 4.66s
To prevent initially overloading prod-1 with all requests, we can set a buffer of 50%, so deployments prod-2, prod-3, prod-4 are also considered.
In Router
router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})
In Proxy
router_settings:
  routing_strategy_args: {"lowest_latency_buffer": 0.5}
This will route to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments.
If you pass in the deployment's tpm/rpm limits, this will also check against them and filter out any deployment whose limits would be exceeded.
For Azure, you get 6 RPM per 1000 TPM.
import os
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 10000,
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 1000,
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
    "tpm": 100000,
    "rpm": 1000,
}]

router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="usage-based-routing",
                enable_pre_call_checks=True,  # enables router rate limits for concurrent calls
                )

response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
Picks the deployment with the fewest ongoing calls it's currently handling.
import os
import asyncio
from litellm import Router

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

# init router
router = Router(model_list=model_list, routing_strategy="least-busy")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())
Plug in a custom routing strategy to select deployments
Step 1. Define your custom routing strategy
from typing import Dict, List, Optional, Union

from litellm.router import CustomRoutingStrategyBase

class CustomRoutingStrategy(CustomRoutingStrategyBase):
    async def async_get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Asynchronously retrieves the available deployment based on the given parameters.

        Args:
            model (str): The name of the model.
            messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
            input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
            specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
            request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.

        Returns:
            Returns an element from litellm.router.model_list
        """
        print("In CUSTOM async get available deployment")
        model_list = router.model_list
        print("router model list=", model_list)
        for model in model_list:
            if isinstance(model, dict):
                if model["litellm_params"]["model"] == "openai/very-special-endpoint":
                    return model
        pass

    def get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Synchronously retrieves the available deployment based on the given parameters.

        Args:
            model (str): The name of the model.
            messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
            input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
            specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
            request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.

        Returns:
            Returns an element from litellm.router.model_list
        """
        pass
Step 2. Initialize Router with custom routing strategy
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "azure-model",
            "litellm_params": {
                "model": "openai/very-special-endpoint",
                "api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
                "api_key": "fake-key",
            },
            "model_info": {"id": "very-special-endpoint"},
        },
        {
            "model_name": "azure-model",
            "litellm_params": {
                "model": "openai/fast-endpoint",
                "api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
                "api_key": "fake-key",
            },
            "model_info": {"id": "fast-endpoint"},
        },
    ],
    set_verbose=True,
    debug_level="DEBUG",
    timeout=1,
)  # type: ignore

router.set_custom_routing_strategy(CustomRoutingStrategy())  # 👈 Set your routing strategy here
Step 3. Test your routing strategy. Expect your custom routing strategy to be called when running router.acompletion requests
for _ in range(10):
    response = await router.acompletion(
        model="azure-model", messages=[{"role": "user", "content": "hello"}]
    )
    print(response)
    _picked_model_id = response._hidden_params["model_id"]
    print("picked model=", _picked_model_id)
Picks a deployment based on the lowest cost
How this works:
- Get all healthy deployments
- Select all deployments that are under their provided rpm/tpm limits
- For each deployment, check if litellm_param["model"] exists in litellm_model_cost_map
  - if the deployment does not exist in litellm_model_cost_map -> use deployment_cost = $1
- Select the deployment with the lowest cost
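The selection step above can be sketched roughly as follows. This is illustrative logic only, not litellm's internal implementation; litellm.model_cost is litellm's public model cost map:
import litellm

def pick_lowest_cost(healthy_deployments: list[dict]) -> dict:
    """Illustrative sketch: pick the deployment whose model has the lowest input cost."""
    def deployment_cost(deployment: dict) -> float:
        model = deployment["litellm_params"]["model"]
        cost_info = litellm.model_cost.get(model)
        if cost_info is None:
            return 1.0  # unknown models default to $1, as described above
        return cost_info.get("input_cost_per_token", 1.0)

    return min(healthy_deployments, key=deployment_cost)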
import asyncio
from litellm import Router

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "gpt-4"},
        "model_info": {"id": "openai-gpt-4"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "groq/llama3-8b-8192"},
        "model_info": {"id": "groq-llama"},
    },
]

# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    print(response._hidden_params["model_id"])  # expect groq-llama, since groq/llama has lowest cost
    return response

asyncio.run(router_acompletion())
Using Custom Input/Output pricing
Set litellm_params["input_cost_per_token"] and litellm_params["output_cost_per_token"] to use custom pricing when routing.
import asyncio
from litellm import Router

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "input_cost_per_token": 0.00003,
            "output_cost_per_token": 0.00003,
        },
        "model_info": {"id": "chatgpt-v-experimental"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-1",
            "input_cost_per_token": 0.000000001,
            "output_cost_per_token": 0.00000001,
        },
        "model_info": {"id": "chatgpt-v-1"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-5",
            "input_cost_per_token": 10,
            "output_cost_per_token": 12,
        },
        "model_info": {"id": "chatgpt-v-5"},
    },
]

# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    print(response._hidden_params["model_id"])  # expect chatgpt-v-1, since chatgpt-v-1 has lowest cost
    return response

asyncio.run(router_acompletion())
Traffic Mirroring / Silent Experiments
Traffic mirroring allows you to "mimic" production traffic to a secondary (silent) model for evaluation purposes. The silent model's response is gathered in the background and does not affect the latency or result of the primary request.
See detailed guide on A/B Testing - Traffic Mirroring here
Basic Reliability
Deployment Ordering (Priority)
Set order in litellm_params to prioritize deployments. Lower values = higher priority. When multiple deployments share the same order, the routing strategy picks among them.
- SDK
- PROXY
import os
from litellm import Router

model_list = [
    {
        "model_name": "gpt-4",
        "litellm_params": {
            "model": "azure/gpt-4-primary",
            "api_key": os.getenv("AZURE_API_KEY"),
            "order": 1, # 👈 Highest priority
        },
    },
    {
        "model_name": "gpt-4",
        "litellm_params": {
            "model": "azure/gpt-4-fallback",
            "api_key": os.getenv("AZURE_API_KEY_2"),
            "order": 2, # 👈 Used when order=1 is unavailable
        },
    },
]

router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Required for 'order' to work
The order parameter requires enable_pre_call_checks=True to be set on the Router.
model_list:
  - model_name: gpt-4
    litellm_params:
      model: azure/gpt-4-primary
      api_key: os.environ/AZURE_API_KEY
      order: 1 # 👈 Highest priority
  - model_name: gpt-4
    litellm_params:
      model: azure/gpt-4-fallback
      api_key: os.environ/AZURE_API_KEY_2
      order: 2 # 👈 Used when order=1 is unavailable

router_settings:
  enable_pre_call_checks: true # 👈 Required for 'order' to work
Weighted Deployments
Set weight on a deployment to pick one deployment more often than others.
This works with the simple-shuffle routing strategy (the default, if no routing strategy is selected).
- SDK
- PROXY
import os
from litellm import Router

model_list = [
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 1
        },
    },
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN THE OTHER
        },
    },
]

router = Router(model_list=model_list) # defaults to simple-shuffle, which respects `weight`

response = await router.acompletion(
    model="o1",
    messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
model_list:
  - model_name: o1
    litellm_params:
      model: o1
      api_key: os.environ/OPENAI_API_KEY
      weight: 1
  - model_name: o1
    litellm_params:
      model: o1-preview
      api_key: os.environ/OPENAI_API_KEY
      weight: 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN o1
Max Parallel Requests (ASYNC)
Used as a semaphore for async requests on the router. Limits the max concurrent calls made to a deployment. Useful in high-traffic scenarios.
If tpm/rpm is set and no max parallel request limit is given, we use the RPM or calculated RPM (tpm/1000/6) as the max parallel request limit.
from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        ...
        "max_parallel_requests": 10 # 👈 SET PER DEPLOYMENT
    }
}]

### OR ###

router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 SET DEFAULT MAX PARALLEL REQUESTS

# deployment max parallel requests > default max parallel requests
Cooldowns
Set the limit for how many calls a model is allowed to fail in a minute before it is cooled down.
- SDK
- PROXY
from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                allowed_fails=1,   # cooldown model if it fails > 1 call in a minute.
                cooldown_time=100  # cooldown the deployment for 100 seconds if num_fails > allowed_fails
                )

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Set Global Value
router_settings:
  allowed_fails: 3 # cooldown model if it fails > 3 calls in a minute.
  cooldown_time: 30 # (in seconds) how long to cooldown model if fails/min > allowed_fails
Defaults:
- allowed_fails: 3
- cooldown_time: 5s (DEFAULT_COOLDOWN_TIME_SECONDS in constants.py)
Set Per Model
model_list:
  - model_name: fake-openai-endpoint
    litellm_params:
      model: predibase/llama-3-8b-instruct
      api_key: os.environ/PREDIBASE_API_KEY
      tenant_id: os.environ/PREDIBASE_TENANT_ID
      max_new_tokens: 256
      cooldown_time: 0 # 👈 KEY CHANGE
Expected Response
No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.
Disable cooldowns
- SDK
- PROXY
from litellm import Router

router = Router(..., disable_cooldowns=True)

router_settings:
  disable_cooldowns: True
How Cooldowns Work
Cooldowns apply to individual deployments, not entire model groups. The router isolates failures to specific deployments while keeping healthy alternatives available.
What is a deployment?
A deployment is a single entry in your config.yaml model list. Each deployment represents a unique configuration with its own litellm_params.
LiteLLM generates a unique model_id for each deployment by creating a deterministic hash of all the litellm_params. This allows the router to track and manage each deployment independently.
Example: Multiple deployments for the same model
model_list:
  - model_name: sonnet-4 # Deployment 1
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <our-real-key>
  - model_name: byok-sonnet-4 # Deployment 2
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <customer-managed-key>
      api_base: https://proxy.litellm.ai/api.anthropic.com
  - model_name: sonnet-4 # Deployment 3
    litellm_params:
      model: vertex_ai/claude-sonnet-4-20250514
      vertex_project: my-project
Each deployment gets a unique model_id (e.g., 1234567890, 9129922, 4982929292) that the router uses for tracking health and cooldown status.
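To illustrate the idea, a deterministic ID can be derived from the litellm_params like this. This is a simplified sketch, not litellm's exact hashing logic:
import hashlib
import json

def deterministic_model_id(litellm_params: dict) -> str:
    """Sketch: derive a stable ID from a deployment's litellm_params.
    Illustrative only - litellm's actual implementation may differ."""
    canonical = json.dumps(litellm_params, sort_keys=True)  # stable key ordering
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

print(deterministic_model_id({
    "model": "anthropic/claude-sonnet-4-20250514",
    "api_key": "<our-real-key>",
}))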
When are deployments cooled down?
The router automatically cools down deployments based on the following conditions:
| Condition | Trigger | Cooldown Duration |
|---|---|---|
| Rate Limiting (429) | Immediate on 429 response | 5 seconds (default) |
| High Failure Rate | >50% failures in current minute | 5 seconds (default) |
| Non-Retryable Errors | 401 (Auth), 404 (Not Found), 408 (Timeout) | 5 seconds (default) |
During cooldown, the specific deployment is temporarily removed from the available pool, while other healthy deployments continue serving requests.
Cooldown Recovery
Deployments automatically recover from cooldown after the cooldown period expires. The router will:
- Monitor cooldown timers for each deployment
- Automatically re-enable deployments when cooldown expires
- Gradually reintroduce cooled-down deployments to the rotation
- Reset failure counters once the deployment is healthy again
Real-World Example
Consider this high-availability setup with multiple providers:
model_list:
  - model_name: sonnet-4 # Primary: Anthropic Direct
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <anthropic-key>
  - model_name: byok-sonnet-4 # BYOK: Customer-managed keys
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <customer-managed-key>
      api_base: https://proxy.litellm.ai/api.anthropic.com
  - model_name: sonnet-4 # Fallback: Vertex AI
    litellm_params:
      model: vertex_ai/claude-sonnet-4-20250514
      vertex_project: my-project
Failure Scenario:
Retries
For both async + sync functions, we support retrying failed requests.
For RateLimitError we implement exponential backoffs
For generic errors, we retry immediately
Here's a quick look at how we can set num_retries = 3:
from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
We also support setting a minimum time to wait before retrying a failed request. This is via the retry_after param.
from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3, retry_after=5) # waits min 5s before retrying request

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
[Advanced]: Custom Retries, Cooldowns based on Error Type
- Use RetryPolicy if you want to set num_retries based on the Exception received
- Use AllowedFailsPolicy to set a custom number of allowed_fails/minute before cooling down a deployment
- SDK
- PROXY
Example:
retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0,         # run 0 retries for AuthenticationErrors
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    RateLimitErrorAllowedFails=100,               # Allow 100 RateLimitErrors before cooling down a deployment
)
Example Usage
import os
import litellm
from litellm.router import RetryPolicy, AllowedFailsPolicy

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0,         # run 0 retries for AuthenticationErrors
    BadRequestErrorRetries=1,
    TimeoutErrorRetries=2,
    RateLimitErrorRetries=3,
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    RateLimitErrorAllowedFails=100,               # Allow 100 RateLimitErrors before cooling down a deployment
)

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": os.getenv("AZURE_API_KEY"),
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
        {
            "model_name": "bad-model", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": "bad-key",
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
    ],
    retry_policy=retry_policy,
    allowed_fails_policy=allowed_fails_policy,
)

response = await router.acompletion(
    model=model,
    messages=messages,
)
router_settings:
  retry_policy: {
    "BadRequestErrorRetries": 3,
    "ContentPolicyViolationErrorRetries": 4
  }
  allowed_fails_policy: {
    "ContentPolicyViolationErrorAllowedFails": 1000, # Allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    "RateLimitErrorAllowedFails": 100 # Allow 100 RateLimitErrors before cooling down a deployment
  }
Caching
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
In-memory Cache
router = Router(model_list=model_list,
                cache_responses=True)

print(response)
Redis Cache
router = Router(model_list=model_list,
                redis_host=os.getenv("REDIS_HOST"),
                redis_password=os.getenv("REDIS_PASSWORD"),
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)

print(response)
Pass in Redis URL, additional kwargs
router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)
When configuring Redis caching in router settings, use cache_kwargs to pass additional Redis parameters, especially for non-string values that may fail when set via REDIS_* environment variables.
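For example, a sketch of passing non-string values through cache_kwargs (socket_timeout and ssl are standard redis-py client options, shown here as the kind of values that are awkward to express as environment variables):
router = Router(
    model_list=model_list,
    redis_url=os.getenv("REDIS_URL"),
    cache_kwargs={              # forwarded to the Redis cache client
        "socket_timeout": 5.0,  # float
        "ssl": True,            # bool
    },
    cache_responses=True,
)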
Pre-Call Checks (Context Window, EU-Regions)
Enable pre-call checks to filter out:
- deployments with context window limit < messages for a call.
- deployments outside of eu-region
- SDK
- Proxy
1. Enable pre-call checks
from litellm import Router

# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Set to True
2. Set Model List
For context window checks on azure deployments, set the base model. Pick the base model from this list; all the azure models start with azure/.
For 'eu-region' filtering, set 'region_name' of the deployment.
Note: We automatically infer region_name for Vertex AI, Bedrock, and IBM WatsonxAI based on your litellm params. For Azure, set litellm.enable_preview = True.
model_list = [
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "region_name": "eu", # 👈 SET 'EU' REGION NAME
            "base_model": "azure/gpt-35-turbo", # 👈 (Azure-only) SET BASE MODEL
        },
    },
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
    {
        "model_name": "gemini-pro",
        "litellm_params": {
            "model": "vertex_ai/gemini-pro-1.5",
            "vertex_project": "adroit-crow-1234",
            "vertex_location": "us-east1" # 👈 AUTOMATICALLY INFERS 'region_name'
        }
    }
]

router = Router(model_list=model_list, enable_pre_call_checks=True)
3. Test it!
- Context Window Check
- EU Region Check
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellmimport Router
import os
model_list=[
{
"model_name":"gpt-3.5-turbo",# model group name
"litellm_params":{# params for litellm completion/embedding call
"model":"azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model":"azure/gpt-35-turbo",
},
"model_info":{
"base_model":"azure/gpt-35-turbo",
}
},
{
"model_name":"gpt-3.5-turbo",# model group name
"litellm_params":{# params for litellm completion/embedding call
"model":"gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router= Router(model_list=model_list, enable_pre_call_checks=True)
text="What is the meaning of 42?"*5000
response= router.completion(
model="gpt-3.5-turbo",
messages=[
{"role":"system","content": text},
{"role":"user","content":"Who was Alexander?"},
],
)
print(f"response:{response}")
"""
- Give 2 gpt-3.5-turbo deployments, in eu + non-eu regions
- Make a call
- Assert it picks the eu-region model
"""
from litellmimport Router
import os
model_list=[
{
"model_name":"gpt-3.5-turbo",# model group name
"litellm_params":{# params for litellm completion/embedding call
"model":"azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name":"eu"
},
"model_info":{
"id":"1"
}
},
{
"model_name":"gpt-3.5-turbo",# model group name
"litellm_params":{# params for litellm completion/embedding call
"model":"gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info":{
"id":"2"
}
},
]
router= Router(model_list=model_list, enable_pre_call_checks=True)
response= router.completion(
model="gpt-3.5-turbo",
messages=[{"role":"user","content":"Who was Alexander?"}],
)
print(f"response:{response}")
print(f"response id:{response._hidden_params['model_id']}")
Go here for how to do this on the proxy
Caching across model groups
If you want to cache across 2 different model groups (e.g. azure deployments, and openai), use caching groups.
import os
import time
import asyncio
import traceback
import litellm
from litellm import Router

# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""

async def test_acompletion_caching_on_router_caching_groups():
    # tests acompletion + caching on router
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]

        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list,
                        cache_responses=True,
                        caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()

asyncio.run(test_acompletion_caching_on_router_caching_groups())
Alerting 🚨
Send alerts to slack / your webhook url for the following events
- LLM API Exceptions
- Slow LLM Responses
Get a slack webhook url from https://api.slack.com/messaging/webhooks
Usage
Initialize an AlertingConfig and pass it to litellm.Router. The following code will trigger an alert because api_key="bad_key" is invalid.
import os
import asyncio
import litellm
from litellm.router import Router
from litellm.types.router import AlertingConfig

router = Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "gpt-3.5-turbo",
                "api_key": "bad_key",
            },
        }
    ],
    alerting_config=AlertingConfig(
        alerting_threshold=10,
        webhook_url="https:/..."
    ),
)

async def main():
    print(f"\n=== Configuration ===")
    print(f"Slack logger exists: {router.slack_alerting_logger is not None}")

    try:
        await router.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )
    except Exception as e:
        print(f"\n=== Exception caught ===")
        print(f"Waiting 10 seconds for alerts to be sent via periodic flush...")
        await asyncio.sleep(10)
        print(f"\n=== After waiting ===")
        print(f"Alert should have been sent to Slack!")

asyncio.run(main())
Track cost for Azure Deployments
Problem: Azure returns gpt-4 in the response when azure/gpt-4-1106-preview is used. This leads to inaccurate cost tracking.
Solution ✅ : Set model_info["base_model"] on your router init so litellm uses the correct model for calculating azure cost.
Step 1. Router Setup
import os
from litellm import Router

model_list = [
    { # list of model deployments
        "model_name": "gpt-4-preview", # model alias
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2", # actual model name
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    },
    {
        "model_name": "gpt-4-32k",
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-functioncalling",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    }
]

router = Router(model_list=model_list)
Step 2. Access response_cost in the custom callback - litellm calculates the response cost for you
import litellm
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        response_cost = kwargs.get("response_cost")
        print("response_cost=", response_cost)

customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]

# router completion call
response = router.completion(
    model="gpt-4-32k",
    messages=[{"role": "user", "content": "Hi who are you"}]
)
Default litellm.completion/embedding params
You can also set default params for litellm completion/embedding calls. Here's how to do that:
from litellm import Router

fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
                default_litellm_params={"context_window_fallback_dict": fallback_dict})

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Custom Callbacks - Track API Key, API Endpoint, Model Used
If you need to track the api_key, api endpoint, model, or custom_llm_provider used for each completion call, you can set up a custom callback
Usage
import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        print("kwargs=", kwargs)

        litellm_params = kwargs.get("litellm_params")
        api_key = litellm_params.get("api_key")
        api_base = litellm_params.get("api_base")
        custom_llm_provider = litellm_params.get("custom_llm_provider")
        response_cost = kwargs.get("response_cost")

        # print the values
        print("api_key=", api_key)
        print("api_base=", api_base)
        print("custom_llm_provider=", custom_llm_provider)
        print("response_cost=", response_cost)

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Failure")
        print("kwargs=", kwargs)

customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

# Init Router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

# router completion call
response = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi who are you"}]
)
Deploy Router
If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server
Debugging Router
Basic Debugging
Set Router(set_verbose=True)
from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True
)
Detailed Debugging
Set Router(set_verbose=True, debug_level="DEBUG")
from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)
Very Detailed Debugging
Set litellm.set_verbose=True and Router(set_verbose=True, debug_level="DEBUG")
import litellm
from litellm import Router

litellm.set_verbose = True

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)
Router General Settings
Usage
router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))
Spec
class RouterGeneralSettings(BaseModel):
    async_only_mode: bool = Field(
        default=False
    )  # this will only initialize async clients. Good for memory utils
    pass_through_all_models: bool = Field(
        default=False
    )  # if passed a model not in the llm_router model list, pass through the request to litellm.acompletion/embedding