Skip to content

Instantly share code, notes, and snippets.

@amiantos
Last active May 23, 2024 21:44
Show Gist options
  • Save amiantos/16bacc9ed742c91151fcf1a41012445e to your computer and use it in GitHub Desktop.
Save amiantos/16bacc9ed742c91151fcf1a41012445e to your computer and use it in GitHub Desktop.
Zip Multiple Files from S3 using AWS Lambda Function
// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
// "bucket": "your-bucket",
// "destination_key": "zips/test.zip",
// "files": [
// {
// "uri": "...", (options: S3 file key or URL)
// "filename": "...", (filename of file inside zip)
// "type": "..." (options: [file, url])
// }
// ]
// }
// Saves zip file at "destination_key" location
"use strict";
const AWS = require("aws-sdk");
const awsOptions = {
region: "us-east-1",
httpOptions: {
timeout: 300000 // Matching Lambda function timeout
}
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");
const streamTo = (bucket, key, resolve) => {
var passthrough = new stream.PassThrough();
s3.upload(
{
Bucket: bucket,
Key: key,
Body: passthrough,
ContentType: "application/zip",
ServerSideEncryption: "AES256"
},
(err, data) => {
if (err) throw err;
console.log("Zip uploaded");
resolve();
}
).on("httpUploadProgress", (progress) => {
console.log(progress);
});
return passthrough;
};
// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
const getStream = (bucket, key) => {
let streamCreated = false;
const passThroughStream = new stream.PassThrough();
passThroughStream.on("newListener", event => {
if (!streamCreated && event == "data") {
const s3Stream = s3
.getObject({ Bucket: bucket, Key: key })
.createReadStream();
s3Stream
.on("error", err => passThroughStream.emit("error", err))
.pipe(passThroughStream);
streamCreated = true;
}
});
return passThroughStream;
};
exports.handler = async (event, context, callback) => {
var bucket = event["bucket"];
var destinationKey = event["destination_key"];
var files = event["files"];
await new Promise(async (resolve, reject) => {
var zipStream = streamTo(bucket, destinationKey, resolve);
zipStream.on("error", reject);
var archive = archiver("zip");
archive.on("error", err => {
throw new Error(err);
});
archive.pipe(zipStream);
for (const file of files) {
if (file["type"] == "file") {
archive.append(getStream(bucket, file["uri"]), {
name: file["filename"]
});
} else if (file["type"] == "url") {
archive.append(request(file["uri"]), { name: file["filename"] });
}
}
archive.finalize();
}).catch(err => {
throw new Error(err);
});
callback(null, {
statusCode: 200,
body: { final_destination: destinationKey }
});
};
@RyanClementsHax
Copy link

@RyanClementsHax thanks! while we weren't running into this issue in production (how? why?) I went ahead and implemented your changes on our side since they make sense to me, and I also updated the gist itself with your changes. Thanks again for the extra legwork on diagnosing this issue.

no problem!

@DDynamic
Copy link

@amiantos with this script, do you think there is an easy way to introduce concurrent stream processing? It appears that the read streams are created and processed sequentially. I'm testing this out with zipping over 10,000 small files (~50 KB each).

@amiantos
Copy link
Author

@DDynamic I'm no expert but I assume you can't add multiple files to a zip file at the same time. I did a little googling, looks like this assumption is correct, the zip algorithm needs to process one stream at a time.

@RyanClementsHax
Copy link

RyanClementsHax commented Jul 6, 2021

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at being able to handle roughly 1Gig/min with little memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the clean up and isolation between hander invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to consider how errors were handled more because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we inserted failures like references to objects that didn't exist, were unhandled promise rejections, which will cause node to crash in future node versions, and uncaught exceptions which already causes node to crash. This was mostly caused by streams not throwing errors within the call stack so they can be caught by promise rejections or the classic try/catch. The solution we found was to be proactive to handle any error coming from a stream and make sure they could be caught by the promise patterns we wanted to use. If you're use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.

AWS SDK V3

The original version of this zipper lambda used the v2 version of the sdk which is slowly becoming out of date, so my team decided to rip off the band aid and upgrade to this version. The major version brought some api changes, but otherwise it remained the same. One of the things it brought was better typescript support which attracted my team. Example implementation below.

Typescript

The original version didn't use this, but my team really liked typescript and the static typing checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that the GetObjectCommand requires that you add dom to your lib array in your tsconfig.json because it references some browser types and thus need to include it if you want typescript to work properly for this command. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
            if (!streamCreated && event == 'data') {
                await this.initDownloadStream(bucket, key, stream)
                streamCreated = true
            }
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // if we don't pipe the errors into the stream, we will get unhandled exception errors in the console and won't be caught by any try/catch or .catch constructs that call createLazyDownloadStreamFrom since this initDownloadStream function is called in the callback function of .on('newListener') and thus isn't in the "call stack" of the call to createLazyDownloadStreamFrom
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}

