9 Commits

9 changed files with 839 additions and 917 deletions

View File

@@ -20,7 +20,7 @@ WORKDIR /app
COPY package*.json ./ COPY package*.json ./
RUN npm ci --omit=dev RUN npm ci --omit=dev
COPY --from=builder /app/dist ./dist COPY --from=builder /app/dist /app/dist
USER node USER node

1430
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -25,14 +25,11 @@
"@aws-sdk/client-s3": "^3.964.0", "@aws-sdk/client-s3": "^3.964.0",
"@aws-sdk/lib-storage": "^3.964.0", "@aws-sdk/lib-storage": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.1.9", "@owlboard/backend-data-contracts": "^0.1.9",
"mongodb": "^7.0.0",
"nats": "^2.29.3", "nats": "^2.29.3",
"readline": "^1.3.0", "readline": "^1.3.0"
"xxhashjs": "^0.2.2"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^25.0.3", "@types/node": "^25.0.3",
"@types/xxhashjs": "^0.2.4",
"tsx": "^4.21.0", "tsx": "^4.21.0",
"typescript": "^5.9.3" "typescript": "^5.9.3"
} }

View File

@@ -1,6 +1,6 @@
import { readFileSync, existsSync } from "node:fs" import { readFileSync, existsSync } from "node:fs"
import { join } from "node:path" import { join } from "node:path"
import { log } from "./logger"; import { log } from "./logger.js";
interface Config { interface Config {
Mq: Mq, Mq: Mq,

View File

@@ -1,62 +0,0 @@
// Functions to check the current applied version
// updating will be handled by the processing service
import { MongoClient } from "mongodb";
import { log } from "./logger";
import type { Mongo } from "./config";
const collection = "data_ingress_meta";
function uriBuild(cfg: Mongo): string {
if(!cfg.MONGO_URI || !cfg.MONGO_DB || !cfg.MONGO_USER || !cfg.MONGO_PASS) {
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
process.exit(35);
} else {
log("DEBUG", `MongoDB Connection`, {
uri: cfg.MONGO_URI,
db: cfg.MONGO_DB,
collection: collection,
user: cfg.MONGO_USER,
pass: "****",
});
};
return `mongodb://${encodeURIComponent(cfg.MONGO_USER)}:${encodeURIComponent(cfg.MONGO_PASS)}@${cfg.MONGO_URI}`
}
let mongoClient: MongoClient | null = null;
async function getMongoClient(cfg: Mongo) {
if (mongoClient) return mongoClient;
mongoClient = new MongoClient(uriBuild(cfg));
await mongoClient.connect();
return mongoClient;
}
export async function isPackageProcessed(cfg: Mongo, serviceName: string, packageName: string): Promise<boolean> {
try {
const client = await getMongoClient(cfg);
const database = client.db(cfg.MONGO_DB);
const coll = database.collection(collection);
const result = await coll.findOne({ service_name: serviceName });
if (!result) {
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
return false;
}
if (result.latest_package === packageName) {
log('INFO', 'No update needed');
return true;
}
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
return false;
} catch (err) {
log('ERROR', 'Failed to check Mongo for version state:', err);
process.exit(1);
}
}

View File

@@ -4,31 +4,55 @@ import { processAndStore } from './sss.js'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js' import { processPisStream } from './process.js'
import { isPackageProcessed } from './database.js'
import { ConfigLoader } from './config.js' import { ConfigLoader } from './config.js'
import { sendFileUpdateMessage } from './nats.js' import { natsManager } from './nats.js'
async function exit(exitCode: string | number=0): Promise<void> {
log("INFO", `Exiting with code: ${exitCode}`);
try {
await natsManager.close();
} catch (err) {
log("ERROR", `Error during cleanup: ${err}`);
process.exit(1)
}
process.exit(exitCode);
}
async function main() { async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME; const SERVICE_NAME = process.env.SERVICE_NAME;
if (!SERVICE_NAME) { if (!SERVICE_NAME) {
log('ERROR', "SERVICE_NAME env variable must be set"); log('ERROR', "SERVICE_NAME env variable must be set");
process.exit(1); process.exitCode = 1;
return;
} }
const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`;
const config = ConfigLoader(); const config = ConfigLoader();
try {
log('INFO', `Initialising NATS`);
await natsManager.connect(config.Mq);
} catch (err) {
log('ERROR', `Unable to connect to NATS: ${err}`);
}
try { try {
const packageInfo = await getLatestPackageName(); const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`); log('INFO', `Latest PIS Package: ${packageInfo.name}`);
if (!packageInfo.assets[0]?.browser_download_url) { if (!packageInfo.assets[0]?.browser_download_url) {
log('ERROR', `No attachments found for release ${packageInfo.name}`); log('ERROR', `No attachments found for release ${packageInfo.name}`);
process.exit(9); process.exitCode = 1;
return;
} }
if (await isPackageProcessed(config.Mongo, SERVICE_NAME, packageInfo.name)) { const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY);
log('INFO', `Database matches latest release. Exiting`); if (lastAppliedVersion === packageInfo.name) {
process.exit(0); log('INFO', "No new data, exiting");
return;
} }
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
@@ -42,12 +66,13 @@ async function main() {
log('DEBUG', 'Done'); log('DEBUG', 'Done');
log('DEBUG', "Sending message to NATS"); log('DEBUG', "Sending message to NATS");
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME); await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME);
log('DEBUG', "Done, exiting") await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name);
return
} catch (err) { } catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err); log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7); process.exitCode = 1;
} finally {
await exit(process.exitCode || 0);
} }
} }

