import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { createWriteStream } from "node:fs";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js";
import type { S3 } from "./config.js";

/**
 * Serialise every record produced by `generator` as newline-delimited JSON
 * (one `JSON.stringify(record)` per line) and store the result under `filename`.
 *
 * Destination selection:
 * - If both `cfg.S3_ENDPOINT` and `cfg.S3_BUCKET` are set, the NDJSON is streamed
 *   to that bucket as object `filename`. The bucket is created first if a HEAD
 *   request on it reports HTTP 404.
 * - Otherwise it is written to the local file path `filename`.
 *
 * Process exits (intentional, kept for callers that rely on the exit codes):
 * - `24` when S3 is selected but any of endpoint, bucket, region, access key or
 *   secret key is missing/empty.
 * - `21` when the bucket existence check fails for any reason other than a 404.
 *
 * @param cfg - S3 settings. `S3_ENDPOINT` + `S3_BUCKET` select the S3 path;
 *   `S3_REGION`, `S3_ACCESS_KEY` and `S3_SECRET_KEY` are then also required.
 * @param generator - Source of records; iterated lazily, exactly once.
 * @param filename - S3 object key, or local file path when not using S3.
 * @returns Resolves once the upload or file write has fully completed.
 * @throws Rejects if the record source throws, bucket creation fails, the upload
 *   fails, or the local file cannot be written.
 */
export async function processAndStore(
  cfg: S3,
  generator: AsyncIterable<unknown>,
  filename: string,
): Promise<void> {
  const ndjsonStream = Readable.from(toNdjson(generator));

  if (cfg.S3_ENDPOINT && cfg.S3_BUCKET) {
    await storeInS3(cfg, ndjsonStream, filename);
  } else {
    await storeLocally(ndjsonStream, filename);
  }
}

/** Yield each record as one JSON document followed by "\n". */
async function* toNdjson(records: AsyncIterable<unknown>): AsyncGenerator<string> {
  for await (const record of records) {
    yield JSON.stringify(record) + "\n";
  }
}

/** Validate S3 settings, ensure the bucket exists, then stream `body` to `key`. */
async function storeInS3(cfg: S3, body: Readable, key: string): Promise<void> {
  const {
    S3_ENDPOINT: endpoint,
    S3_BUCKET: bucket,
    S3_REGION: region,
    S3_ACCESS_KEY: accessKeyId,
    S3_SECRET_KEY: secretAccessKey,
  } = cfg;

  if (!endpoint || !bucket || !region || !accessKeyId || !secretAccessKey) {
    // Fatal misconfiguration: log at ERROR so it is not filtered out before exit.
    log("ERROR", "Missing required variables for S3 Support - EXIT CODE 24");
    process.exit(24);
  }

  log("DEBUG", "Opening connection to S3 Server");
  const client = new S3Client({
    endpoint,
    region,
    credentials: { accessKeyId, secretAccessKey },
    // Path-style addressing (endpoint/bucket/key) instead of bucket subdomains;
    // presumably needed for the S3-compatible server in use.
    forcePathStyle: true,
  });

  try {
    await ensureBucket(client, bucket);

    log("INFO", `Streaming to S3: ${bucket}/${key}`);
    const upload = new Upload({
      client,
      params: {
        Bucket: bucket,
        Key: key,
        Body: body,
        ContentType: "application/x-ndjson",
      },
    });
    await upload.done();
  } finally {
    // Release the client's HTTP resources once we are done with it.
    client.destroy();
  }
}

/** Create `bucket` if HEAD reports 404; exit with code 21 on any other failure. */
async function ensureBucket(client: S3Client, bucket: string): Promise<void> {
  try {
    await client.send(new HeadBucketCommand({ Bucket: bucket }));
  } catch (err: unknown) {
    if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
      log("INFO", `Bucket ${bucket} does not exist, creating...`);
      await client.send(new CreateBucketCommand({ Bucket: bucket }));
      return;
    }
    // Include the underlying cause; previously it was discarded before exiting.
    log("ERROR", `Failed to verify bucket: ${bucket} (${describeError(err)})`);
    process.exit(21);
  }
}

/** Write `body` to the local file at `path`. */
async function storeLocally(body: Readable, path: string): Promise<void> {
  log("INFO", `Streaming to local filesystem at: ${path}`);
  // pipeline() propagates errors from BOTH the source and the file and destroys
  // both streams on failure. `.pipe()` ignored source errors (crashing with an
  // unhandled 'error' event) and left the file descriptor open.
  await pipeline(body, createWriteStream(path));
}

/** Render an unknown thrown value for logging. */
function describeError(err: unknown): string {
  return err instanceof Error ? `${err.name}: ${err.message}` : String(err);
}