11 Commits

11 changed files with 494 additions and 107 deletions

138
.dockerignore Normal file
View File

@@ -0,0 +1,138 @@
# ---> Node
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
.cache
# vitepress build output
**/.vitepress/dist
# vitepress cache directory
**/.vitepress/cache
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*

View File

@@ -0,0 +1,39 @@
name: Build and Push container image
run-name: ${{ gitea.actor }} is building and pushing
on:
create:
tags: "*"
env:
GITEA_DOMAIN: git.fjla.uk
GITEA_REGISTRY_USER: owlbot
RESULT_IMAGE_NAME: owlboard/data-ingress-pis
jobs:
build-and-push-image:
runs-on: ubuntu-latest
container:
image: catthehacker/ubuntu:act-latest
options: --privileged
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Gitea Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.GITEA_DOMAIN }}
username: ${{ env.GITEA_REGISTRY_USER }}
password: ${{ secrets.REGISTRY_TOKEN }}
- name: Build and Push image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
push: true
tags: |
${{ env.GITEA_DOMAIN }}/${{ env.RESULT_IMAGE_NAME }}:${{ gitea.ref_name }}
${{ env.GITEA_DOMAIN }}/${{ env.RESULT_IMAGE_NAME }}:latest

27
Dockerfile Normal file
View File

@@ -0,0 +1,27 @@
FROM node:25-slim AS builder
WORKDIR /app
COPY .npmrc ./
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
FROM node:25-slim AS runner
ENV NODE_ENV=production
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY --from=builder /app/dist /app/dist
USER node
CMD ["node", "dist/index.js"]

8
package-lock.json generated
View File