@damianobertuna
Copy link

Hi,

we tried with the solution suggested here but we are facing the following problem.
Suppose we want to zip these files:

{
"files": [
    {
      "fileName": "File1_1GB.bin",
      "key": "File1_1GB.bin"
    },
    {
      "fileName": "File2_1GB.bin",
      "key": "File2_1GB.bin"
    },
    {
      "fileName": "File3_1GB.bin",
      "key": "File3_1GB.bin"
    },
    {
      "fileName": "File4_1GB.bin",
      "key": "File4_1GB.bin"
    },
    {
      "fileName": "File5_1GB.bin",
      "key": "File5_1GB.bin"
    },
],
  "bucketRegion": "REGION_NAME",
  "originBucketName": "BUCKET_NAME",
  "destBucketName": "DESTBUCKET",
  "zipName": "ZippedFiles.zip"
}

In the ZippedFiles.zip created we have correctly 5 files but they are not of the correct size, like:

  • File1 1GB;
  • File2 1GB;
  • File3 1GB;
  • File4 34KB;
  • File5 34KB;

Our configuration is 15 minutes the timeout and 10GB the memory.

What can be the problem?

Thanks in advance.

Reagards.

@RyanClementsHax
Copy link

RyanClementsHax commented Jul 30, 2021

Hi,

we tried with the solution suggested here but we are facing the following problem.
Suppose we want to zip these files:

{
"files": [
    {
      "fileName": "File1_1GB.bin",
      "key": "File1_1GB.bin"
    },
    {
      "fileName": "File2_1GB.bin",
      "key": "File2_1GB.bin"
    },
    {
      "fileName": "File3_1GB.bin",
      "key": "File3_1GB.bin"
    },
    {
      "fileName": "File4_1GB.bin",
      "key": "File4_1GB.bin"
    },
    {
      "fileName": "File5_1GB.bin",
      "key": "File5_1GB.bin"
    },
],
  "bucketRegion": "REGION_NAME",
  "originBucketName": "BUCKET_NAME",
  "destBucketName": "DESTBUCKET",
  "zipName": "ZippedFiles.zip"
}

In the ZippedFiles.zip created we have correctly 5 files but they are not of the correct size, like:

  • File1 1GB;
  • File2 1GB;
  • File3 1GB;
  • File4 34KB;
  • File5 34KB;

Our configuration is 15 minutes the timeout and 10GB the memory.

What can be the problem?

Thanks in advance.

Reagards.

Hello! Which version of the solution are you using? Also have you added additional logging to catch errors? Your type of problem happened to me whenever the stream was not finished writing for some reason normally caused when the server encountered an unexpected error. Moreover, how long was your lambda running for?

@Trigno97
Copy link

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at being able to handle roughly 1Gig/min with little memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the clean up and isolation between hander invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to consider how errors were handled more because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we inserted failures like references to objects that didn't exist, were unhandled promise rejections, which will cause node to crash in future node versions, and uncaught exceptions which already causes node to crash. This was mostly caused by streams not throwing errors within the call stack so they can be caught by promise rejections or the classic try/catch. The solution we found was to be proactive to handle any error coming from a stream and make sure they could be caught by the promise patterns we wanted to use. If you're use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.