View File

@@ -1,60 +1,136 @@
import { connect, JSONCodec } from "nats"; import { connect, JSONCodec, StringCodec } from "nats";
import type { ConnectionOptions, NatsConnection, Payload } from "nats"; import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats";
import { log } from "./logger"; import { log } from "./logger.js";
import { hostname } from "node:os"; import { hostname } from "node:os";
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update"; import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
import type { Mq } from "./config"; import type { Mq } from "./config.js";
const jc = JSONCodec(); const jc = JSONCodec();
const sc = StringCodec();
async function getNatsConnection(cfg: Mq): Promise<NatsConnection> { class NatsManager {
const options: ConnectionOptions = { private nc: NatsConnection | null = null;
servers: cfg.MQ_URL, private js: JetStreamClient | null = null;
name: hostname(), private kv: KV | null = null;
reconnect: true, private bucketName = "INGRESS_STATES";
maxReconnectAttempts: -1,
};
if (cfg.MQ_USER && cfg.MQ_PASS) { /**
options.user = cfg.MQ_USER; * Opens connection to NATS
options.pass = cfg.MQ_PASS; */
log("INFO", "NATS: Using username/password authentication"); async connect(cfg: Mq): Promise<void> {
} else { if (this.nc) return;
log("INFO", "NATS: Connecting without authentication");
const options: ConnectionOptions = {
servers: cfg.MQ_URL,
name: hostname(),
reconnect: true,
maxReconnectAttempts: -1,
waitOnFirstConnect: true,
};
if (cfg.MQ_USER && cfg.MQ_PASS) {
options.user = cfg.MQ_USER;
options.pass = cfg.MQ_PASS;
log("INFO", "NATS: Using auth credentials");
}
try {
this.nc = await connect(options);
this.js = this.nc.jetstream();
log("INFO", `NATS: Connected to ${cfg.MQ_URL}`);
// Handle connection close events
this.nc.closed().then((err) => {
if (err) {
log("ERROR", `NATS: Connection closed: ${err}`);
} else {
log("INFO", "NATS: Connection ended")
}
this.nc = null;
this.js = null;
this.kv = null;
});
} catch (err) {
log("ERROR", `NATS: Initial connection failed: ${err}`);
throw err;
}
} }
return await connect(options) /**
} * Accessor for the KV store
*/
// Send Message Function here to send the message to NATS private async getKV(): Promise<KV> {
export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise<boolean> { if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first.");
const serviceId: string = hostname(); if (!this.kv) {
const message: MQFileUpdate = { this.kv = await this.js.views.kv(this.bucketName);
service_name: serviceName,
service_id: serviceId,
sent_timestamp: Math.floor(Date.now() / 1000),
data_type: "file",
data_kind: "pis",
payload: {
version: version,
filepath: path,
} }
}; return this.kv;
}
let nats: NatsConnection | undefined;
try { /**
const nats: NatsConnection = await getNatsConnection(cfg); * Get the last recorded state/hash for a service
*/
async getState(key: string): Promise<string | null> {
try {
const store = await this.getKV();
const entry = await store.get(key);
return entry ? sc.decode(entry.value) : null;
} catch (err) {
log("ERROR", `NATS: KV Get failed for ${key}: ${err}`);
return null;
}
}
/**
* Set the current state/hash for a service
*/
async setState(key: string, value: string): Promise<void> {
try {
const store = await this.getKV();
await store.put(key, sc.encode(value));
} catch (err) {
log("ERROR", `NATS: KV Set failed for ${key}: ${err}`);
throw err;
}
}
/**
* Publishes message to the JetStream
*/
async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise<boolean> {
if (!this.js) throw new Error("NATS: JetStream not initialized");
const message: MQFileUpdate = {
service_name: serviceName,
service_id: hostname(),
sent_timestamp: Math.floor(Date.now() / 1000),
data_type: "file",
data_kind: "pis",
payload: {
version: version,
filepath: path,
}
};
const subject = `ingress.file.${message.data_kind}`; const subject = `ingress.file.${message.data_kind}`;
nats.publish(subject, jc.encode(message)); try {
await this.js.publish(subject, jc.encode(message));
await nats.drain(); log("INFO", `NATS: Message published to ${subject}`);
return true return true;
} catch (err) { } catch (err) {
log("ERROR", `NATS: Failed to send message: ${err}`); log("ERROR", `NATS: Failed to publish to JetStream: ${err}`);
if (nats) {nats.close()} return false;
return false; }
} }
}
async close() {
if (this.nc) {
await this.nc.drain();
this.nc = null;
}
}
}
// Export instance
export const natsManager = new NatsManager();

