Files
pis-data-ingress/src/index.ts

52 lines
1.7 KiB
TypeScript

import { log } from './logger.js'
import { Readable } from 'node:stream'
import { processAndStore } from './sss.js'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js'
import { ConfigLoader } from './config.js'
import { sendFileUpdateMessage } from './nats.js'
async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME;
if (!SERVICE_NAME) {
log('ERROR', "SERVICE_NAME env variable must be set");
process.exit(1);
}
const config = ConfigLoader();
try {
const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`);
if (!packageInfo.assets[0]?.browser_download_url) {
log('ERROR', `No attachments found for release ${packageInfo.name}`);
process.exit(9);
}
// Check if package already processed HERE...
log('WARN', 'Version check disabled, downloading data from source');
// exit(0) if done, else continue
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const objectGenerator = processPisStream(config.General, inputStream);
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
log('DEBUG', `Processing stream to: ${filename}`);
await processAndStore(config.S3, objectGenerator, filename);
log('DEBUG', 'Done');
log('DEBUG', "Sending message to NATS");
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME);
log('DEBUG', "Done, exiting")
return
} catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7);
}
}
main();