@@ -11,7 +11,7 @@
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.964.0", "@aws-sdk/client-s3": "^3.964.0",
"@aws-sdk/lib-storage": "^3.964.0", "@aws-sdk/lib-storage": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.1.0", "@owlboard/backend-data-contracts": "^0.1.9",
"mongodb": "^7.0.0", "mongodb": "^7.0.0",
"nats": "^2.29.3", "nats": "^2.29.3",
"readline": "^1.3.0", "readline": "^1.3.0",
@@ -1395,9 +1395,9 @@
} }
}, },
"node_modules/@owlboard/backend-data-contracts": { "node_modules/@owlboard/backend-data-contracts": {
"version": "0.1.0", "version": "0.1.9",
"resolved": "https://git.fjla.uk/api/packages/OwlBoard/npm/%40owlboard%2Fbackend-data-contracts/-/0.1.0/backend-data-contracts-0.1.0.tgz", "resolved": "https://git.fjla.uk/api/packages/OwlBoard/npm/%40owlboard%2Fbackend-data-contracts/-/0.1.9/backend-data-contracts-0.1.9.tgz",
"integrity": "sha512-6cJg7l7i8+iogU5nzvXct1+MQLmqgwcC3I7aS8MAHiDE3ymnLBhpaFKNfz6zkG39KQbA0XGhtoQD1dlKZzLXfw==", "integrity": "sha512-sKIogfclUKSwIvcPFQmJfpVMGLR1XbqMhyL1qNlKFWqWqATCrgKdboArwj5AfTjEC8hDUp8x0ffaRL+hQphXoQ==",
"license": "ISC" "license": "ISC"
}, },
"node_modules/@smithy/abort-controller": { "node_modules/@smithy/abort-controller": {

View File

@@ -5,6 +5,9 @@
"main": "index.js", "main": "index.js",
"type": "module", "type": "module",
"scripts": { "scripts": {
"prebuild": "rm -rf ./dist",
"build": "tsc",
"start": "node dist/index.js",
"test": "echo \"Error: no test specified\" && exit 1" "test": "echo \"Error: no test specified\" && exit 1"
}, },
"repository": { "repository": {
@@ -21,7 +24,7 @@
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.964.0", "@aws-sdk/client-s3": "^3.964.0",
"@aws-sdk/lib-storage": "^3.964.0", "@aws-sdk/lib-storage": "^3.964.0",
"@owlboard/backend-data-contracts": "^0.1.0", "@owlboard/backend-data-contracts": "^0.1.9",
"mongodb": "^7.0.0", "mongodb": "^7.0.0",
"nats": "^2.29.3", "nats": "^2.29.3",
"readline": "^1.3.0", "readline": "^1.3.0",

79
src/config.ts Normal file
View File

@@ -0,0 +1,79 @@
import { readFileSync, existsSync } from "node:fs"
import { join } from "node:path"
import { log } from "./logger.js";
interface Config {
Mq: Mq,
S3: S3,
Mongo: Mongo,
General: GeneralConfig,
}
export interface Mq {
MQ_USER: string;
MQ_PASS: string;
MQ_URL: string;
MQ_TOPIC: string;
}
export interface S3 {
S3_ENDPOINT: string;
S3_BUCKET: string;
S3_ACCESS_KEY: string;
S3_SECRET_KEY: string;
S3_REGION: string;
}
export interface Mongo {
MONGO_URI: string;
MONGO_DB: string;
MONGO_USER: string;
MONGO_PASS: string;
}
export interface GeneralConfig {
TOC: string;
}
export function ConfigLoader(): Config {
const cfg: Config = {
Mq: {
MQ_URL: getConfig("MQ_URL"),
MQ_TOPIC: getConfig("MQ_TOPIC"),
MQ_USER: getConfig("MQ_USER"),
MQ_PASS: getConfig("MQ_PASS"),
},
S3: {
S3_ENDPOINT: getConfig("S3_ENDPOINT"),
S3_BUCKET: getConfig("S3_BUCKET"),
S3_ACCESS_KEY: getConfig("S3_ACCESS_KEY"),
S3_SECRET_KEY: getConfig("S3_SECRET_KEY"),
S3_REGION: getConfig("S3_REGION"),
},
Mongo: {
MONGO_URI: getConfig("MONGO_URI"),
MONGO_DB: getConfig("MONGO_DB"),
MONGO_USER: getConfig("MONGO_USER"),
MONGO_PASS: getConfig("MONGO_PASS"),
},
General: {
TOC: getConfig("TOC"),
}
};
return cfg
}
function getConfig(key: string): string {
const filePath = join("/etc/secrets", key);
if (existsSync(filePath)) {
try {
return readFileSync(filePath, "utf8").trim();
} catch (err) {
log("ERROR", `Error reading secret file at ${filePath}: ${err}`);
}
}
return process.env[key] || ""
}

View File

@@ -1,62 +0,0 @@
// Functions to check the current applied version
// updating will be handled by the processing service
import { MongoClient } from "mongodb";
import { log } from "./logger";
const uri = process.env.MONGO_URI || "";
const db = process.env.MONGO_DB || "";
const user = process.env.MONGO_USER || "";
const pass = process.env.MONGO_PASS || "";
const collection = "data_ingress_meta";
if(!uri || !db || !user || !pass) {
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
process.exit(35);
} else {
log("DEBUG", `MongoDB Connection`, {
uri: uri,
db: db,
collection: collection,
user: user,
pass: "****",
});
};
const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`;
let mongoClient: MongoClient | null = null;
async function getMongoClient() {
if (mongoClient) return mongoClient;
mongoClient = new MongoClient(CONNECTION_URI);
await mongoClient.connect();
return mongoClient;
}
export async function isPackageProcessed(serviceName: string, packageName: string): Promise<boolean> {
try {
const client = await getMongoClient();
const database = client.db(db);
const coll = database.collection(collection);
const result = await coll.findOne({ service_name: serviceName });
if (!result) {
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
return false;
}
if (result.latest_package === packageName) {
log('INFO', 'No update needed');
return true;
}
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
return false;
} catch (err) {
log('ERROR', 'Failed to check Mongo for version state:', err);
process.exit(1);
}
}

View File

@@ -4,13 +4,39 @@ import { processAndStore } from './sss.js'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js' import { processPisStream } from './process.js'
import { isPackageProcessed } from './database.js' import { ConfigLoader } from './config.js'
import { natsManager } from './nats.js'
async function exit(exitCode: string | number=0): Promise<void> {
log("INFO", `Exiting with code: ${exitCode}`);
try {
await natsManager.close();
} catch (err) {
log("ERROR", `Error during cleanup: ${err}`);
process.exit(1)
}
process.exit(exitCode);
}
async function main() { async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME; const SERVICE_NAME = process.env.SERVICE_NAME;
if (!SERVICE_NAME) { if (!SERVICE_NAME) {
log('ERROR', "SERVICE_NAME env variable must be set"); log('ERROR', "SERVICE_NAME env variable must be set");
process.exit(1); process.exitCode = 1;
return;
}
const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`;
const config = ConfigLoader();
try {
log('INFO', `Initialising NATS`);
await natsManager.connect(config.Mq);
} catch (err) {
log('ERROR', `Unable to connect to NATS: ${err}`);
} }
try { try {
@@ -19,26 +45,34 @@ async function main() {
if (!packageInfo.assets[0]?.browser_download_url) { if (!packageInfo.assets[0]?.browser_download_url) {
log('ERROR', `No attachments found for release ${packageInfo.name}`); log('ERROR', `No attachments found for release ${packageInfo.name}`);
process.exit(9); process.exitCode = 1;
return;
} }
if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) { const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY);
log('INFO', `Database matches latest release. Exiting`); if (lastAppliedVersion === packageInfo.name) {
process.exit(0); log('INFO', "No new data, exiting");
return;
} }
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const objectGenerator = processPisStream(inputStream); const objectGenerator = processPisStream(config.General, inputStream);
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`; const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
log('DEBUG', `Processing stream to: ${filename}`); log('DEBUG', `Processing stream to: ${filename}`);
await processAndStore(objectGenerator, filename); await processAndStore(config.S3, objectGenerator, filename);
log('DEBUG', 'Done'); log('DEBUG', 'Done');
log('DEBUG', "Sending message to NATS");
await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME);
await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name);
} catch (err) { } catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err); log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7); process.exitCode = 1;
} finally {
await exit(process.exitCode || 0);
} }
} }

View File

@@ -1,26 +1,136 @@
import { connect, JSONCodec } from "nats"; import { connect, JSONCodec, StringCodec } from "nats";
import type { ConnectionOptions, NatsConnection, Payload } from "nats"; import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats";
import { log } from "./logger"; import { log } from "./logger.js";
import { hostname } from "node:os";
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
import type { Mq } from "./config.js";
const jc = JSONCodec(); const jc = JSONCodec();
const sc = StringCodec();
async function getNatsConnection(): Promise<NatsConnection> { class NatsManager {
const serverUrl = process.env.MQ_URL || "nats://localhost:4222"; private nc: NatsConnection | null = null;
private js: JetStreamClient | null = null;
private kv: KV | null = null;
private bucketName = "INGRESS_STATES";
const options: ConnectionOptions = { /**
servers: serverUrl, * Opens connection to NATS
name: `${process.env.HOSTNAME}` || 'local', */
reconnect: true, async connect(cfg: Mq): Promise<void> {
maxReconnectAttempts: -1, if (this.nc) return;
};
if (process.env.MQ_USER && process.env.MQ_PASS) { const options: ConnectionOptions = {
options.user = process.env.MQ_USER; servers: cfg.MQ_URL,
options.pass = process.env.MQ_PASS; name: hostname(),
log("INFO", "NATS: Using username/password authentication"); reconnect: true,
} else { maxReconnectAttempts: -1,
log("INFO", "NATS: Connecting without authentication"); waitOnFirstConnect: true,
};
if (cfg.MQ_USER && cfg.MQ_PASS) {
options.user = cfg.MQ_USER;
options.pass = cfg.MQ_PASS;
log("INFO", "NATS: Using auth credentials");
}
try {
this.nc = await connect(options);
this.js = this.nc.jetstream();
log("INFO", `NATS: Connected to ${cfg.MQ_URL}`);
// Handle connection close events
this.nc.closed().then((err) => {
if (err) {
log("ERROR", `NATS: Connection closed: ${err}`);
} else {
log("INFO", "NATS: Connection ended")
}
this.nc = null;
this.js = null;
this.kv = null;
});
} catch (err) {
log("ERROR", `NATS: Initial connection failed: ${err}`);
throw err;
}
} }
return await connect(options) /**
} * Accessor for the KV store
*/
private async getKV(): Promise<KV> {
if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first.");
if (!this.kv) {
this.kv = await this.js.views.kv(this.bucketName);
}
return this.kv;
}
/**
* Get the last recorded state/hash for a service
*/
async getState(key: string): Promise<string | null> {
try {
const store = await this.getKV();
const entry = await store.get(key);
return entry ? sc.decode(entry.value) : null;
} catch (err) {
log("ERROR", `NATS: KV Get failed for ${key}: ${err}`);
return null;
}
}
/**
* Set the current state/hash for a service
*/
async setState(key: string, value: string): Promise<void> {
try {
const store = await this.getKV();
await store.put(key, sc.encode(value));
} catch (err) {
log("ERROR", `NATS: KV Set failed for ${key}: ${err}`);
throw err;
}
}
/**
* Publishes message to the JetStream
*/
async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise<boolean> {
if (!this.js) throw new Error("NATS: JetStream not initialized");
const message: MQFileUpdate = {
service_name: serviceName,
service_id: hostname(),
sent_timestamp: Math.floor(Date.now() / 1000),
data_type: "file",
data_kind: "pis",
payload: {
version: version,
filepath: path,
}
};
const subject = `ingress.file.${message.data_kind}`;
try {
await this.js.publish(subject, jc.encode(message));
log("INFO", `NATS: Message published to ${subject}`);
return true;
} catch (err) {
log("ERROR", `NATS: Failed to publish to JetStream: ${err}`);
return false;
}
}
async close() {
if (this.nc) {
await this.nc.drain();
this.nc = null;
}
}
}
// Export instance
export const natsManager = new NatsManager();

View File

@@ -3,6 +3,7 @@ import { createInterface } from 'node:readline';
import XXH from 'xxhashjs'; import XXH from 'xxhashjs';
import { log } from './logger.js'; import { log } from './logger.js';
import { DataIngressPisData } from '@owlboard/backend-data-contracts'; import { DataIngressPisData } from '@owlboard/backend-data-contracts';
import type { GeneralConfig } from './config.js';
const BASE_URL = process.env.BASEURL || 'https://owlboard.info' const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
@@ -18,8 +19,8 @@ interface InputRecord {
stops: string[]; stops: string[];
} }
export async function* processPisStream(inputStream: Readable) { export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) {
const TOC = process.env.TOC; const TOC = cfg.TOC;
if (!TOC) { if (!TOC) {
log('ERROR', "TOC not set: Exit code 19"); log('ERROR', "TOC not set: Exit code 19");
process.exit(19); process.exit(19);
@@ -35,7 +36,7 @@ export async function* processPisStream(inputStream: Readable) {
if (!line.trim()) continue; if (!line.trim()) continue;
const record = JSON.parse(line) as InputRecord; const record = JSON.parse(line) as InputRecord;
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`) // log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
const crsHash = XXH.h64(record.stops.join('|'), SEED); const crsHash = XXH.h64(record.stops.join('|'), SEED);
const tiplocStops = await mapStopsToTiploc(record.stops); const tiplocStops = await mapStopsToTiploc(record.stops);

View File

@@ -1,11 +1,13 @@
import { S3Client } from "@aws-sdk/client-s3"; import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage"; import { Upload } from "@aws-sdk/lib-storage";
import { createWriteStream } from "node:fs"; import { createWriteStream } from "node:fs";
import { Readable } from "node:stream"; import { Readable } from "node:stream";
import { DataIngressPisData } from "@owlboard/backend-data-contracts"; import { DataIngressPisData } from "@owlboard/backend-data-contracts";
import { log } from "./logger.js"; import { log } from "./logger.js";
import type { S3 } from "./config.js";
export async function processAndStore( export async function processAndStore(
cfg: S3,
generator: AsyncGenerator<DataIngressPisData.PisObjects>, generator: AsyncGenerator<DataIngressPisData.PisObjects>,
filename: string filename: string
) { ) {
@@ -15,29 +17,45 @@ export async function processAndStore(
} }
})()); })());
const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET; const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;
if (useS3) { if (useS3) {
if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) { if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) {
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24"); log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
process.exit(24); process.exit(24);
} }
log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`);
log("DEBUG", "Opening connection to S3 Server");
const client = new S3Client({ const client = new S3Client({
endpoint: process.env.S3_ENDPOINT, endpoint: cfg.S3_ENDPOINT,
region: process.env.S3_REGION, region: cfg.S3_REGION,
credentials: { credentials: {
accessKeyId: process.env.S3_ACCESS_KEY!, accessKeyId: cfg.S3_ACCESS_KEY!,
secretAccessKey: process.env.S3_SECRET_KEY, secretAccessKey: cfg.S3_SECRET_KEY,
}, },
forcePathStyle: true, forcePathStyle: true,
}); });
// Check bucket exists, and create if needed
try {
await client.send(new HeadBucketCommand({ Bucket: cfg.S3_BUCKET }));
} catch (err) {
if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
log('INFO', `Bucket ${cfg.S3_BUCKET} does not exist, creating...`);
await client.send(new CreateBucketCommand({ Bucket: cfg.S3_BUCKET }));
} else {
log(`ERROR`, `Failed to verify bucket: ${cfg.S3_BUCKET}`);
process.exit(21);
}
}
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
const upload = new Upload({ const upload = new Upload({
client, client,
params: { params: {
Bucket: process.env.S3_BUCKET, Bucket: cfg.S3_BUCKET,
Key: filename, Key: filename,
Body: ndjsonStream, Body: ndjsonStream,
ContentType: "application/x-ndjson", ContentType: "application/x-ndjson",