AWS SDK V3

The original version of this zipper lambda used the v2 version of the sdk which is slowly becoming out of date, so my team decided to rip off the band aid and upgrade to this version. The major version brought some api changes, but otherwise it remained the same. One of the things it brought was better typescript support which attracted my team. Example implementation below.

Typescript

The original version didn't use this, but my team really liked typescript and the static typing checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that the GetObjectCommand requires that you add dom to your lib array in your tsconfig.json because it references some browser types and thus need to include it if you want typescript to work properly for this command. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
            if (!streamCreated && event == 'data') {
                await this.initDownloadStream(bucket, key, stream)
                streamCreated = true
            }
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // if we don't pipe the errors into the stream, we will get unhandled exception errors in the console and won't be caught by any try/catch or .catch constructs that call createLazyDownloadStreamFrom since this initDownloadStream function is called in the callback function of .on('newListener') and thus isn't in the "call stack" of the call to createLazyDownloadStreamFrom
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}

🏆

@dakebusi
Copy link

@damianobertuna I'm facing a similar issue, where not all files are correctly zipped.

Did you manage to fix it?

@mikehadlow
Copy link

Just wanted to leave a shoutout to @amiantos and @RyanClementsHax for his Typescript solution. Works a dream and saved me a ton of time. Thank you both!

@Limesior
Copy link

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at being able to handle roughly 1Gig/min with little memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the clean up and isolation between hander invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to consider how errors were handled more because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we inserted failures like references to objects that didn't exist, were unhandled promise rejections, which will cause node to crash in future node versions, and uncaught exceptions which already causes node to crash. This was mostly caused by streams not throwing errors within the call stack so they can be caught by promise rejections or the classic try/catch. The solution we found was to be proactive to handle any error coming from a stream and make sure they could be caught by the promise patterns we wanted to use. If you're use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.

AWS SDK V3

The original version of this zipper lambda used the v2 version of the sdk which is slowly becoming out of date, so my team decided to rip off the band aid and upgrade to this version. The major version brought some api changes, but otherwise it remained the same. One of the things it brought was better typescript support which attracted my team. Example implementation below.

Typescript

The original version didn't use this, but my team really liked typescript and the static typing checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that the GetObjectCommand requires that you add dom to your lib array in your tsconfig.json because it references some browser types and thus need to include it if you want typescript to work properly for this command. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
            if (!streamCreated && event == 'data') {
                await this.initDownloadStream(bucket, key, stream)
                streamCreated = true
            }
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // if we don't pipe the errors into the stream, we will get unhandled exception errors in the console and won't be caught by any try/catch or .catch constructs that call createLazyDownloadStreamFrom since this initDownloadStream function is called in the callback function of .on('newListener') and thus isn't in the "call stack" of the call to createLazyDownloadStreamFrom
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}

Thank you so much ! I had an issue where my lambda would die on larger/multiple files without throwing any errors with Archiver and SDK v3, this was the fix. Great work !

@amiantos
Copy link
Author

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

@julianpoma
Copy link

I was struggling with this today. Glad to know that I am not alone <3

@Sahar-SE
Copy link

Sahar-SE commented Oct 9, 2023

Hi, I need someone to help me please. I have created a Lambda function to trigger S3 bucket and zip the uploaded files then store them in a destination bucket. but when I upload files it doesn't appear to destination bucket. I have searched a lot but couldn't find any solution for that. I configured all the permission and IAM but still it doesn't work.

@pnicholls
Copy link

pnicholls commented Apr 23, 2024

zipkit.io is another way to solve this problem.

@marc-reed
Copy link

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

I am going to join in and sing praises for @amiantos - you saved my butt today!
I had to write a Lambda to zip thousands of files. The task would just go dark - no zip, no errors. Your V3 script did the trick - well done!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment