Last active
August 26, 2022 14:05
-
-
Save danielzurawski/086fac69620f263bea0c97356102e075 to your computer and use it in GitHub Desktop.
DynamoDB batch update using Bluebird for async concurrency and a rate limiter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
A small node.js script utilising bluebird, node-rate-limiter and aws-sdk to achieve optimal DynamoDB UpdateItem throughput for a provided DDB write capacity / per second | |
When would this be useful? | |
Sadly, aws-sdk does not automatically throttle, it just blows up with ProvisionedThroughputExceededException. | |
Say you want to update millions of records in DynamoDB using the UpdateItem operation, because BatchWriteItem only supports Put and Delete requests (not updates) and you want to avoid generating Insert events.
Additionally, you want it to go as fast as your DDB configuration (without burst) allows, but not any faster. | |
*/ | |
// Third-party dependencies. `Promise` deliberately shadows the built-in so
// that `Promise.map` (a bluebird helper) is available further down.
const Promise = require('bluebird');
const aws = require('aws-sdk');
const { RateLimiter } = require('limiter');

// Script configuration.
const table = 'some-ddb-table-name';
const ddbWriteCapacityUnitPerSecond = 10;

// Shared DynamoDB client used for both the scan and the updates.
const documentClient = new aws.DynamoDB.DocumentClient({ region: 'us-east-1' });

// Hands out `ddbWriteCapacityUnitPerSecond` tokens per 1000 ms interval.
const limiter = new RateLimiter({
  tokensPerInterval: ddbWriteCapacityUnitPerSecond,
  interval: 1000,
});
/**
 * Write a progress line, annotated with the rate limiter's token state.
 *
 * On an interactive terminal the current line is overwritten in place. When
 * stdout is not a TTY (piped or redirected), `clearLine` / `cursorTo` do not
 * exist on the stream, so each update is written on its own line instead.
 *
 * @param {string} progress - Progress text, e.g. "Progress: 3/10".
 * @param {number} remainingRequests - Token count reported by the rate limiter.
 * @returns {void}
 */
const printProgress = (progress, remainingRequests) => {
  let tokensMsg;
  if (Math.floor(remainingRequests) > 0) {
    tokensMsg = `(Under utilised, available rate limit tokens ${remainingRequests})`;
  } else if (remainingRequests < 0) {
    tokensMsg = `(Rate limited, available rate limit tokens ${remainingRequests})`;
  } else {
    tokensMsg = `(Spot on, available rate limit tokens ${remainingRequests})`;
  }

  const line = `${progress} ${tokensMsg}`;
  if (process.stdout.isTTY) {
    // Redraw the same terminal line so progress updates in place.
    process.stdout.clearLine(0);
    process.stdout.cursorTo(0);
    process.stdout.write(line);
  } else {
    // Cursor control is unavailable on non-TTY streams; emit one line per update.
    process.stdout.write(`${line}\n`);
  }
};
/**
 * Update a single item, after first taking one token from the rate limiter.
 *
 * @param {object} item - Scanned item; only `item.s3path` is read, as the key.
 * @param {number} itemCounter - Position of this item, shown in the progress line.
 * @param {number} totalItems - Total item count, shown in the progress line.
 * @returns {Promise<void>} Resolves once the update call has completed.
 */
async function updateItem(item, itemCounter, totalItems) {
  // Wait for the limiter before issuing the write; the resolved value is
  // passed straight to the progress display.
  const tokensLeft = await limiter.removeTokens(1);
  printProgress(`Progress: ${itemCounter}/${totalItems}`, tokensLeft);

  const updateParams = {
    TableName: table,
    Key: { s3path: item.s3path },
    UpdateExpression: 'SET #propName = :propName',
    ExpressionAttributeNames: { '#propName': 'propName' },
    ExpressionAttributeValues: { ':propName': 'Some New Prop Value' },
  };

  await documentClient.update(updateParams).promise();
}
/**
 * Scan the whole table and update every item, with bounded concurrency.
 *
 * A single Scan call returns at most one page of results, so this follows
 * `LastEvaluatedKey` until the scan is exhausted. Pages are processed one at
 * a time, which keeps only one page of items in memory.
 *
 * The total shown in the progress line is the number of items scanned so
 * far; it grows as further pages are fetched.
 *
 * @returns {Promise<void>} Resolves when every scanned item has been updated.
 */
const throttledDynamoDBUpdate = async () => {
  let itemCounter = 0;
  let scannedSoFar = 0;
  let exclusiveStartKey;

  do {
    const scanParams = { TableName: table };
    if (exclusiveStartKey !== undefined) {
      // Resume the scan where the previous page stopped.
      scanParams.ExclusiveStartKey = exclusiveStartKey;
    }

    const { Items = [], LastEvaluatedKey } = await documentClient
      .scan(scanParams)
      .promise();

    scannedSoFar += Items.length;
    const totalForPage = scannedSoFar;

    await Promise.map(
      Items,
      (item) => {
        itemCounter += 1;
        return updateItem(item, itemCounter, totalForPage);
      },
      { concurrency: ddbWriteCapacityUnitPerSecond },
    );

    // Absent on the final page, which ends the loop.
    exclusiveStartKey = LastEvaluatedKey;
  } while (exclusiveStartKey !== undefined);
};
// Entry point: run the update and report any failure instead of leaving the
// returned promise unhandled.
throttledDynamoDBUpdate().catch((err) => {
  // Leading newline so the error does not get appended to the progress line.
  console.error('\nDynamoDB batch update failed:', err);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment