1 Commits

Author SHA1 Message Date
c90163cdce Switch to central config loader using mounted secret files. 2026-01-13 19:53:08 +00:00
6 changed files with 127 additions and 38 deletions

79
src/config.ts Normal file
View File

@@ -0,0 +1,79 @@
import { readFileSync, existsSync } from "node:fs"
import { join } from "node:path"
import { log } from "./logger";
interface Config {
Mq: Mq,
S3: S3,
Mongo: Mongo,
General: GeneralConfig,
}
export interface Mq {
MQ_USER: string;
MQ_PASS: string;
MQ_URL: string;
MQ_TOPIC: string;
}
export interface S3 {
S3_ENDPOINT: string;
S3_BUCKET: string;
S3_ACCESS_KEY: string;
S3_SECRET_KEY: string;
S3_REGION: string;
}
export interface Mongo {
MONGO_URI: string;
MONGO_DB: string;
MONGO_USER: string;
MONGO_PASS: string;
}
export interface GeneralConfig {
TOC: string;
}
export function ConfigLoader(): Config {
const cfg: Config = {
Mq: {
MQ_URL: getConfig("MQ_URL"),
MQ_TOPIC: getConfig("MQ_TOPIC"),
MQ_USER: getConfig("MQ_USER"),
MQ_PASS: getConfig("MQ_PASS"),
},
S3: {
S3_ENDPOINT: getConfig("S3_ENDPOINT"),
S3_BUCKET: getConfig("S3_BUCKET"),
S3_ACCESS_KEY: getConfig("S3_ACCESS_KEY"),
S3_SECRET_KEY: getConfig("S3_SECRET_KEY"),
S3_REGION: getConfig("S3_REGION"),
},
Mongo: {
MONGO_URI: getConfig("MONGO_URI"),
MONGO_DB: getConfig("MONGO_DB"),
MONGO_USER: getConfig("MONGO_USER"),
MONGO_PASS: getConfig("MONGO_PASS"),
},
General: {
TOC: getConfig("TOC"),
}
};
return cfg
}
function getConfig(key: string): string {
const filePath = join("/etc/secrets", key);
if (existsSync(filePath)) {
try {
return readFileSync(filePath, "utf8").trim();
} catch (err) {
log("ERROR", `Error reading secret file at ${filePath}: ${err}`);
}
}
return process.env[key] || ""
}

View File

