Skip to content

Instantly share code, notes, and snippets.

@danielzurawski
Last active August 26, 2022 14:05
Show Gist options
  • Save danielzurawski/086fac69620f263bea0c97356102e075 to your computer and use it in GitHub Desktop.
Save danielzurawski/086fac69620f263bea0c97356102e075 to your computer and use it in GitHub Desktop.
DynamoDB batch update using Bluebird for async concurrency and a rate limiter
/*
A small node.js script utilising bluebird, node-rate-limiter and aws-sdk to achieve optimal DynamoDB UpdateItem throughput for a provided DDB write capacity / per second
When would this be useful?
Sadly, aws-sdk does not automatically throttle, it just blows up with ProvisionedThroughputExceededException.
So let’s say, you want to update millions of records in Dynamo using the Update operation, without BatchWriteItem which only allows Puts and you want to avoid generating Insert events.
Additionally, you want it to go as fast as your DDB configuration (without burst) allows, but not any faster.
*/
const Promise = require('bluebird')
const aws = require('aws-sdk')
const documentClient = new aws.DynamoDB.DocumentClient({ region: 'us-east-1' })
const RateLimiter = require('limiter').RateLimiter
const ddbWriteCapacityUnitPerSecond = 10;
const limiter = new RateLimiter({ tokensPerInterval: ddbWriteCapacityUnitPerSecond, interval: 1000 })
const table = 'some-ddb-table-name'
const printProgress = (progress, remainingRequests) => {
let tokensMsg = ''
if (Math.floor(remainingRequests) > 0) {
tokensMsg = `(Under utilised, available rate limit tokens ${remainingRequests})`
} else if (remainingRequests < 0) {
tokensMsg = `(Rate limited, available rate limit tokens ${remainingRequests})`
} else {
tokensMsg = `(Spot on, available rate limit tokens ${remainingRequests})`
}
process.stdout.clearLine()
process.stdout.cursorTo(0)
process.stdout.write(progress + " " + tokensMsg)
}
async function updateItem(item, itemCounter, totalItems) {
const remainingRequests = await limiter.removeTokens(1)
printProgress(`Progress: ${itemCounter}/${totalItems}`, remainingRequests)
await documentClient
.update({
TableName: table,
Key: { s3path: item.s3path },
UpdateExpression: 'SET #propName = :propName',
ExpressionAttributeNames: {
'#propName': 'propName',
},
ExpressionAttributeValues: {
':propName': 'Some New Prop Value',
},
})
.promise();
}
const throttledDynamoDBUpdate = async () => {
let itemCounter = 0;
const { Items = [] } = await documentClient
.scan({
TableName: table
})
.promise();
await Promise.map(Items, async (item) => {
itemCounter++;
return updateItem(item, itemCounter, Items.length)
}, { concurrency: ddbWriteCapacityUnitPerSecond })
};
throttledDynamoDBUpdate();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment