Files
pis-data-ingress/src/process.ts

88 lines
2.5 KiB
TypeScript

import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';
import XXH from 'xxhashjs';
import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
import type { GeneralConfig } from './config.js';
// Base URL that API lookup paths are appended to. The BASEURL environment
// variable overrides the default whenever it is set to a non-empty value.
const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
// Module-level cache of CRS -> TIPLOC mappings, kept to reduce API hits:
// a CRS that has already been resolved is served from here.
const tiplocCache = new Map<string, string>();
// Seed passed to XXH.h64. Kept fixed to align with generating the same hash
// in Go if that is needed for lookups.
const SEED = 0;
/**
 * Shape that each non-blank input line is treated as once it has been
 * JSON-parsed.
 */
interface InputRecord {
/** Record code; copied unchanged into the `code` field of the output. */
code: string;
/** Stop list; used as the CRS stops (hashed, then mapped to TIPLOCs). */
stops: string[];
}
/**
 * Reads newline-delimited JSON from `inputStream` and yields one PIS object
 * per non-blank line.
 *
 * Each line must parse to `{ code: string, stops: string[] }`. For every
 * record the stops are hashed as given (CRS), mapped to TIPLOCs, and hashed
 * again; both hashes are xxHash64 over the stops joined with `|`, stringified.
 *
 * @param cfg - General configuration; `cfg.TOC` must be set. If it is not,
 *   an error is logged and the process exits with code 19.
 * @param inputStream - Readable stream carrying one JSON record per line.
 * @yields A `DataIngressPisData.PisObjects` value for each input record, with
 *   the TOC lower-cased.
 * @throws Error when a line is not valid JSON or does not have the expected
 *   shape; the message includes the 1-based line number.
 */
export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) {
  const TOC = cfg.TOC;
  if (!TOC) {
    log('ERROR', "TOC not set: Exit code 19");
    process.exit(19);
  }
  log('DEBUG', `TOC set to: ${TOC}`);

  // Loop-invariant: the same lower-cased TOC is attached to every record.
  const toc = TOC.toLowerCase();

  const rl = createInterface({
    input: inputStream,
    terminal: false,
  });

  // Counts every line (including blank ones) so errors point at the real
  // position in the input.
  let lineNumber = 0;
  for await (const line of rl) {
    lineNumber += 1;
    if (!line.trim()) continue;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      throw new Error(`Invalid JSON on input line ${lineNumber}: ${reason}`);
    }

    // Validate before use: a bare cast would let a missing or mistyped
    // `stops` surface later as an unrelated TypeError.
    if (!isInputRecord(parsed)) {
      throw new Error(
        `Invalid record on input line ${lineNumber}: expected { code: string, stops: string[] }`,
      );
    }
    const record = parsed;
    log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`);

    const crsHash = XXH.h64(record.stops.join('|'), SEED);
    const tiplocStops = await mapStopsToTiploc(record.stops);
    const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED);

    const data: DataIngressPisData.PisObjects = {
      code: record.code,
      toc,
      crsStops: record.stops,
      crsHash: crsHash.toString(),
      tiplocStops: tiplocStops,
      tiplocHash: tiplocHash.toString(),
    };
    yield data;
  }
}

/**
 * Runtime guard for parsed input lines: true only for a non-null object with
 * a string `code` and a `stops` array whose elements are all strings.
 */
function isInputRecord(value: unknown): value is InputRecord {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as { code?: unknown; stops?: unknown };
  return (
    typeof candidate.code === 'string' &&
    Array.isArray(candidate.stops) &&
    candidate.stops.every((stop) => typeof stop === 'string')
  );
}
// Lookups currently in progress, keyed by CRS. Concurrent cache misses for the
// same CRS (e.g. a stop that appears twice in one record) await the same
// promise instead of each issuing their own API request.
const pendingTiplocLookups = new Map<string, Promise<string>>();

/**
 * Maps a list of CRS codes to TIPLOCs, preserving order and length.
 *
 * Lookups run concurrently. Results come from `tiplocCache` when available;
 * otherwise the API is queried and the result cached.
 *
 * @param crsStops - CRS codes to translate.
 * @returns The TIPLOC for each input CRS, in the same order.
 *
 * A failed lookup is logged and terminates the process with exit code 99, so
 * this function does not reject on lookup failure.
 */
async function mapStopsToTiploc(crsStops: string[]): Promise<string[]> {
  return Promise.all(crsStops.map((crs) => resolveTiploc(crs)));
}

/**
 * Resolves a single CRS to its TIPLOC via the cache, an in-flight lookup, or
 * a fresh API request (in that order of preference).
 */
async function resolveTiploc(crs: string): Promise<string> {
  const cached = tiplocCache.get(crs);
  if (cached !== undefined) {
    return cached;
  }

  // Cache miss: join an existing lookup for this CRS or start a new one.
  let pending = pendingTiplocLookups.get(crs);
  if (pending === undefined) {
    pending = (async () => {
      try {
        const tiploc = await fetchTiplocFromApi(crs);
        tiplocCache.set(crs, tiploc);
        return tiploc;
      } finally {
        // Settled either way; later calls should consult the cache instead.
        pendingTiplocLookups.delete(crs);
      }
    })();
    pendingTiplocLookups.set(crs, pending);
  }

  try {
    return await pending;
  } catch (err) {
    log('ERROR', `Failed lookup for: ${crs}`, err);
    process.exit(99);
  }
}
/**
 * Looks up the TIPLOC for a CRS code via the location-code API under
 * `BASE_URL`.
 *
 * The response body is expected to be a JSON array whose first element has a
 * string `TIPLOC` property; that value is returned.
 *
 * @param crs - CRS code to look up. It is percent-encoded before being placed
 *   in the URL path.
 * @returns The `TIPLOC` of the first element in the response.
 * @throws Error when the HTTP status is not OK, or when the body does not
 *   contain a string `TIPLOC` on its first element.
 */
async function fetchTiplocFromApi(crs: string): Promise<string> {
  // Encode the path segment so unexpected characters in the input cannot
  // alter the request path or query.
  const apiUrl = `${BASE_URL}/api/v2/ref/locationCode/crs/${encodeURIComponent(crs)}`;
  const response = await fetch(apiUrl, {
    headers: { 'Accept': 'application/json' },
  });
  if (!response.ok) {
    throw new Error(`API Error: ${response.status}`);
  }

  // Treat the payload as untrusted: an empty array or a missing field would
  // otherwise surface as a TypeError or as `undefined` typed as string.
  const data: unknown = await response.json();
  const first: unknown = Array.isArray(data) ? data[0] : undefined;
  const tiploc =
    typeof first === 'object' && first !== null
      ? (first as { TIPLOC?: unknown }).TIPLOC
      : undefined;
  if (typeof tiploc !== 'string') {
    throw new Error(`API returned no TIPLOC for CRS: ${crs}`);
  }
  return tiploc;
}