@@ -3,42 +3,42 @@
import { MongoClient } from "mongodb"; import { MongoClient } from "mongodb";
import { log } from "./logger"; import { log } from "./logger";
import type { Mongo } from "./config";
const uri = process.env.MONGO_URI || "";
const db = process.env.MONGO_DB || "";
const user = process.env.MONGO_USER || "";
const pass = process.env.MONGO_PASS || "";
const collection = "data_ingress_meta"; const collection = "data_ingress_meta";
if(!uri || !db || !user || !pass) {
function uriBuild(cfg: Mongo): string {
if(!cfg.MONGO_URI || !cfg.MONGO_DB || !cfg.MONGO_USER || !cfg.MONGO_PASS) {
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35"); log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
process.exit(35); process.exit(35);
} else { } else {
log("DEBUG", `MongoDB Connection`, { log("DEBUG", `MongoDB Connection`, {
uri: uri, uri: cfg.MONGO_URI,
db: db, db: cfg.MONGO_DB,
collection: collection, collection: collection,
user: user, user: cfg.MONGO_USER,
pass: "****", pass: "****",
}); });
}; };
const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`; return `mongodb://${encodeURIComponent(cfg.MONGO_USER)}:${encodeURIComponent(cfg.MONGO_PASS)}@${cfg.MONGO_URI}`
}
let mongoClient: MongoClient | null = null; let mongoClient: MongoClient | null = null;
async function getMongoClient() { async function getMongoClient(cfg: Mongo) {
if (mongoClient) return mongoClient; if (mongoClient) return mongoClient;
mongoClient = new MongoClient(CONNECTION_URI); mongoClient = new MongoClient(uriBuild(cfg));
await mongoClient.connect(); await mongoClient.connect();
return mongoClient; return mongoClient;
} }
export async function isPackageProcessed(serviceName: string, packageName: string): Promise<boolean> { export async function isPackageProcessed(cfg: Mongo, serviceName: string, packageName: string): Promise<boolean> {
try { try {
const client = await getMongoClient(); const client = await getMongoClient(cfg);
const database = client.db(db); const database = client.db(cfg.MONGO_DB);
const coll = database.collection(collection); const coll = database.collection(collection);
const result = await coll.findOne({ service_name: serviceName }); const result = await coll.findOne({ service_name: serviceName });

View File

@@ -5,6 +5,8 @@ import { processAndStore } from './sss.js'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js' import { processPisStream } from './process.js'
import { isPackageProcessed } from './database.js' import { isPackageProcessed } from './database.js'
import { ConfigLoader } from './config.js'
import { sendFileUpdateMessage } from './nats.js'
async function main() { async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME; const SERVICE_NAME = process.env.SERVICE_NAME;
@@ -13,6 +15,8 @@ async function main() {
process.exit(1); process.exit(1);
} }
const config = ConfigLoader();
try { try {
const packageInfo = await getLatestPackageName(); const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`); log('INFO', `Latest PIS Package: ${packageInfo.name}`);
@@ -22,20 +26,25 @@ async function main() {
process.exit(9); process.exit(9);
} }
if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) { if (await isPackageProcessed(config.Mongo, SERVICE_NAME, packageInfo.name)) {
log('INFO', `Database matches latest release. Exiting`); log('INFO', `Database matches latest release. Exiting`);
process.exit(0); process.exit(0);
} }
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const objectGenerator = processPisStream(inputStream); const objectGenerator = processPisStream(config.General, inputStream);
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`; const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
log('DEBUG', `Processing stream to: ${filename}`); log('DEBUG', `Processing stream to: ${filename}`);
await processAndStore(objectGenerator, filename); await processAndStore(config.S3, objectGenerator, filename);
log('DEBUG', 'Done'); log('DEBUG', 'Done');
log('DEBUG', "Sending message to NATS");
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME);
log('DEBUG', "Done, exiting")
return
} catch (err) { } catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err); log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7); process.exit(7);

View File

@@ -3,22 +3,21 @@ import type { ConnectionOptions, NatsConnection, Payload } from "nats";
import { log } from "./logger"; import { log } from "./logger";
import { hostname } from "node:os"; import { hostname } from "node:os";
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update"; import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
import type { Mq } from "./config";
const jc = JSONCodec(); const jc = JSONCodec();
async function getNatsConnection(): Promise<NatsConnection> { async function getNatsConnection(cfg: Mq): Promise<NatsConnection> {
const serverUrl = process.env.MQ_URL || "nats://localhost:4222";
const options: ConnectionOptions = { const options: ConnectionOptions = {
servers: serverUrl, servers: cfg.MQ_URL,
name: hostname(), name: hostname(),
reconnect: true, reconnect: true,
maxReconnectAttempts: -1, maxReconnectAttempts: -1,
}; };
if (process.env.MQ_USER && process.env.MQ_PASS) { if (cfg.MQ_USER && cfg.MQ_PASS) {
options.user = process.env.MQ_USER; options.user = cfg.MQ_USER;
options.pass = process.env.MQ_PASS; options.pass = cfg.MQ_PASS;
log("INFO", "NATS: Using username/password authentication"); log("INFO", "NATS: Using username/password authentication");
} else { } else {
log("INFO", "NATS: Connecting without authentication"); log("INFO", "NATS: Connecting without authentication");
@@ -28,11 +27,10 @@ async function getNatsConnection(): Promise<NatsConnection> {
} }
// Send Message Function here to send the message to NATS // Send Message Function here to send the message to NATS
export async function sendFileUpdateMessage(path: string, version: string): Promise<boolean> { export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise<boolean> {
const serviceName: string = "pis-data-ingress";
const serviceId: string = hostname(); const serviceId: string = hostname();
const message: MQFileUpdate = { const message: MQFileUpdate = {
service_name: "pis-data-ingress", service_name: serviceName,
service_id: serviceId, service_id: serviceId,
sent_timestamp: Math.floor(Date.now() / 1000), sent_timestamp: Math.floor(Date.now() / 1000),
data_type: "file", data_type: "file",
@@ -46,7 +44,7 @@ export async function sendFileUpdateMessage(path: string, version: string): Prom
let nats: NatsConnection | undefined; let nats: NatsConnection | undefined;
try { try {
const nats: NatsConnection = await getNatsConnection(); const nats: NatsConnection = await getNatsConnection(cfg);
const subject = `ingress.file.${message.data_kind}`; const subject = `ingress.file.${message.data_kind}`;

View File

@@ -3,6 +3,7 @@ import { createInterface } from 'node:readline';
import XXH from 'xxhashjs'; import XXH from 'xxhashjs';
import { log } from './logger.js'; import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts'; import { DataIngressPisData } from '@owlboard/backend-data-contracts';
import type { GeneralConfig } from './config.js';
const BASE_URL = process.env.BASEURL || 'https://owlboard.info' const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
@@ -18,8 +19,8 @@ interface InputRecord {
stops: string[]; stops: string[];
} }
export async function* processPisStream(inputStream: Readable) { export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) {
const TOC = process.env.TOC; const TOC = cfg.TOC;
if (!TOC) { if (!TOC) {
log('ERROR', "TOC not set: Exit code 19"); log('ERROR', "TOC not set: Exit code 19");
process.exit(19); process.exit(19);

View File

@@ -4,8 +4,10 @@ import { createWriteStream } from "node:fs";
import { Readable } from "node:stream"; import { Readable } from "node:stream";
import { DataIngressPisData } from "@owlboard/backend-data-contracts"; import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js"; import { log } from "./logger.js";
import type { S3 } from "./config.js";
export async function processAndStore( export async function processAndStore(
cfg: S3,
generator: AsyncGenerator<DataIngressPisData.PisObjects>, generator: AsyncGenerator<DataIngressPisData.PisObjects>,
filename: string filename: string
) { ) {
@@ -15,21 +17,21 @@ export async function processAndStore(
} }
})()); })());
const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET; const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;
if (useS3) { if (useS3) {
if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) { if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) {
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24"); log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
process.exit(24); process.exit(24);
} }
log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`); log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
const client = new S3Client({ const client = new S3Client({
endpoint: process.env.S3_ENDPOINT, endpoint: cfg.S3_ENDPOINT,
region: process.env.S3_REGION, region: cfg.S3_REGION,
credentials: { credentials: {
accessKeyId: process.env.S3_ACCESS_KEY!, accessKeyId: cfg.S3_ACCESS_KEY!,
secretAccessKey: process.env.S3_SECRET_KEY, secretAccessKey: cfg.S3_SECRET_KEY,
}, },
forcePathStyle: true, forcePathStyle: true,
}); });
@@ -37,7 +39,7 @@ export async function processAndStore(
const upload = new Upload({ const upload = new Upload({
client, client,
params: { params: {
Bucket: process.env.S3_BUCKET, Bucket: cfg.S3_BUCKET,
Key: filename, Key: filename,
Body: ndjsonStream, Body: ndjsonStream,
ContentType: "application/x-ndjson", ContentType: "application/x-ndjson",