3 Commits
0.0.7 ... 0.0.9

4 changed files with 665 additions and 794 deletions

1430
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -25,14 +25,11 @@
"@aws-sdk/client-s3": "^3.964.0",
"@aws-sdk/lib-storage": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.1.9",
"mongodb": "^7.0.0",
"nats": "^2.29.3",
"readline": "^1.3.0",
"xxhashjs": "^0.2.2"
"readline": "^1.3.0"
},
"devDependencies": {
"@types/node": "^25.0.3",
"@types/xxhashjs": "^0.2.4",
"tsx": "^4.21.0",
"typescript": "^5.9.3"
}

View File

@@ -41,7 +41,11 @@ class NatsManager {
// Handle connection close events
this.nc.closed().then((err) => {
log("ERROR", `NATS: Connection closed: ${err}`);
if (err) {
log("ERROR", `NATS: Connection closed: ${err}`);
} else {
log("INFO", "NATS: Connection ended")
}
this.nc = null;
this.js = null;
this.kv = null;

View File

@@ -1,6 +1,5 @@
import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';
import XXH from 'xxhashjs';
import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
import type { GeneralConfig } from './config.js';
@@ -38,17 +37,17 @@ export async function* processPisStream(cfg: GeneralConfig, inputStream: Readabl
const record = JSON.parse(line) as InputRecord;
// log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
const crsHash = XXH.h64(record.stops.join('|'), SEED);
const tiplocStops = await mapStopsToTiploc(record.stops);
const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED);
const normalisedStops = record.stops.map(stop => stop.toUpperCase());
const tiplocStops = await mapStopsToTiploc(normalisedStops);
const data: DataIngressPisData.PisObjects = {
code: record.code,
toc: TOC.toLowerCase(),
crsStops: record.stops,
crsHash: crsHash.toString(),
crsStops: normalisedStops,
crsHash: "",
tiplocStops: tiplocStops,
tiplocHash: tiplocHash.toString(),
tiplocHash: "",
}
yield data;
@@ -64,8 +63,9 @@ async function mapStopsToTiploc(crsStops: string[]): Promise<string[]> {
// Cache Miss
try {
const tiploc = await fetchTiplocFromApi(crs);
tiplocCache.set(crs, tiploc);
return tiploc;
const normalisedTiploc = tiploc.toUpperCase();
tiplocCache.set(crs, normalisedTiploc);
return normalisedTiploc;
} catch (err) {
log('ERROR', `Failed lookup for: ${crs}`, err);
process.exit(99);