Feature Complete:

- MongoDB Check before Initialising
- Output to file or S3
- Output to correct format for JSON Schema
This commit is contained in:
2026-01-07 19:51:07 +00:00
parent 03ac6431ff
commit 068cca32b4
7 changed files with 2128 additions and 35 deletions

1983
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -19,8 +19,10 @@
"author": "Frederick Boniface", "author": "Frederick Boniface",
"license": "SEE LICENSE IN LICENSE", "license": "SEE LICENSE IN LICENSE",
"dependencies": { "dependencies": {
"@bufbuild/protobuf": "^2.10.2", "@aws-sdk/client-s3": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.0.1", "@aws-sdk/lib-storage": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.1.0",
"mongodb": "^7.0.0",
"readline": "^1.3.0", "readline": "^1.3.0",
"xxhashjs": "^0.2.2" "xxhashjs": "^0.2.2"
}, },

62
src/database.ts Normal file
View File

@@ -0,0 +1,62 @@
// Functions to check the current applied version
// updating will be handled by the processing service
import { MongoClient } from "mongodb";
import { log } from "./logger";
const uri = process.env.MONGO_URI || "";
const db = process.env.MONGO_DB || "";
const collection = process.env.MONGO_COLLECTION || "";
const user = process.env.MONGO_USER || "";
const pass = process.env.MONGO_PASS || "";
if(!uri || !db || !collection || !user || !pass) {
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
process.exit(35);
} else {
log("DEBUG", `MongoDB Connection`, {
uri: uri,
db: db,
collection: collection,
user: user,
pass: "****",
});
};
const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`;
let mongoClient: MongoClient | null = null;
async function getMongoClient() {
if (mongoClient) return mongoClient;
mongoClient = new MongoClient(CONNECTION_URI);
await mongoClient.connect();
return mongoClient;
}
export async function isPackageProcessed(serviceName: string, packageName: string): Promise<boolean> {
try {
const client = await getMongoClient();
const database = client.db(db);
const coll = database.collection(collection);
const result = await coll.findOne({ service_name: serviceName });
if (!result) {
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
return false;
}
if (result.latest_package === packageName) {
log('INFO', 'No update needed');
return true;
}
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
return false;
} catch (err) {
log('ERROR', 'Failed to check Mongo for version state:', err);
process.exit(1);
}
}

View File

@@ -1,12 +1,18 @@
import { log } from './logger.js' import { log } from './logger.js'
import { Readable } from 'node:stream' import { Readable } from 'node:stream'
import { createWriteStream } from 'node:fs' import { processAndStore } from './sss.js'
import { finished } from 'node:stream/promises'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js' import { processPisStream } from './process.js'
import { isPackageProcessed } from './database.js'
async function main() { async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME;
if (!SERVICE_NAME) {
log('ERROR', "SERVICE_NAME env variable must be set");
process.exit(1);
}
try { try {
const packageInfo = await getLatestPackageName(); const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`); log('INFO', `Latest PIS Package: ${packageInfo.name}`);
@@ -16,12 +22,19 @@ async function main() {
process.exit(9); process.exit(9);
} }
if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) {
log('INFO', `Database matches latest release. Exiting`);
process.exit(0);
}
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const objectGenerator = processPisStream(inputStream);
const outputPath = './review_output.ndjson'; const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
log('DEBUG', `Processing stream to: ${outputPath}`);
await writeToTemporaryFile(processPisStream(inputStream), outputPath); log('DEBUG', `Processing stream to: ${filename}`);
await processAndStore(objectGenerator, filename);
log('DEBUG', 'Done'); log('DEBUG', 'Done');
} catch (err) { } catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err); log('ERROR', 'Fatal error in pipeline: ', err);
@@ -29,18 +42,4 @@ async function main() {
} }
} }
async function writeToTemporaryFile(generator: AsyncGenerator<any>, path: string) {
const writer = createWriteStream(path);
for await (const record of generator) {
const line = JSON.stringify(record) + '\n';
if (!writer.write(line)) {
await new Promise((resolve) => writer.once('drain', resolve));
}
}
writer.end();
return finished(writer);
}
main(); main();

View File

@@ -2,10 +2,17 @@ import { Readable } from 'node:stream';
import { createInterface } from 'node:readline'; import { createInterface } from 'node:readline';
import XXH from 'xxhashjs'; import XXH from 'xxhashjs';
import { log } from './logger.js'; import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
const BASE_URL = process.env.BASEURL || 'https://owlboard.info' const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
// Local cache crs, tiploc mappings to reduce API hits to once per CRS
const tiplocCache = new Map<string, string>(); const tiplocCache = new Map<string, string>();
// To align with generating the hash in Go if needed for lookups
const SEED = 0; const SEED = 0;
// Type for the data input
interface InputRecord { interface InputRecord {
code: string; code: string;
stops: string[]; stops: string[];
@@ -33,14 +40,16 @@ export async function* processPisStream(inputStream: Readable) {
const crsHash = XXH.h64(record.stops.join('|'), SEED); const crsHash = XXH.h64(record.stops.join('|'), SEED);
const tiplocStops = await mapStopsToTiploc(record.stops); const tiplocStops = await mapStopsToTiploc(record.stops);
const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED); const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED);
yield {
const data: DataIngressPisData.PisObjects = {
code: record.code, code: record.code,
toc: TOC.toLowerCase(), toc: TOC.toLowerCase(),
crsStops: record.stops, crsStops: record.stops,
crsHash: crsHash.toString(10), crsHash: crsHash.toString(),
tiplocStops: tiplocStops, tiplocStops: tiplocStops,
tiplocHash: tiplocHash.toString(10), tiplocHash: tiplocHash.toString(),
} }
yield data;
} }
} }

View File

@@ -16,7 +16,7 @@ const REQUIRED_ENV_VARS = [
let missing: boolean = false; let missing: boolean = false;
for (const key of REQUIRED_ENV_VARS) { for (const key of REQUIRED_ENV_VARS) {
if (!process.env[key]) { if (!process.env[key]) {
log('ERROR', '`Missing required environment variable: ${key}'); log('ERROR', `Missing required environment variable: ${key}`);
missing = true; missing = true;
} else { } else {
log('DEBUG', `Environment Variable: ${key} = ${process.env[key]}`); log('DEBUG', `Environment Variable: ${key} = ${process.env[key]}`);

58
src/sss.ts Normal file
View File

@@ -0,0 +1,58 @@
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { createWriteStream } from "node:fs";
import { Readable } from "node:stream";
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js";
export async function processAndStore(
generator: AsyncGenerator<DataIngressPisData.PisObjects>,
filename: string
) {
const ndjsonStream = Readable.from((async function* () {
for await (const record of generator) {
yield JSON.stringify(record) + "\n";
}
})());
const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET;
if (useS3) {
if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) {
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
process.exit(24);
}
log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`);
const client = new S3Client({
endpoint: process.env.S3_ENDPOINT,
region: process.env.S3_REGION,
credentials: {
accessKeyId: process.env.S3_ACCESS_KEY!,
secretAccessKey: process.env.S3_SECRET_KEY,
},
forcePathStyle: true,
});
const upload = new Upload({
client,
params: {
Bucket: process.env.S3_BUCKET,
Key: filename,
Body: ndjsonStream,
ContentType: "application/x-ndjson",
},
});
await upload.done();
} else {
log('INFO', `Streaming to local filesystem at: ${filename}`);
const fileWriter = createWriteStream(filename);
return new Promise((resolve, reject) => {
ndjsonStream.pipe(fileWriter)
.on('finish', resolve)
.on('error', reject);
})
}
}