Files
pis-data-ingress/src/sss.ts

60 lines
1.9 KiB
TypeScript

import { createWriteStream } from "node:fs";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js";
import type { S3 } from "./config.js";
/**
 * Serialises every record yielded by `generator` as newline-delimited JSON
 * (one `JSON.stringify`-ed record per line) and streams the result to storage.
 *
 * Destination selection:
 * - When both `cfg.S3_ENDPOINT` and `cfg.S3_BUCKET` are set, the data is
 *   uploaded to that bucket under the key `filename`. In this mode
 *   `S3_REGION`, `S3_ACCESS_KEY` and `S3_SECRET_KEY` are also required; if any
 *   is missing the process is terminated with exit code 24.
 * - Otherwise the data is written to the local file at path `filename`.
 *
 * @param cfg - S3 connection settings read from configuration.
 * @param generator - Source of records; consumed lazily as the sink pulls data.
 * @param filename - Object key (S3 mode) or file path (local mode).
 * @returns A promise that resolves once the upload has completed or the local
 *   file has been fully written.
 * @throws Rejects with the underlying error when the generator, the upload, or
 *   the file write fails.
 */
export async function processAndStore(
  cfg: S3,
  generator: AsyncGenerator<DataIngressPisData.PisObjects>,
  filename: string
): Promise<void> {
  // Wrap the record generator in a Readable so either sink can pull from it
  // with backpressure instead of buffering the whole data set in memory.
  const ndjsonStream = Readable.from((async function* () {
    for await (const record of generator) {
      yield JSON.stringify(record) + "\n";
    }
  })());

  const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;

  if (!useS3) {
    log('INFO', `Streaming to local filesystem at: ${filename}`);
    // pipeline() forwards errors from BOTH the source and the file stream and
    // destroys both on failure. A bare pipe() would leave a generator error
    // unhandled and this promise pending forever.
    await pipeline(ndjsonStream, createWriteStream(filename));
    return;
  }

  if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) {
    log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
    process.exit(24);
  }

  log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);

  const client = new S3Client({
    endpoint: cfg.S3_ENDPOINT,
    region: cfg.S3_REGION,
    credentials: {
      // Both keys are narrowed to defined values by the guard above.
      accessKeyId: cfg.S3_ACCESS_KEY,
      secretAccessKey: cfg.S3_SECRET_KEY,
    },
    forcePathStyle: true,
  });

  try {
    const upload = new Upload({
      client,
      params: {
        Bucket: cfg.S3_BUCKET,
        Key: filename,
        Body: ndjsonStream,
        ContentType: "application/x-ndjson",
      },
    });
    await upload.done();
  } finally {
    // On failure this stops the source generator; after a successful upload
    // the stream has already ended and destroy() is a no-op.
    ndjsonStream.destroy();
    // Release the client's underlying sockets now that it is no longer needed.
    client.destroy();
  }
}