View File

@@ -1,6 +1,5 @@
import { Readable } from 'node:stream'; import { Readable } from 'node:stream';
import { createInterface } from 'node:readline'; import { createInterface } from 'node:readline';
import XXH from 'xxhashjs';
import { log } from './logger.js'; import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts'; import { DataIngressPisData } from '@owlboard/backend-data-contracts';
import type { GeneralConfig } from './config.js'; import type { GeneralConfig } from './config.js';
@@ -36,19 +35,19 @@ export async function* processPisStream(cfg: GeneralConfig, inputStream: Readabl
if (!line.trim()) continue; if (!line.trim()) continue;
const record = JSON.parse(line) as InputRecord; const record = JSON.parse(line) as InputRecord;
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`) // log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
const crsHash = XXH.h64(record.stops.join('|'), SEED); const normalisedStops = record.stops.map(stop => stop.toUpperCase());
const tiplocStops = await mapStopsToTiploc(record.stops);
const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED); const tiplocStops = await mapStopsToTiploc(normalisedStops);
const data: DataIngressPisData.PisObjects = { const data: DataIngressPisData.PisObjects = {
code: record.code, code: record.code,
toc: TOC.toLowerCase(), toc: TOC.toLowerCase(),
crsStops: record.stops, crsStops: normalisedStops,
crsHash: crsHash.toString(), crsHash: "",
tiplocStops: tiplocStops, tiplocStops: tiplocStops,
tiplocHash: tiplocHash.toString(), tiplocHash: "",
} }
yield data; yield data;
@@ -64,8 +63,9 @@ async function mapStopsToTiploc(crsStops: string[]): Promise<string[]> {
// Cache Miss // Cache Miss
try { try {
const tiploc = await fetchTiplocFromApi(crs); const tiploc = await fetchTiplocFromApi(crs);
tiplocCache.set(crs, tiploc); const normalisedTiploc = tiploc.toUpperCase();
return tiploc; tiplocCache.set(crs, normalisedTiploc);
return normalisedTiploc;
} catch (err) { } catch (err) {
log('ERROR', `Failed lookup for: ${crs}`, err); log('ERROR', `Failed lookup for: ${crs}`, err);
process.exit(99); process.exit(99);

View File

@@ -1,4 +1,4 @@
import { S3Client } from "@aws-sdk/client-s3"; import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage"; import { Upload } from "@aws-sdk/lib-storage";
import { createWriteStream } from "node:fs"; import { createWriteStream } from "node:fs";
import { Readable } from "node:stream"; import { Readable } from "node:stream";
@@ -24,7 +24,8 @@ export async function processAndStore(
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24"); log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
process.exit(24); process.exit(24);
} }
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
log("DEBUG", "Opening connection to S3 Server");
const client = new S3Client({ const client = new S3Client({
endpoint: cfg.S3_ENDPOINT, endpoint: cfg.S3_ENDPOINT,
@@ -36,6 +37,21 @@ export async function processAndStore(
forcePathStyle: true, forcePathStyle: true,
}); });
// Check bucket exists, and create if needed
try {
await client.send(new HeadBucketCommand({ Bucket: cfg.S3_BUCKET }));
} catch (err) {
if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
log('INFO', `Bucket ${cfg.S3_BUCKET} does not exist, creating...`);
await client.send(new CreateBucketCommand({ Bucket: cfg.S3_BUCKET }));
} else {
log(`ERROR`, `Failed to verify bucket: ${cfg.S3_BUCKET}`);
process.exit(21);
}
}
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
const upload = new Upload({ const upload = new Upload({
client, client,
params: { params: {