
Batch Processing

The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
stateDiagram-v2
    direction LR
    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
    LambdaInit: Lambda invocation
    BatchProcessor: Batch Processor
    RecordHandler: Record Handler function
    YourLogic: Your logic to process each batch item
    LambdaResponse: Lambda response
    BatchSource --> LambdaInit
    LambdaInit --> BatchProcessor
    BatchProcessor --> RecordHandler
    state BatchProcessor {
        [*] --> RecordHandler: Your function
        RecordHandler --> YourLogic
    }
    RecordHandler --> BatchProcessor: Collect results
    BatchProcessor --> LambdaResponse: Report items that failed processing
```

Key features

  • Reports batch item failures to reduce the number of retries for a record upon errors
  • Simple interface to process each batch record
  • Build your own batch processor by extending primitives

Background

When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.

If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following happens first: a) your Lambda function returns a successful response, b) the record reaches its maximum retry attempts, or c) the records expire.

```mermaid
journey
  section Conditions
    Successful response: 5: Success
    Maximum retries: 3: Failure
    Records expired: 1: Failure
```

This behavior changes when you enable the ReportBatchItemFailures feature in your Lambda function event source configuration:

  • SQS queues. Only messages reported as failures will return to the queue for a retry, while successful ones will be deleted.
  • Kinesis data streams and DynamoDB streams. A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.

Warning: This utility lowers the chance of processing records more than once; it does not guarantee exactly-once processing.

We recommend implementing processing logic in an idempotent manner whenever possible.
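For instance, a minimal sketch using the Powertools Idempotency utility might look like the following; the table name and the handler body are illustrative assumptions, not part of this utility:

```typescript
import { makeIdempotent } from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import type { SQSRecord } from 'aws-lambda';

// Hypothetical DynamoDB table used to track already-processed records
const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable',
});

// Wrapping the record handler means a record that was already processed
// successfully becomes a no-op when the batch is retried
const recordHandler = makeIdempotent(
  async (record: SQSRecord): Promise<void> => {
    // your processing logic goes here
  },
  { persistenceStore }
);
```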

You can find more details on how Lambda works with either SQS, Kinesis, or DynamoDB in the AWS Documentation.

Getting started

Installation

Install the library in your project

```bash
npm i @aws-lambda-powertools/batch
```

For this feature to work, you need to configure your Lambda function event source to use ReportBatchItemFailures, so that the response from the Batch Processing utility can inform the service which records failed to be processed.

Use your preferred deployment framework to set the correct configuration, while this utility handles returning the correct response.

Required resources

The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the IAM permissions and the Dead Letter Queue (DLQ) where batch records are sent after 2 retries.

You do not need any additional IAM permissions to use this utility, except for what each event source requires.

template.yaml (SQS)

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs22.x
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt SampleQueue.QueueName
      Events:
        Batch:
          Type: SQS
          Properties:
            Queue: !GetAtt SampleQueue.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 30 # Fn timeout * 6
      SqsManagedSseEnabled: true
      RedrivePolicy:
        maxReceiveCount: 2
        deadLetterTargetArn: !GetAtt SampleDLQ.Arn
```
template.yaml (Kinesis Data Streams)

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs22.x
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records to DLQ from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        KinesisStream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt SampleStream.Arn
            BatchSize: 100
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
```
template.yaml (DynamoDB Streams)

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs22.x
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
```

Processing messages from SQS

Processing batches from SQS works in three stages:

  1. Instantiate BatchProcessor and choose EventType.SQS for the event type
  2. Define your function to handle each batch record, and use the SQSRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing

Note

By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the processInParallel option to false, or use SqsFifoPartialProcessor for SQS FIFO queues.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS); // (1)!
const logger = new Logger();

const recordHandler = async (record: SQSRecord): Promise<void> => { // (2)!
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { // (3)!
    context,
  });
```
  1. Step 1. Creates a partial failure batch processor for SQS queues. See partial failure mechanics for details
  2. Step 2. Defines a function to receive one record at a time from the batch
  3. Step 3. Kicks off processing

The second record failed to be processed; therefore, the processor added its message ID in the response.

Sample response:

```json
{
  "batchItemFailures": [
    {
      "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
    }
  ]
}
```
Sample event:

```json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"Message\": \"success\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    }
  ]
}
```

FIFO queues

When using SQS FIFO queues, a batch may include messages from different group IDs.

By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.

Enable the skipGroupOnError option to keep processing messages from other group IDs. With this option, messages from a failed group ID are sent back to SQS, while messages from subsequent group IDs continue to be processed.

Without skipGroupOnError:

```typescript
import {
  processPartialResponse,
  SqsFifoPartialProcessorAsync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new SqsFifoPartialProcessorAsync();
const logger = new Logger();

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
With skipGroupOnError:

```typescript
import {
  processPartialResponse,
  SqsFifoPartialProcessorAsync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
  Context,
  SQSBatchResponse,
  SQSEvent,
  SQSRecord,
} from 'aws-lambda';

const processor = new SqsFifoPartialProcessorAsync();
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler = async (
  event: SQSEvent,
  context: Context
): Promise<SQSBatchResponse> => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
    skipGroupOnError: true,
  });
};
```

Processing messages from Kinesis

Processing batches from Kinesis works in three stages:

  1. Instantiate BatchProcessor and choose EventType.KinesisDataStreams for the event type
  2. Define your function to handle each batch record, and use the KinesisStreamRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams); // (1)!
const logger = new Logger();

const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
  logger.info('Processing record', { record: record.kinesis.data });
  const payload = JSON.parse(record.kinesis.data);
  logger.info('Processed item', { item: payload });
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
  1. Creates a partial failure batch processor for Kinesis Data Streams. See partial failure mechanics for details

The second record failed to be processed; therefore, the processor added its sequence number in the response.

Sample event:

```json
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
        "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "6006958808509702859251049540584488075644979031228738",
        "data": "c3VjY2Vzcw==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
```
Sample response:

```json
{
  "batchItemFailures": [
    {
      "itemIdentifier": "6006958808509702859251049540584488075644979031228738"
    }
  ]
}
```

Processing messages from DynamoDB

Processing batches from DynamoDB Streams works in three stages:

  1. Instantiate BatchProcessor and choose EventType.DynamoDBStreams for the event type
  2. Define your function to handle each batch record, and use the DynamoDBRecord type annotation for autocompletion
  3. Use processPartialResponse to kick off processing
Info

This code example uses Logger for completeness; it is optional.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams); // (1)!
const logger = new Logger();

const recordHandler = async (record: DynamoDBRecord): Promise<void> => {
  if (record.dynamodb?.NewImage) {
    logger.info('Processing record', { record: record.dynamodb.NewImage });
    const message = record.dynamodb.NewImage.Message.S;
    if (message) {
      const payload = JSON.parse(message);
      logger.info('Processed item', { item: payload });
    }
  }
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
  1. Creates a partial failure batch processor for DynamoDB Streams. See partial failure mechanics for details

The second record failed to be processed; therefore, the processor added its sequence number in the response.

Sample response:

```json
{
  "batchItemFailures": [
    {
      "itemIdentifier": "8640712661"
    }
  ]
}
```
Sample event:

```json
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": { "Id": { "N": "101" } },
        "NewImage": { "Message": { "S": "failure" } },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "3275880929",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": { "Id": { "N": "101" } },
        "NewImage": { "SomethingElse": { "S": "success" } },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "8640712661",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    }
  ]
}
```

Error handling

By default, we catch any exception raised by your record handler function. This allows us to (1) continue processing the batch, (2) collect each batch item that failed processing, and (3) return the appropriate response correctly without failing your Lambda function execution.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

class InvalidPayload extends Error {
  public constructor(message: string) {
    super(message);
    this.name = 'InvalidPayload';
  }
}

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  } else {
    throw new InvalidPayload('Payload does not contain minimum required fields'); // (1)!
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { // (2)!
    context,
  });
```
  1. Any exception works here. See the Extending BatchProcessor section, if you want to override this behavior.

  2. Errors raised in recordHandler will propagate to processPartialResponse.

     We catch them and include each failed batch item identifier in the response object (see the Sample response tab).

Sample response:

```json
{
  "batchItemFailures": [
    {
      "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
    }
  ]
}
```

Partial failure mechanics

All records in the batch are passed to this handler for processing, even if exceptions are thrown. Here's the behavior after completing the batch:

  • All records successfully processed. We will return an empty list of item failures: {'batchItemFailures': []}
  • Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing
  • All records failed to be processed. We will throw a FullBatchFailureError with a list of all the errors thrown while processing, unless throwOnFullBatchFailure is disabled (see the sketch after this list)
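To illustrate the third case, here is a minimal sketch of intercepting a full batch failure before it propagates; it assumes FullBatchFailureError is exported from the package root, and the handler body is a placeholder:

```typescript
import {
  BatchProcessor,
  EventType,
  FullBatchFailureError,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
  // your processing logic goes here
};

export const handler: SQSHandler = async (event, context) => {
  try {
    return await processPartialResponse(event, recordHandler, processor, {
      context,
    });
  } catch (error) {
    if (error instanceof FullBatchFailureError) {
      // every record in the batch failed; log before rethrowing so the
      // whole batch is retried by the event source
      console.error('All records failed processing', error);
    }
    throw error;
  }
};
```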

The following sequence diagrams explain how each Batch processor behaves under different scenarios.

SQS Standard

Read more about the Batch Failure Reporting feature in AWS Lambda.

Sequence diagram to explain how BatchProcessor works with SQS Standard queues.

```mermaid
sequenceDiagram
    autonumber
    participant SQS queue
    participant Lambda service
    participant Lambda function
    Lambda service->>SQS queue: Poll
    Lambda service->>Lambda function: Invoke (batch event)
    Lambda function->>Lambda service: Report some failed messages
    activate SQS queue
    Lambda service->>SQS queue: Delete successful messages
    SQS queue-->>SQS queue: Failed messages return
    Note over SQS queue,Lambda service: Process repeat
    deactivate SQS queue
```
SQS mechanism with Batch Item Failures

SQS FIFO

Read more about the Batch Failure Reporting feature in AWS Lambda.

Sequence diagram to explain how SqsFifoPartialProcessor works with SQS FIFO queues without the skipGroupOnError flag.

```mermaid
sequenceDiagram
    autonumber
    participant SQS queue
    participant Lambda service
    participant Lambda function
    Lambda service->>SQS queue: Poll
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3rd batch item
    Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
    deactivate Lambda function
    activate SQS queue
    Lambda service->>SQS queue: Delete successful messages (1-2)
    SQS queue-->>SQS queue: Failed messages return (3-10)
    deactivate SQS queue
```
SQS FIFO mechanism with Batch Item Failures

Sequence diagram to explain how SqsFifoPartialProcessor works with SQS FIFO queues with the skipGroupOnError flag.

```mermaid
sequenceDiagram
    autonumber
    participant SQS queue
    participant Lambda service
    participant Lambda function
    Lambda service->>SQS queue: Poll
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3rd batch item
    Lambda function-->Lambda function: Process messages from another MessageGroupID
    Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
    deactivate Lambda function
    activate SQS queue
    Lambda service->>SQS queue: Delete successful messages processed
    SQS queue-->>SQS queue: Failed messages return
    deactivate SQS queue
```
SQS FIFO mechanism with Batch Item Failures

Kinesis and DynamoDB Streams

Read more about the Batch Failure Reporting feature.

Sequence diagram to explain how BatchProcessor works with both Kinesis Data Streams and DynamoDB Streams.

For brevity, we will use Streams to refer to either service. For theory on stream checkpoints, see this blog post.

```mermaid
sequenceDiagram
    autonumber
    participant Streams
    participant Lambda service
    participant Lambda function
    Lambda service->>Streams: Poll latest records
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3rd batch item
    Lambda function-->Lambda function: Continue processing batch items (4-10)
    Lambda function->>Lambda service: Report batch item as failure (3)
    deactivate Lambda function
    activate Streams
    Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
    Lambda service->>Streams: Poll records starting from updated checkpoint
    deactivate Streams
```
Kinesis and DynamoDB streams mechanism with single batch item failure

The behavior changes slightly when there are multiple item failures: the stream checkpoint is updated to the lowest sequence number reported. For example, if items 3 through 5 fail, the checkpoint is set to the sequence number of item 3, and polling resumes from there.

Note that the batch item sequence numbers in a real stream could differ from the batch item numbers shown in the illustration.

```mermaid
sequenceDiagram
    autonumber
    participant Streams
    participant Lambda service
    participant Lambda function
    Lambda service->>Streams: Poll latest records
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3-5 batch items
    Lambda function-->Lambda function: Continue processing batch items (6-10)
    Lambda function->>Lambda service: Report batch items as failure (3-5)
    deactivate Lambda function
    activate Streams
    Lambda service->>Streams: Checkpoints to lowest sequence number
    Lambda service->>Streams: Poll records starting from updated checkpoint
    deactivate Streams
```
Kinesis and DynamoDB streams mechanism with multiple batch item failures

Advanced

Parser integration

The Batch Processing utility integrates with the Parser utility to automatically validate and parse each batch record before processing. This ensures your record handler receives properly typed and validated data, eliminating the need for manual parsing and validation.

To enable parser integration, import the parser function from @aws-lambda-powertools/batch/parser and pass it along with a schema when instantiating the BatchProcessor. This requires you to also install the Parser utility.

```typescript
import { parser } from '@aws-lambda-powertools/batch/parser';
```

You have two approaches for schema validation:

  1. Item schema only (innerSchema) - Focus on your payload schema; we handle extending the base event structure
  2. Full event schema (schema) - Validate the entire event record structure with complete control

Benefits of parser integration

Parser integration eliminates runtime errors from malformed data and provides compile-time type safety, making your code more reliable and easier to maintain. Invalid records are automatically marked as failed and won't reach your handler, reducing defensive coding.

Without parser integration:

```typescript
const recordHandler = async (record: SQSRecord) => {
  // Manual parsing with no type safety
  const payload = JSON.parse(record.body); // any type
  console.log(payload.name); // No autocomplete, runtime errors possible
};
```
With parser integration:

```typescript
const mySchema = z.object({ name: z.string(), age: z.number() });

const recordHandler = async (
  record: ParsedRecord<SQSRecord, z.infer<typeof mySchema>>
) => {
  // Automatic validation and strong typing
  console.log(record.body.name); // Full type safety and autocomplete
};
```

Using item schema only

When you want to focus on validating your payload without dealing with the full event structure, use innerSchema. We automatically extend the base event schema for you, reducing boilerplate code while still validating the entire record.

Available transformers by event type:

| Event Type | Base Schema | Available Transformers | When to use transformer |
| --- | --- | --- | --- |
| SQS | SqsRecordSchema | json, base64 | json for stringified JSON, base64 for encoded data |
| Kinesis | KinesisDataStreamRecord | base64 | Required for Kinesis data (always base64 encoded) |
| DynamoDB | DynamoDBStreamRecord | unmarshall | Required to convert DynamoDB attribute values to plain objects |
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = z.object({
  name: z.string(),
  age: z.number(),
});

const logger = new Logger();
const processor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: myItemSchema,
  transformer: 'json',
  logger,
});

const recordHandler = async ({
  messageId,
  body: { name, age },
}: ParsedRecord<SQSRecord, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record ${messageId}`, { name, age });
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
Sample event:

```json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"name\": \"test-1\",\"age\": 20}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"name\": \"test-2\",\"age\": 30}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    }
  ]
}
```

Note

If innerSchema is used with DynamoDB streams, the schema will be applied to both the NewImage and the OldImage by default. If you want to have dedicated schemas, see the section below.

Using full event schema

For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure.

SQS:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas';
import type { SqsRecord } from '@aws-lambda-powertools/parser/types';
import type { SQSHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = JSONStringified(
  z.object({ name: z.string(), age: z.number() })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: SqsRecordSchema.extend({
    body: myItemSchema,
  }),
  logger,
});

const recordHandler = async ({
  messageId,
  body: { name, age },
}: ParsedRecord<SqsRecord, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record ${messageId}`, { name, age });
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
Kinesis Data Streams:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
import {
  KinesisDataStreamRecord,
  KinesisDataStreamRecordPayload,
} from '@aws-lambda-powertools/parser/schemas/kinesis';
import type { KinesisDataStreamRecordEvent } from '@aws-lambda-powertools/parser/types';
import type { KinesisStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = Base64Encoded(
  z.object({
    name: z.string(),
    age: z.number(),
  })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
  parser,
  schema: KinesisDataStreamRecord.extend({
    kinesis: KinesisDataStreamRecordPayload.extend({
      data: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  kinesis: {
    sequenceNumber,
    data: { name, age },
  },
}: ParsedRecord<KinesisDataStreamRecordEvent, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record: ${sequenceNumber}`, {
    name,
    age,
  });
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
DynamoDB Streams:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
  DynamoDBStreamChangeRecordBase,
  DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = DynamoDBMarshalled(
  z.object({ name: z.string(), age: z.number() })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  schema: DynamoDBStreamRecord.extend({
    dynamodb: DynamoDBStreamChangeRecordBase.extend({
      NewImage: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  eventID,
  dynamodb: {
    NewImage: { name, age },
  },
}: ParsedRecord<DynamoDBRecord, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record ${eventID}`, { name, age });
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

Typed record handlers with ParsedRecord

To get full type safety in your record handlers, use the ParsedRecord utility type:

```typescript
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

// For most cases - single schema
type MyRecord = ParsedRecord<SQSRecord, z.infer<typeof mySchema>>;

// For DynamoDB - separate schemas for NewImage and OldImage
type MyDynamoRecord = ParsedRecord<
  DynamoDBRecord,
  z.infer<typeof newSchema>,
  z.infer<typeof oldSchema>
>;
```

This eliminates verbose type annotations and provides clean autocompletion for your parsed data.

Accessing processed messages

Use the BatchProcessor directly in your function to access a list of all returned values from your recordHandler function.

  • When successful. We will include a tuple with success, the result of recordHandler, and the batch record
  • When failed. We will include a tuple with fail, the exception as a string, and the batch record
```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) => {
  const batch = event.Records; // (1)!

  processor.register(batch, recordHandler, { context }); // (2)!
  const processedMessages = await processor.process();

  for (const message of processedMessages) {
    const [status, error, record] = message;
    logger.info('Processed record', { status, record, error });
  }

  return processor.response();
};
```
  1. The processor requires the records array. This is typically handled by processPartialResponse.
  2. You need to register the batch, the recordHandler function, and optionally the context to access the Lambda context.

Accessing Lambda Context

Within your recordHandler function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the Lambda context into your recordHandler as an optional second argument if you pass it to the processPartialResponse function.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context, SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord, lambdaContext?: Context): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
  if (lambdaContext) {
    logger.info('Remaining time', {
      time: lambdaContext.getRemainingTimeInMillis(),
    });
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

Working with full batch failures

By default, the BatchProcessor will throw a FullBatchFailureError if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.

When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the Lambda service will scale down the concurrency of your function, potentially impacting performance.

For these scenarios, you can set the throwOnFullBatchFailure option to false when calling processPartialResponse.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
  // Process the record
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    throwOnFullBatchFailure: false,
  });
```

Extending BatchProcessor

You might want to bring custom logic to the existing BatchProcessor to slightly override how we handle successes and failures.

For these scenarios, you can subclass BatchProcessor and quickly override the successHandler and failureHandler methods:

  • successHandler() – Keeps track of successful batch records
  • failureHandler() – Keeps track of failed batch records

Let's suppose you'd like to add a metric named BatchRecordFailures for each batch record that failed processing:

Extending failure handling mechanism in BatchProcessor:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type {
  EventSourceDataClassTypes,
  FailureResponse,
} from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { Metrics, MetricUnit } from '@aws-lambda-powertools/metrics';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

class MyProcessor extends BatchProcessor {
  #metrics: Metrics;

  public constructor(eventType: keyof typeof EventType) {
    super(eventType);
    this.#metrics = new Metrics({ namespace: 'test' });
  }

  public failureHandler(
    record: EventSourceDataClassTypes,
    error: Error
  ): FailureResponse {
    this.#metrics.addMetric('BatchRecordFailures', MetricUnit.Count, 1);

    return super.failureHandler(record, error);
  }
}

const processor = new MyProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

Sequential processing

By default, the BatchProcessor processes records in parallel using Promise.all(). However, if you need to preserve the order of records, you can set the processInParallel option to false to process records sequentially.

If the processInParallel option is not provided, the BatchProcessor will process records in parallel.

When processing records from SQS FIFO queues, we recommend using the SqsFifoPartialProcessor class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
  // Process the record
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    processInParallel: false,
  });
```

Create your own partial processor

You can create your own partial batch processor from scratch by extending the BasePartialProcessor class and implementing the prepare(), clean(), processRecord(), and processRecordSync() abstract methods.

```mermaid
classDiagram
    direction LR
    class BasePartialProcessor {
        <<interface>>
        +prepare()
        +clean()
        +processRecord(record: BaseRecord)
        +processRecordSync(record: BaseRecord)
    }
    class YourCustomProcessor {
        +prepare()
        +clean()
        +processRecord(record: BaseRecord)
        +processRecordSync(record: BaseRecord)
    }
    BasePartialProcessor <|-- YourCustomProcessor : extends
```
Visual representation of bringing your own processor

  • prepare() – called once as part of the processor initialization
  • clean() – teardown logic called once after processRecord completes
  • processRecord() – use this method if you need to implement asynchronous logic; otherwise define it in your class with empty logic
  • processRecordSync() – handles all processing logic for each individual message of a batch, including calling the recordHandler (this.handler)

You can then pass this class to processPartialResponse to process the records in your Lambda handler function.

Creating a custom batch processor:

```typescript
import { randomInt } from 'node:crypto';
import {
  BasePartialBatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type {
  BaseRecord,
  FailureResponse,
  SuccessResponse,
} from '@aws-lambda-powertools/batch/types';
import {
  BatchWriteItemCommand,
  DynamoDBClient,
} from '@aws-sdk/client-dynamodb';
import { marshall } from '@aws-sdk/util-dynamodb';
import type { SQSHandler } from 'aws-lambda';

const tableName = process.env.TABLE_NAME || 'table-not-found';

class MyPartialProcessor extends BasePartialBatchProcessor {
  #tableName: string;
  #client?: DynamoDBClient;

  public constructor(tableName: string) {
    super(EventType.SQS);
    this.#tableName = tableName;
  }

  /**
   * It's called once, **after** processing the batch.
   *
   * Here we are writing all the processed messages to DynamoDB.
   */
  public clean(): void {
    // biome-ignore lint/style/noNonNullAssertion: We know that the client is defined because clean() is called after prepare()
    this.#client!.send(
      new BatchWriteItemCommand({
        RequestItems: {
          [this.#tableName]: this.successMessages.map((message) => ({
            PutRequest: {
              Item: marshall(message),
            },
          })),
        },
      })
    );
  }

  /**
   * It's called once, **before** processing the batch.
   *
   * It initializes a new client and cleans up any existing data.
   */
  public prepare(): void {
    this.#client = new DynamoDBClient({});
    this.successMessages = [];
  }

  public async processRecord(
    _record: BaseRecord
  ): Promise<SuccessResponse | FailureResponse> {
    throw new Error('Not implemented');
  }

  /**
   * It handles how your record is processed.
   *
   * Here we are keeping the status of each run, `this.handler` is
   * the function that is passed when calling `processor.register()`.
   */
  public processRecordSync(
    record: BaseRecord
  ): SuccessResponse | FailureResponse {
    try {
      const result = this.handler(record);
      return this.successHandler(record, result);
    } catch (error) {
      return this.failureHandler(record, error as Error);
    }
  }
}

const processor = new MyPartialProcessor(tableName);

const recordHandler = (): number => {
  return Math.floor(randomInt(1, 10));
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

Tracing with AWS X-Ray

You can use Tracer to create subsegments for each batch record processed. To do so, open a new subsegment for each record, and close it when you're done processing it. You can add annotations and metadata to the subsegment directly, without calling tracer.setSegment(subsegment). This lets you work with the subsegment directly and avoids having to either pass the parent subsegment around or restore it at the end of record processing.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Tracer } from '@aws-lambda-powertools/tracer';
import { captureLambdaHandler } from '@aws-lambda-powertools/tracer/middleware';
import middy from '@middy/core';
import type { SQSEvent, SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const tracer = new Tracer({ serviceName: 'serverlessAirline' });

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler'); // (1)!
  subsegment?.addAnnotation('messageId', record.messageId); // (2)!

  const payload = record.body;
  if (payload) {
    try {
      const item = JSON.parse(payload);
      // do something with the item
      subsegment?.addMetadata('item', item);
    } catch (error) {
      subsegment?.addError(error as Error);
      throw error;
    }
  }

  subsegment?.close(); // (3)!
};

export const handler: SQSHandler = middy(async (event: SQSEvent, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  })
).use(captureLambdaHandler(tracer));
```
  1. Retrieve the current segment, then create a subsegment for the record being processed
  2. You can add annotations and metadata to the subsegment directly without calling tracer.setSegment(subsegment)
  3. Close the subsegment when you're done processing the record

Testing your code

As there are no external calls, you can unit test your code with BatchProcessor quite easily.

Example:

Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.

Unit test:

```typescript
import { readFileSync } from 'node:fs';
import { describe, expect, it } from 'vitest';
import { handler, processor } from './gettingStartedSQS.js';

const context = {
  callbackWaitsForEmptyEventLoop: true,
  functionVersion: '$LATEST',
  functionName: 'foo-bar-function',
  memoryLimitInMB: '128',
  logGroupName: '/aws/lambda/foo-bar-function-123456abcdef',
  logStreamName: '2021/03/09/[$LATEST]abcdef123456abcdef123456abcdef123456',
  invokedFunctionArn:
    'arn:aws:lambda:eu-west-1:123456789012:function:foo-bar-function',
  awsRequestId: 'c6af9ac6-7b61-11e6-9a41-93e812345678',
  getRemainingTimeInMillis: () => 1234,
  done: () => console.log('Done!'),
  fail: () => console.log('Failed!'),
  succeed: () => console.log('Succeeded!'),
};

describe('Function tests', () => {
  it('returns one failed message', async () => {
    // Prepare
    const event = JSON.parse(
      readFileSync('./samples/sampleSQSEvent.json', 'utf8')
    );
    const processorResult = processor; // access processor for additional assertions
    const successfulRecord = event.Records[0];
    const failedRecord = event.Records[1];
    const expectedResponse = {
      batchItemFailures: [
        {
          itemIdentifier: failedRecord.messageId,
        },
      ],
    };

    // Act
    const response = await handler(event, context, () => {});

    // Assess
    expect(response).toEqual(expectedResponse);
    expect(processorResult.failureMessages).toHaveLength(1);
    expect(processorResult.successMessages[0]).toEqual(successfulRecord);
  });
});
```
Function under test (gettingStartedSQS.ts):

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS); // (1)!
const logger = new Logger();

// biome-ignore format: we need the comment in the next line to stay there to annotate the code snippet in the docs
const recordHandler = async (record: SQSRecord): Promise<void> => { // (2)!
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) =>
  // biome-ignore format: we need the comment in the next line to stay there to annotate the code snippet in the docs
  processPartialResponse(event, recordHandler, processor, { // (3)!
    context,
  });

export { processor };
```
events/sqs_event.json

```json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "{\"Message\": \"success\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
      "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-1"
    }
  ]
}
```