diff --git a/src/config.ts b/src/config.ts new file mode 100644 index 0000000..4c14c8d --- /dev/null +++ b/src/config.ts @@ -0,0 +1,79 @@ +import { readFileSync, existsSync } from "node:fs" +import { join } from "node:path" +import { log } from "./logger"; + +interface Config { + Mq: Mq, + S3: S3, + Mongo: Mongo, + General: GeneralConfig, +} + +export interface Mq { + MQ_USER: string; + MQ_PASS: string; + MQ_URL: string; + MQ_TOPIC: string; +} + +export interface S3 { + S3_ENDPOINT: string; + S3_BUCKET: string; + S3_ACCESS_KEY: string; + S3_SECRET_KEY: string; + S3_REGION: string; +} + +export interface Mongo { + MONGO_URI: string; + MONGO_DB: string; + MONGO_USER: string; + MONGO_PASS: string; +} + +export interface GeneralConfig { + TOC: string; +} + +export function ConfigLoader(): Config { + const cfg: Config = { + Mq: { + MQ_URL: getConfig("MQ_URL"), + MQ_TOPIC: getConfig("MQ_TOPIC"), + MQ_USER: getConfig("MQ_USER"), + MQ_PASS: getConfig("MQ_PASS"), + }, + S3: { + S3_ENDPOINT: getConfig("S3_ENDPOINT"), + S3_BUCKET: getConfig("S3_BUCKET"), + S3_ACCESS_KEY: getConfig("S3_ACCESS_KEY"), + S3_SECRET_KEY: getConfig("S3_SECRET_KEY"), + S3_REGION: getConfig("S3_REGION"), + }, + Mongo: { + MONGO_URI: getConfig("MONGO_URI"), + MONGO_DB: getConfig("MONGO_DB"), + MONGO_USER: getConfig("MONGO_USER"), + MONGO_PASS: getConfig("MONGO_PASS"), + }, + General: { + TOC: getConfig("TOC"), + } + }; + + return cfg +} + +function getConfig(key: string): string { + const filePath = join("/etc/secrets", key); + if (existsSync(filePath)) { + try { + return readFileSync(filePath, "utf8").trim(); + } catch (err) { + log("ERROR", `Error reading secret file at ${filePath}: ${err}`); + } + } + + return process.env[key] || "" +} + diff --git a/src/database.ts b/src/database.ts index c4b4d46..bf99621 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3,42 +3,42 @@ import { MongoClient } from "mongodb"; import { log } from "./logger"; +import type { Mongo } from "./config"; -const uri = process.env.MONGO_URI || ""; -const db = process.env.MONGO_DB || ""; -const user = process.env.MONGO_USER || ""; -const pass = process.env.MONGO_PASS || ""; const collection = "data_ingress_meta"; -if(!uri || !db || !user || !pass) { + +function uriBuild(cfg: Mongo): string { +if(!cfg.MONGO_URI || !cfg.MONGO_DB || !cfg.MONGO_USER || !cfg.MONGO_PASS) { log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35"); process.exit(35); } else { log("DEBUG", `MongoDB Connection`, { - uri: uri, - db: db, + uri: cfg.MONGO_URI, + db: cfg.MONGO_DB, collection: collection, - user: user, + user: cfg.MONGO_USER, pass: "****", }); }; -const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`; +return `mongodb://${encodeURIComponent(cfg.MONGO_USER)}:${encodeURIComponent(cfg.MONGO_PASS)}@${cfg.MONGO_URI}` +} let mongoClient: MongoClient | null = null; -async function getMongoClient() { +async function getMongoClient(cfg: Mongo) { if (mongoClient) return mongoClient; - mongoClient = new MongoClient(CONNECTION_URI); + mongoClient = new MongoClient(uriBuild(cfg)); await mongoClient.connect(); return mongoClient; } -export async function isPackageProcessed(serviceName: string, packageName: string): Promise { +export async function isPackageProcessed(cfg: Mongo, serviceName: string, packageName: string): Promise { try { - const client = await getMongoClient(); - const database = client.db(db); + const client = await getMongoClient(cfg); + const database = client.db(cfg.MONGO_DB); const coll = database.collection(collection); const result = await coll.findOne({ service_name: serviceName }); diff --git a/src/index.ts b/src/index.ts index 30cf114..e42ae4d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,8 @@ import { processAndStore } from './sss.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { processPisStream } from './process.js' import { isPackageProcessed } from './database.js' +import { ConfigLoader } from './config.js' +import { sendFileUpdateMessage } from './nats.js' async function main() { const SERVICE_NAME = process.env.SERVICE_NAME; @@ -13,6 +15,8 @@ async function main() { process.exit(1); } + const config = ConfigLoader(); + try { const packageInfo = await getLatestPackageName(); log('INFO', `Latest PIS Package: ${packageInfo.name}`); @@ -22,20 +26,25 @@ async function main() { process.exit(9); } - if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) { + if (await isPackageProcessed(config.Mongo, SERVICE_NAME, packageInfo.name)) { log('INFO', `Database matches latest release. Exiting`); process.exit(0); } const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); - const objectGenerator = processPisStream(inputStream); + const objectGenerator = processPisStream(config.General, inputStream); const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`; log('DEBUG', `Processing stream to: ${filename}`); - await processAndStore(objectGenerator, filename); + await processAndStore(config.S3, objectGenerator, filename); log('DEBUG', 'Done'); + + log('DEBUG', "Sending message to NATS"); + sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME); + log('DEBUG', "Done, exiting") + return } catch (err) { log('ERROR', 'Fatal error in pipeline: ', err); process.exit(7); diff --git a/src/nats.ts b/src/nats.ts index 08bb72e..e05aa0a 100644 --- a/src/nats.ts +++ b/src/nats.ts @@ -3,22 +3,21 @@ import type { ConnectionOptions, NatsConnection, Payload } from "nats"; import { log } from "./logger"; import { hostname } from "node:os"; import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update"; +import type { Mq } from "./config"; const jc = JSONCodec(); -async function getNatsConnection(): Promise { - const serverUrl = process.env.MQ_URL || "nats://localhost:4222"; - +async function getNatsConnection(cfg: Mq): Promise { const options: ConnectionOptions = { - servers: serverUrl, + servers: cfg.MQ_URL, name: hostname(), reconnect: true, maxReconnectAttempts: -1, }; - if (process.env.MQ_USER && process.env.MQ_PASS) { - options.user = process.env.MQ_USER; - options.pass = process.env.MQ_PASS; + if (cfg.MQ_USER && cfg.MQ_PASS) { + options.user = cfg.MQ_USER; + options.pass = cfg.MQ_PASS; log("INFO", "NATS: Using username/password authentication"); } else { log("INFO", "NATS: Connecting without authentication"); @@ -28,11 +27,10 @@ async function getNatsConnection(): Promise { } // Send Message Function here to send the message to NATS -export async function sendFileUpdateMessage(path: string, version: string): Promise { - const serviceName: string = "pis-data-ingress"; +export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise { const serviceId: string = hostname(); const message: MQFileUpdate = { - service_name: "pis-data-ingress", + service_name: serviceName, service_id: serviceId, sent_timestamp: Math.floor(Date.now() / 1000), data_type: "file", @@ -46,7 +44,7 @@ export async function sendFileUpdateMessage(path: string, version: string): Prom let nats: NatsConnection | undefined; try { - const nats: NatsConnection = await getNatsConnection(); + const nats: NatsConnection = await getNatsConnection(cfg); const subject = `ingress.file.${message.data_kind}`; diff --git a/src/process.ts b/src/process.ts index e0671f5..c1023f5 100644 --- a/src/process.ts +++ b/src/process.ts @@ -3,6 +3,7 @@ import { createInterface } from 'node:readline'; import XXH from 'xxhashjs'; import { log } from './logger.js'; import { DataIngressPisData } from '@owlboard/backend-data-contracts'; +import type { GeneralConfig } from './config.js'; const BASE_URL = process.env.BASEURL || 'https://owlboard.info' @@ -18,8 +19,8 @@ interface InputRecord { stops: string[]; } -export async function* processPisStream(inputStream: Readable) { - const TOC = process.env.TOC; +export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) { + const TOC = cfg.TOC; if (!TOC) { log('ERROR', "TOC not set: Exit code 19"); process.exit(19); diff --git a/src/sss.ts b/src/sss.ts index 8de37af..ac76628 100644 --- a/src/sss.ts +++ b/src/sss.ts @@ -4,8 +4,10 @@ import { createWriteStream } from "node:fs"; import { Readable } from "node:stream"; import { DataIngressPisData } from "@owlboard/backend-data-contracts"; import { log } from "./logger.js"; +import type { S3 } from "./config.js"; export async function processAndStore( + cfg: S3, generator: AsyncGenerator, filename: string ) { @@ -15,21 +17,21 @@ export async function processAndStore( } })()); - const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET; + const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET; if (useS3) { - if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) { + if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) { log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24"); process.exit(24); } - log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`); + log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`); const client = new S3Client({ - endpoint: process.env.S3_ENDPOINT, - region: process.env.S3_REGION, + endpoint: cfg.S3_ENDPOINT, + region: cfg.S3_REGION, credentials: { - accessKeyId: process.env.S3_ACCESS_KEY!, - secretAccessKey: process.env.S3_SECRET_KEY, + accessKeyId: cfg.S3_ACCESS_KEY!, + secretAccessKey: cfg.S3_SECRET_KEY, }, forcePathStyle: true, }); @@ -37,7 +39,7 @@ export async function processAndStore( const upload = new Upload({ client, params: { - Bucket: process.env.S3_BUCKET, + Bucket: cfg.S3_BUCKET, Key: filename, Body: ndjsonStream, ContentType: "application/x-ndjson",