Add fetch and stream to temporary file

This commit is contained in:
2026-01-06 20:58:12 +00:00
parent 49b46f5e9c
commit 03ac6431ff
7 changed files with 728 additions and 15 deletions

View File

@@ -1 +1,46 @@
import { log } from './logger'
import { log } from './logger.js'
import { Readable } from 'node:stream'
import { createWriteStream } from 'node:fs'
import { finished } from 'node:stream/promises'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js'
async function main() {
try {
const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`);
if (!packageInfo.assets[0]?.browser_download_url) {
log('ERROR', `No attachments found for release ${packageInfo.name}`);
process.exit(9);
}
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const outputPath = './review_output.ndjson';
log('DEBUG', `Processing stream to: ${outputPath}`);
await writeToTemporaryFile(processPisStream(inputStream), outputPath);
log('DEBUG', 'Done');
} catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7);
}
}
async function writeToTemporaryFile(generator: AsyncGenerator<any>, path: string) {
const writer = createWriteStream(path);
for await (const record of generator) {
const line = JSON.stringify(record) + '\n';
if (!writer.write(line)) {
await new Promise((resolve) => writer.once('drain', resolve));
}
}
writer.end();
return finished(writer);
}
main();

View File

@@ -15,7 +15,7 @@ const levelWeight: Record<LogLevel, number> = {
const RESET = '\x1b[0m';
const CURRENT_LOG_LEVEL = (process.env.LOG_LEVEL as LogLevel) || 'INFO';
const CURRENT_LOG_LEVEL = (process.env.LOG_LEVEL as LogLevel) || 'DEBUG';
const MIN_WEIGHT = levelWeight[CURRENT_LOG_LEVEL] ?? 1;
export function log(level: LogLevel, msg: string, ...args: any[]): void {

78
src/process.ts Normal file
View File

@@ -0,0 +1,78 @@
import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';
import XXH from 'xxhashjs';
import { log } from './logger.js';
const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
const tiplocCache = new Map<string, string>();
const SEED = 0;
interface InputRecord {
code: string;
stops: string[];
}
export async function* processPisStream(inputStream: Readable) {
const TOC = process.env.TOC;
if (!TOC) {
log('ERROR', "TOC not set: Exit code 19");
process.exit(19);
}
log('DEBUG', `TOC set to: ${TOC}`)
const rl = createInterface({
input: inputStream,
terminal: false,
});
for await (const line of rl) {
if (!line.trim()) continue;
const record = JSON.parse(line) as InputRecord;
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
const crsHash = XXH.h64(record.stops.join('|'), SEED);
const tiplocStops = await mapStopsToTiploc(record.stops);
const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED);
yield {
code: record.code,
toc: TOC.toLowerCase(),
crsStops: record.stops,
crsHash: crsHash.toString(10),
tiplocStops: tiplocStops,
tiplocHash: tiplocHash.toString(10),
}
}
}
async function mapStopsToTiploc(crsStops: string[]): Promise<string[]> {
return Promise.all(crsStops.map(async (crs) => {
if (tiplocCache.has(crs)) {
return tiplocCache.get(crs)!;
}
// Cache Miss
try {
const tiploc = await fetchTiplocFromApi(crs);
tiplocCache.set(crs, tiploc);
return tiploc;
} catch (err) {
log('ERROR', `Failed lookup for: ${crs}`, err);
process.exit(99);
}
}))
}
async function fetchTiplocFromApi(crs: string): Promise<string> {
const apiUrl = `${BASE_URL}/api/v2/ref/locationCode/crs/${crs}`;
const response = await fetch(apiUrl, {
headers: { 'Accept': 'application/json' },
});
if (!response.ok) {
throw new Error(`API Error: ${response.status}`);
}
const data = await response.json();
return data[0].TIPLOC;
}

View File

@@ -1,11 +1,11 @@
import { Readable } from 'node:stream';
import { log } from '../logger';
import { log } from '../logger.js';
const GITEA_TOKEN = process.env.GITEA_TOKEN as string || "";
const GITEA_SERVER = process.env.GITEA_SERVER as string || "";
const GITEA_OWNER = process.env.GITEA_OWNER as string || "";
const GITEA_REPO = process.env.GITEA_REPO as string || "";
const GITEA_PKG_URL = `https://${GITEA_SERVER}/${GITEA_OWNER}/${GITEA_REPO}`
const GITEA_PKG_URL = `https://${GITEA_SERVER}/api/v1/repos/${GITEA_OWNER}/${GITEA_REPO}`
const REQUIRED_ENV_VARS = [
"GITEA_SERVER",
@@ -36,7 +36,8 @@ if (missing) {
interface GiteaPackageAsset {
name: string;
id: string;
id: number;
browser_download_url: string;
}
export interface GiteaPackageResponse {
id: number;
@@ -47,9 +48,12 @@ export interface GiteaPackageResponse {
}
export async function getLatestPackageName() :Promise<GiteaPackageResponse> {
const get = await sendRequest<GiteaPackageResponse>(`${GITEA_PKG_URL}/releases/latest`);
const url = `${GITEA_PKG_URL}/releases/latest`
log("DEBUG", `Sending request to ${url}`)
const get = await sendRequest<GiteaPackageResponse>(url);
if (get.name) {
log("DEBUG", `Latest package name: ${get.name}`)
return get
} else {
log("ERROR", "Invalid Resposne from Gitea Server");
@@ -58,11 +62,7 @@ export async function getLatestPackageName() :Promise<GiteaPackageResponse> {
}
}
export async function getPackageAttachment(release_id: number, attachment_id: number) :Promise<any> {
return getRequestStream(`${GITEA_PKG_URL}/releases/${release_id}/assets/${attachment_id}`);
}
async function getRequestStream(url: string) :Promise<Readable> {
export async function getRequestStream(url: string) :Promise<Readable> {
const options: RequestInit = {
method: "GET",
headers: {