Files
pis-data-ingress/src/sss.ts

76 lines
2.6 KiB
TypeScript

import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { createWriteStream } from "node:fs";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js";
import type { S3 } from "./config.js";
/**
 * Serialises every record yielded by `generator` as newline-delimited JSON
 * and streams the result either to an S3-compatible bucket or to a local file.
 *
 * S3 is used when both `cfg.S3_ENDPOINT` and `cfg.S3_BUCKET` are set; otherwise
 * the output is written to the local path given by `filename`.
 *
 * Process-level side effects in the S3 path:
 * - exits with code 24 when any required S3 setting is missing;
 * - exits with code 21 when the bucket check fails for a reason other than a
 *   404 response (a 404 triggers bucket creation instead).
 *
 * @param cfg - S3 connection settings.
 * @param generator - Source of records; each one becomes a single NDJSON line.
 * @param filename - Object key when uploading to S3, or file path when writing locally.
 * @returns A promise that settles once the upload or the file write has finished.
 *   It rejects if the generator, the upload, or the file write fails.
 */
export async function processAndStore(
  cfg: S3,
  generator: AsyncGenerator<DataIngressPisData.PisObjects>,
  filename: string
): Promise<void> {
  // Lazily turn the record generator into a stream of NDJSON lines.
  const ndjsonStream = Readable.from((async function* () {
    for await (const record of generator) {
      yield JSON.stringify(record) + "\n";
    }
  })());

  const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;
  if (!useS3) {
    log('INFO', `Streaming to local filesystem at: ${filename}`);
    // pipeline() rejects on an error from EITHER stream and destroys both.
    // A bare pipe() does not forward source errors, so a throwing generator
    // would otherwise leave the returned promise pending forever.
    await pipeline(ndjsonStream, createWriteStream(filename));
    return;
  }

  // Copy the settings into locals so the guard below narrows them for the
  // rest of the function without any non-null assertions.
  const {
    S3_ENDPOINT: endpoint,
    S3_BUCKET: bucket,
    S3_REGION: region,
    S3_ACCESS_KEY: accessKeyId,
    S3_SECRET_KEY: secretAccessKey,
  } = cfg;

  if (!endpoint || !bucket || !region || !accessKeyId || !secretAccessKey) {
    // Fatal condition: log at ERROR so the reason for the exit is not lost
    // when debug output is filtered.
    log("ERROR", "Missing required variables for S3 Support - EXIT CODE 24");
    process.exit(24);
  }

  log("DEBUG", "Opening connection to S3 Server");
  const client = new S3Client({
    endpoint,
    region,
    credentials: { accessKeyId, secretAccessKey },
    forcePathStyle: true,
  });

  // Check bucket exists, and create if needed
  try {
    await client.send(new HeadBucketCommand({ Bucket: bucket }));
  } catch (err: unknown) {
    if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
      log('INFO', `Bucket ${bucket} does not exist, creating...`);
      await client.send(new CreateBucketCommand({ Bucket: bucket }));
    } else {
      // Include the underlying reason so the failure can be diagnosed.
      const reason = err instanceof Error ? err.message : String(err);
      log(`ERROR`, `Failed to verify bucket: ${bucket} (${reason})`);
      process.exit(21);
    }
  }

  log('INFO', `Streaming to S3: ${bucket}/${filename}`);
  const upload = new Upload({
    client,
    params: {
      Bucket: bucket,
      Key: filename,
      Body: ndjsonStream,
      ContentType: "application/x-ndjson",
    },
  });
  await upload.done();
}