Compare commits
12 Commits
8f522046aa
...
0.0.9
| Author | SHA1 | Date | |
|---|---|---|---|
| a2d6ee7400 | |||
| 7903344eea | |||
| 0c934a748f | |||
| 33bd1cd320 | |||
| e115edd79e | |||
| 1f07951b88 | |||
| b0a45f4000 | |||
| b42e37c569 | |||
| 2cb9c320cf | |||
| c90163cdce | |||
| cb37774a3a | |||
| 45350e7a2b |
138
.dockerignore
Normal file
138
.dockerignore
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
# ---> Node
|
||||||
|
# Logs
|
||||||
|
logs
|
||||||
|
*.log
|
||||||
|
npm-debug.log*
|
||||||
|
yarn-debug.log*
|
||||||
|
yarn-error.log*
|
||||||
|
lerna-debug.log*
|
||||||
|
.pnpm-debug.log*
|
||||||
|
|
||||||
|
# Diagnostic reports (https://nodejs.org/api/report.html)
|
||||||
|
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
|
||||||
|
|
||||||
|
# Runtime data
|
||||||
|
pids
|
||||||
|
*.pid
|
||||||
|
*.seed
|
||||||
|
*.pid.lock
|
||||||
|
|
||||||
|
# Directory for instrumented libs generated by jscoverage/JSCover
|
||||||
|
lib-cov
|
||||||
|
|
||||||
|
# Coverage directory used by tools like istanbul
|
||||||
|
coverage
|
||||||
|
*.lcov
|
||||||
|
|
||||||
|
# nyc test coverage
|
||||||
|
.nyc_output
|
||||||
|
|
||||||
|
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
|
||||||
|
.grunt
|
||||||
|
|
||||||
|
# Bower dependency directory (https://bower.io/)
|
||||||
|
bower_components
|
||||||
|
|
||||||
|
# node-waf configuration
|
||||||
|
.lock-wscript
|
||||||
|
|
||||||
|
# Compiled binary addons (https://nodejs.org/api/addons.html)
|
||||||
|
build/Release
|
||||||
|
|
||||||
|
# Dependency directories
|
||||||
|
node_modules/
|
||||||
|
jspm_packages/
|
||||||
|
|
||||||
|
# Snowpack dependency directory (https://snowpack.dev/)
|
||||||
|
web_modules/
|
||||||
|
|
||||||
|
# TypeScript cache
|
||||||
|
*.tsbuildinfo
|
||||||
|
|
||||||
|
# Optional npm cache directory
|
||||||
|
.npm
|
||||||
|
|
||||||
|
# Optional eslint cache
|
||||||
|
.eslintcache
|
||||||
|
|
||||||
|
# Optional stylelint cache
|
||||||
|
.stylelintcache
|
||||||
|
|
||||||
|
# Microbundle cache
|
||||||
|
.rpt2_cache/
|
||||||
|
.rts2_cache_cjs/
|
||||||
|
.rts2_cache_es/
|
||||||
|
.rts2_cache_umd/
|
||||||
|
|
||||||
|
# Optional REPL history
|
||||||
|
.node_repl_history
|
||||||
|
|
||||||
|
# Output of 'npm pack'
|
||||||
|
*.tgz
|
||||||
|
|
||||||
|
# Yarn Integrity file
|
||||||
|
.yarn-integrity
|
||||||
|
|
||||||
|
# dotenv environment variable files
|
||||||
|
.env
|
||||||
|
.env.development.local
|
||||||
|
.env.test.local
|
||||||
|
.env.production.local
|
||||||
|
.env.local
|
||||||
|
|
||||||
|
# parcel-bundler cache (https://parceljs.org/)
|
||||||
|
.cache
|
||||||
|
.parcel-cache
|
||||||
|
|
||||||
|
# Next.js build output
|
||||||
|
.next
|
||||||
|
out
|
||||||
|
|
||||||
|
# Nuxt.js build / generate output
|
||||||
|
.nuxt
|
||||||
|
dist
|
||||||
|
|
||||||
|
# Gatsby files
|
||||||
|
.cache/
|
||||||
|
# Comment in the public line in if your project uses Gatsby and not Next.js
|
||||||
|
# https://nextjs.org/blog/next-9-1#public-directory-support
|
||||||
|
# public
|
||||||
|
|
||||||
|
# vuepress build output
|
||||||
|
.vuepress/dist
|
||||||
|
|
||||||
|
# vuepress v2.x temp and cache directory
|
||||||
|
.temp
|
||||||
|
.cache
|
||||||
|
|
||||||
|
# vitepress build output
|
||||||
|
**/.vitepress/dist
|
||||||
|
|
||||||
|
# vitepress cache directory
|
||||||
|
**/.vitepress/cache
|
||||||
|
|
||||||
|
# Docusaurus cache and generated files
|
||||||
|
.docusaurus
|
||||||
|
|
||||||
|
# Serverless directories
|
||||||
|
.serverless/
|
||||||
|
|
||||||
|
# FuseBox cache
|
||||||
|
.fusebox/
|
||||||
|
|
||||||
|
# DynamoDB Local files
|
||||||
|
.dynamodb/
|
||||||
|
|
||||||
|
# TernJS port file
|
||||||
|
.tern-port
|
||||||
|
|
||||||
|
# Stores VSCode versions used for testing VSCode extensions
|
||||||
|
.vscode-test
|
||||||
|
|
||||||
|
# yarn v2
|
||||||
|
.yarn/cache
|
||||||
|
.yarn/unplugged
|
||||||
|
.yarn/build-state.yml
|
||||||
|
.yarn/install-state.gz
|
||||||
|
.pnp.*
|
||||||
|
|
||||||
39
.gitea/workflows/build-push.yaml
Normal file
39
.gitea/workflows/build-push.yaml
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
name: Build and Push container image
|
||||||
|
run-name: ${{ gitea.actor }} is building and pushing
|
||||||
|
|
||||||
|
on:
|
||||||
|
create:
|
||||||
|
tags: "*"
|
||||||
|
|
||||||
|
env:
|
||||||
|
GITEA_DOMAIN: git.fjla.uk
|
||||||
|
GITEA_REGISTRY_USER: owlbot
|
||||||
|
RESULT_IMAGE_NAME: owlboard/data-ingress-pis
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-push-image:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: catthehacker/ubuntu:act-latest
|
||||||
|
options: --privileged
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
- name: Login to Gitea Container Registry
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
registry: ${{ env.GITEA_DOMAIN }}
|
||||||
|
username: ${{ env.GITEA_REGISTRY_USER }}
|
||||||
|
password: ${{ secrets.REGISTRY_TOKEN }}
|
||||||
|
- name: Build and Push image
|
||||||
|
uses: docker/build-push-action@v6
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
file: ./Dockerfile
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
${{ env.GITEA_DOMAIN }}/${{ env.RESULT_IMAGE_NAME }}:${{ gitea.ref_name }}
|
||||||
|
${{ env.GITEA_DOMAIN }}/${{ env.RESULT_IMAGE_NAME }}:latest
|
||||||
27
Dockerfile
Normal file
27
Dockerfile
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
FROM node:25-slim AS builder
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY .npmrc ./
|
||||||
|
COPY package*.json ./
|
||||||
|
RUN npm ci
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
RUN npm run build
|
||||||
|
|
||||||
|
|
||||||
|
FROM node:25-slim AS runner
|
||||||
|
|
||||||
|
ENV NODE_ENV=production
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY package*.json ./
|
||||||
|
RUN npm ci --omit=dev
|
||||||
|
|
||||||
|
COPY --from=builder /app/dist /app/dist
|
||||||
|
|
||||||
|
USER node
|
||||||
|
|
||||||
|
CMD ["node", "dist/index.js"]
|
||||||
1438
package-lock.json
generated
1438
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
10
package.json
10
package.json
@@ -5,6 +5,9 @@
|
|||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
"prebuild": "rm -rf ./dist",
|
||||||
|
"build": "tsc",
|
||||||
|
"start": "node dist/index.js",
|
||||||
"test": "echo \"Error: no test specified\" && exit 1"
|
"test": "echo \"Error: no test specified\" && exit 1"
|
||||||
},
|
},
|
||||||
"repository": {
|
"repository": {
|
||||||
@@ -21,15 +24,12 @@
|
|||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@aws-sdk/client-s3": "^3.964.0",
|
"@aws-sdk/client-s3": "^3.964.0",
|
||||||
"@aws-sdk/lib-storage": "^3.964.0",
|
"@aws-sdk/lib-storage": "^3.964.0",
|
||||||
"@owlboard/backend-data-contracts": "^0.1.0",
|
"@owlboard/backend-data-contracts": "^0.1.9",
|
||||||
"mongodb": "^7.0.0",
|
|
||||||
"nats": "^2.29.3",
|
"nats": "^2.29.3",
|
||||||
"readline": "^1.3.0",
|
"readline": "^1.3.0"
|
||||||
"xxhashjs": "^0.2.2"
|
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/node": "^25.0.3",
|
"@types/node": "^25.0.3",
|
||||||
"@types/xxhashjs": "^0.2.4",
|
|
||||||
"tsx": "^4.21.0",
|
"tsx": "^4.21.0",
|
||||||
"typescript": "^5.9.3"
|
"typescript": "^5.9.3"
|
||||||
}
|
}
|
||||||
|
|||||||
79
src/config.ts
Normal file
79
src/config.ts
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import { readFileSync, existsSync } from "node:fs"
|
||||||
|
import { join } from "node:path"
|
||||||
|
import { log } from "./logger.js";
|
||||||
|
|
||||||
|
interface Config {
|
||||||
|
Mq: Mq,
|
||||||
|
S3: S3,
|
||||||
|
Mongo: Mongo,
|
||||||
|
General: GeneralConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Mq {
|
||||||
|
MQ_USER: string;
|
||||||
|
MQ_PASS: string;
|
||||||
|
MQ_URL: string;
|
||||||
|
MQ_TOPIC: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface S3 {
|
||||||
|
S3_ENDPOINT: string;
|
||||||
|
S3_BUCKET: string;
|
||||||
|
S3_ACCESS_KEY: string;
|
||||||
|
S3_SECRET_KEY: string;
|
||||||
|
S3_REGION: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface Mongo {
|
||||||
|
MONGO_URI: string;
|
||||||
|
MONGO_DB: string;
|
||||||
|
MONGO_USER: string;
|
||||||
|
MONGO_PASS: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface GeneralConfig {
|
||||||
|
TOC: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ConfigLoader(): Config {
|
||||||
|
const cfg: Config = {
|
||||||
|
Mq: {
|
||||||
|
MQ_URL: getConfig("MQ_URL"),
|
||||||
|
MQ_TOPIC: getConfig("MQ_TOPIC"),
|
||||||
|
MQ_USER: getConfig("MQ_USER"),
|
||||||
|
MQ_PASS: getConfig("MQ_PASS"),
|
||||||
|
},
|
||||||
|
S3: {
|
||||||
|
S3_ENDPOINT: getConfig("S3_ENDPOINT"),
|
||||||
|
S3_BUCKET: getConfig("S3_BUCKET"),
|
||||||
|
S3_ACCESS_KEY: getConfig("S3_ACCESS_KEY"),
|
||||||
|
S3_SECRET_KEY: getConfig("S3_SECRET_KEY"),
|
||||||
|
S3_REGION: getConfig("S3_REGION"),
|
||||||
|
},
|
||||||
|
Mongo: {
|
||||||
|
MONGO_URI: getConfig("MONGO_URI"),
|
||||||
|
MONGO_DB: getConfig("MONGO_DB"),
|
||||||
|
MONGO_USER: getConfig("MONGO_USER"),
|
||||||
|
MONGO_PASS: getConfig("MONGO_PASS"),
|
||||||
|
},
|
||||||
|
General: {
|
||||||
|
TOC: getConfig("TOC"),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
function getConfig(key: string): string {
|
||||||
|
const filePath = join("/etc/secrets", key);
|
||||||
|
if (existsSync(filePath)) {
|
||||||
|
try {
|
||||||
|
return readFileSync(filePath, "utf8").trim();
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `Error reading secret file at ${filePath}: ${err}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return process.env[key] || ""
|
||||||
|
}
|
||||||
|
|
||||||
@@ -1,62 +0,0 @@
|
|||||||
// Functions to check the current applied version
|
|
||||||
// updating will be handled by the processing service
|
|
||||||
|
|
||||||
import { MongoClient } from "mongodb";
|
|
||||||
import { log } from "./logger";
|
|
||||||
|
|
||||||
const uri = process.env.MONGO_URI || "";
|
|
||||||
const db = process.env.MONGO_DB || "";
|
|
||||||
const user = process.env.MONGO_USER || "";
|
|
||||||
const pass = process.env.MONGO_PASS || "";
|
|
||||||
const collection = "data_ingress_meta";
|
|
||||||
|
|
||||||
if(!uri || !db || !user || !pass) {
|
|
||||||
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
|
|
||||||
process.exit(35);
|
|
||||||
} else {
|
|
||||||
log("DEBUG", `MongoDB Connection`, {
|
|
||||||
uri: uri,
|
|
||||||
db: db,
|
|
||||||
collection: collection,
|
|
||||||
user: user,
|
|
||||||
pass: "****",
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`;
|
|
||||||
|
|
||||||
let mongoClient: MongoClient | null = null;
|
|
||||||
|
|
||||||
async function getMongoClient() {
|
|
||||||
if (mongoClient) return mongoClient;
|
|
||||||
|
|
||||||
mongoClient = new MongoClient(CONNECTION_URI);
|
|
||||||
await mongoClient.connect();
|
|
||||||
return mongoClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function isPackageProcessed(serviceName: string, packageName: string): Promise<boolean> {
|
|
||||||
try {
|
|
||||||
const client = await getMongoClient();
|
|
||||||
const database = client.db(db);
|
|
||||||
const coll = database.collection(collection);
|
|
||||||
|
|
||||||
const result = await coll.findOne({ service_name: serviceName });
|
|
||||||
|
|
||||||
if (!result) {
|
|
||||||
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result.latest_package === packageName) {
|
|
||||||
log('INFO', 'No update needed');
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
|
|
||||||
return false;
|
|
||||||
} catch (err) {
|
|
||||||
log('ERROR', 'Failed to check Mongo for version state:', err);
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
52
src/index.ts
52
src/index.ts
@@ -4,13 +4,39 @@ import { processAndStore } from './sss.js'
|
|||||||
|
|
||||||
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
||||||
import { processPisStream } from './process.js'
|
import { processPisStream } from './process.js'
|
||||||
import { isPackageProcessed } from './database.js'
|
import { ConfigLoader } from './config.js'
|
||||||
|
import { natsManager } from './nats.js'
|
||||||
|
|
||||||
|
async function exit(exitCode: string | number=0): Promise<void> {
|
||||||
|
log("INFO", `Exiting with code: ${exitCode}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await natsManager.close();
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `Error during cleanup: ${err}`);
|
||||||
|
process.exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
process.exit(exitCode);
|
||||||
|
}
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
const SERVICE_NAME = process.env.SERVICE_NAME;
|
const SERVICE_NAME = process.env.SERVICE_NAME;
|
||||||
if (!SERVICE_NAME) {
|
if (!SERVICE_NAME) {
|
||||||
log('ERROR', "SERVICE_NAME env variable must be set");
|
log('ERROR', "SERVICE_NAME env variable must be set");
|
||||||
process.exit(1);
|
process.exitCode = 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`;
|
||||||
|
|
||||||
|
const config = ConfigLoader();
|
||||||
|
|
||||||
|
try {
|
||||||
|
log('INFO', `Initialising NATS`);
|
||||||
|
await natsManager.connect(config.Mq);
|
||||||
|
} catch (err) {
|
||||||
|
log('ERROR', `Unable to connect to NATS: ${err}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -19,26 +45,34 @@ async function main() {
|
|||||||
|
|
||||||
if (!packageInfo.assets[0]?.browser_download_url) {
|
if (!packageInfo.assets[0]?.browser_download_url) {
|
||||||
log('ERROR', `No attachments found for release ${packageInfo.name}`);
|
log('ERROR', `No attachments found for release ${packageInfo.name}`);
|
||||||
process.exit(9);
|
process.exitCode = 1;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) {
|
const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY);
|
||||||
log('INFO', `Database matches latest release. Exiting`);
|
if (lastAppliedVersion === packageInfo.name) {
|
||||||
process.exit(0);
|
log('INFO', "No new data, exiting");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
||||||
const objectGenerator = processPisStream(inputStream);
|
const objectGenerator = processPisStream(config.General, inputStream);
|
||||||
|
|
||||||
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
|
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
|
||||||
|
|
||||||
log('DEBUG', `Processing stream to: ${filename}`);
|
log('DEBUG', `Processing stream to: ${filename}`);
|
||||||
|
|
||||||
await processAndStore(objectGenerator, filename);
|
await processAndStore(config.S3, objectGenerator, filename);
|
||||||
log('DEBUG', 'Done');
|
log('DEBUG', 'Done');
|
||||||
|
|
||||||
|
log('DEBUG', "Sending message to NATS");
|
||||||
|
await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME);
|
||||||
|
await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log('ERROR', 'Fatal error in pipeline: ', err);
|
log('ERROR', 'Fatal error in pipeline: ', err);
|
||||||
process.exit(7);
|
process.exitCode = 1;
|
||||||
|
} finally {
|
||||||
|
await exit(process.exitCode || 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
146
src/nats.ts
146
src/nats.ts
@@ -1,28 +1,136 @@
|
|||||||
import { connect, JSONCodec } from "nats";
|
import { connect, JSONCodec, StringCodec } from "nats";
|
||||||
import type { ConnectionOptions, NatsConnection, Payload } from "nats";
|
import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats";
|
||||||
import { log } from "./logger";
|
import { log } from "./logger.js";
|
||||||
|
import { hostname } from "node:os";
|
||||||
|
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
|
||||||
|
import type { Mq } from "./config.js";
|
||||||
|
|
||||||
const jc = JSONCodec();
|
const jc = JSONCodec();
|
||||||
|
const sc = StringCodec();
|
||||||
|
|
||||||
async function getNatsConnection(): Promise<NatsConnection> {
|
class NatsManager {
|
||||||
const serverUrl = process.env.MQ_URL || "nats://localhost:4222";
|
private nc: NatsConnection | null = null;
|
||||||
|
private js: JetStreamClient | null = null;
|
||||||
|
private kv: KV | null = null;
|
||||||
|
private bucketName = "INGRESS_STATES";
|
||||||
|
|
||||||
const options: ConnectionOptions = {
|
/**
|
||||||
servers: serverUrl,
|
* Opens connection to NATS
|
||||||
name: `${process.env.HOSTNAME}` || 'local',
|
*/
|
||||||
reconnect: true,
|
async connect(cfg: Mq): Promise<void> {
|
||||||
maxReconnectAttempts: -1,
|
if (this.nc) return;
|
||||||
};
|
|
||||||
|
|
||||||
if (process.env.MQ_USER && process.env.MQ_PASS) {
|
const options: ConnectionOptions = {
|
||||||
options.user = process.env.MQ_USER;
|
servers: cfg.MQ_URL,
|
||||||
options.pass = process.env.MQ_PASS;
|
name: hostname(),
|
||||||
log("INFO", "NATS: Using username/password authentication");
|
reconnect: true,
|
||||||
} else {
|
maxReconnectAttempts: -1,
|
||||||
log("INFO", "NATS: Connecting without authentication");
|
waitOnFirstConnect: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (cfg.MQ_USER && cfg.MQ_PASS) {
|
||||||
|
options.user = cfg.MQ_USER;
|
||||||
|
options.pass = cfg.MQ_PASS;
|
||||||
|
log("INFO", "NATS: Using auth credentials");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.nc = await connect(options);
|
||||||
|
this.js = this.nc.jetstream();
|
||||||
|
log("INFO", `NATS: Connected to ${cfg.MQ_URL}`);
|
||||||
|
|
||||||
|
// Handle connection close events
|
||||||
|
this.nc.closed().then((err) => {
|
||||||
|
if (err) {
|
||||||
|
log("ERROR", `NATS: Connection closed: ${err}`);
|
||||||
|
} else {
|
||||||
|
log("INFO", "NATS: Connection ended")
|
||||||
|
}
|
||||||
|
this.nc = null;
|
||||||
|
this.js = null;
|
||||||
|
this.kv = null;
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: Initial connection failed: ${err}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return await connect(options)
|
/**
|
||||||
|
* Accessor for the KV store
|
||||||
|
*/
|
||||||
|
private async getKV(): Promise<KV> {
|
||||||
|
if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first.");
|
||||||
|
if (!this.kv) {
|
||||||
|
this.kv = await this.js.views.kv(this.bucketName);
|
||||||
|
}
|
||||||
|
return this.kv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the last recorded state/hash for a service
|
||||||
|
*/
|
||||||
|
async getState(key: string): Promise<string | null> {
|
||||||
|
try {
|
||||||
|
const store = await this.getKV();
|
||||||
|
const entry = await store.get(key);
|
||||||
|
return entry ? sc.decode(entry.value) : null;
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: KV Get failed for ${key}: ${err}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the current state/hash for a service
|
||||||
|
*/
|
||||||
|
async setState(key: string, value: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
const store = await this.getKV();
|
||||||
|
await store.put(key, sc.encode(value));
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: KV Set failed for ${key}: ${err}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publishes message to the JetStream
|
||||||
|
*/
|
||||||
|
async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise<boolean> {
|
||||||
|
if (!this.js) throw new Error("NATS: JetStream not initialized");
|
||||||
|
|
||||||
|
const message: MQFileUpdate = {
|
||||||
|
service_name: serviceName,
|
||||||
|
service_id: hostname(),
|
||||||
|
sent_timestamp: Math.floor(Date.now() / 1000),
|
||||||
|
data_type: "file",
|
||||||
|
data_kind: "pis",
|
||||||
|
payload: {
|
||||||
|
version: version,
|
||||||
|
filepath: path,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const subject = `ingress.file.${message.data_kind}`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.js.publish(subject, jc.encode(message));
|
||||||
|
log("INFO", `NATS: Message published to ${subject}`);
|
||||||
|
return true;
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: Failed to publish to JetStream: ${err}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async close() {
|
||||||
|
if (this.nc) {
|
||||||
|
await this.nc.drain();
|
||||||
|
this.nc = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send Message Function here to send the message to NATS
|
// Export instance
|
||||||
|
export const natsManager = new NatsManager();
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
import { Readable } from 'node:stream';
|
import { Readable } from 'node:stream';
|
||||||
import { createInterface } from 'node:readline';
|
import { createInterface } from 'node:readline';
|
||||||
import XXH from 'xxhashjs';
|
|
||||||
import { log } from './logger.js';
|
import { log } from './logger.js';
|
||||||
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
|
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
|
||||||
|
import type { GeneralConfig } from './config.js';
|
||||||
|
|
||||||
const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
|
const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
|
||||||
|
|
||||||
@@ -18,8 +18,8 @@ interface InputRecord {
|
|||||||
stops: string[];
|
stops: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function* processPisStream(inputStream: Readable) {
|
export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) {
|
||||||
const TOC = process.env.TOC;
|
const TOC = cfg.TOC;
|
||||||
if (!TOC) {
|
if (!TOC) {
|
||||||
log('ERROR', "TOC not set: Exit code 19");
|
log('ERROR', "TOC not set: Exit code 19");
|
||||||
process.exit(19);
|
process.exit(19);
|
||||||
@@ -35,19 +35,19 @@ export async function* processPisStream(inputStream: Readable) {
|
|||||||
if (!line.trim()) continue;
|
if (!line.trim()) continue;
|
||||||
|
|
||||||
const record = JSON.parse(line) as InputRecord;
|
const record = JSON.parse(line) as InputRecord;
|
||||||
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
// log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
||||||
|
|
||||||
const crsHash = XXH.h64(record.stops.join('|'), SEED);
|
const normalisedStops = record.stops.map(stop => stop.toUpperCase());
|
||||||
const tiplocStops = await mapStopsToTiploc(record.stops);
|
|
||||||
const tiplocHash = XXH.h64(tiplocStops.join('|'), SEED);
|
const tiplocStops = await mapStopsToTiploc(normalisedStops);
|
||||||
|
|
||||||
const data: DataIngressPisData.PisObjects = {
|
const data: DataIngressPisData.PisObjects = {
|
||||||
code: record.code,
|
code: record.code,
|
||||||
toc: TOC.toLowerCase(),
|
toc: TOC.toLowerCase(),
|
||||||
crsStops: record.stops,
|
crsStops: normalisedStops,
|
||||||
crsHash: crsHash.toString(),
|
crsHash: "",
|
||||||
tiplocStops: tiplocStops,
|
tiplocStops: tiplocStops,
|
||||||
tiplocHash: tiplocHash.toString(),
|
tiplocHash: "",
|
||||||
}
|
}
|
||||||
yield data;
|
yield data;
|
||||||
|
|
||||||
@@ -63,8 +63,9 @@ async function mapStopsToTiploc(crsStops: string[]): Promise<string[]> {
|
|||||||
// Cache Miss
|
// Cache Miss
|
||||||
try {
|
try {
|
||||||
const tiploc = await fetchTiplocFromApi(crs);
|
const tiploc = await fetchTiplocFromApi(crs);
|
||||||
tiplocCache.set(crs, tiploc);
|
const normalisedTiploc = tiploc.toUpperCase();
|
||||||
return tiploc;
|
tiplocCache.set(crs, normalisedTiploc);
|
||||||
|
return normalisedTiploc;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log('ERROR', `Failed lookup for: ${crs}`, err);
|
log('ERROR', `Failed lookup for: ${crs}`, err);
|
||||||
process.exit(99);
|
process.exit(99);
|
||||||
|
|||||||
36
src/sss.ts
36
src/sss.ts
@@ -1,11 +1,13 @@
|
|||||||
import { S3Client } from "@aws-sdk/client-s3";
|
import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
|
||||||
import { Upload } from "@aws-sdk/lib-storage";
|
import { Upload } from "@aws-sdk/lib-storage";
|
||||||
import { createWriteStream } from "node:fs";
|
import { createWriteStream } from "node:fs";
|
||||||
import { Readable } from "node:stream";
|
import { Readable } from "node:stream";
|
||||||
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
|
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
|
||||||
import { log } from "./logger.js";
|
import { log } from "./logger.js";
|
||||||
|
import type { S3 } from "./config.js";
|
||||||
|
|
||||||
export async function processAndStore(
|
export async function processAndStore(
|
||||||
|
cfg: S3,
|
||||||
generator: AsyncGenerator<DataIngressPisData.PisObjects>,
|
generator: AsyncGenerator<DataIngressPisData.PisObjects>,
|
||||||
filename: string
|
filename: string
|
||||||
) {
|
) {
|
||||||
@@ -15,29 +17,45 @@ export async function processAndStore(
|
|||||||
}
|
}
|
||||||
})());
|
})());
|
||||||
|
|
||||||
const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET;
|
const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;
|
||||||
|
|
||||||
if (useS3) {
|
if (useS3) {
|
||||||
if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) {
|
if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) {
|
||||||
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
||||||
process.exit(24);
|
process.exit(24);
|
||||||
}
|
}
|
||||||
log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`);
|
|
||||||
|
log("DEBUG", "Opening connection to S3 Server");
|
||||||
|
|
||||||
const client = new S3Client({
|
const client = new S3Client({
|
||||||
endpoint: process.env.S3_ENDPOINT,
|
endpoint: cfg.S3_ENDPOINT,
|
||||||
region: process.env.S3_REGION,
|
region: cfg.S3_REGION,
|
||||||
credentials: {
|
credentials: {
|
||||||
accessKeyId: process.env.S3_ACCESS_KEY!,
|
accessKeyId: cfg.S3_ACCESS_KEY!,
|
||||||
secretAccessKey: process.env.S3_SECRET_KEY,
|
secretAccessKey: cfg.S3_SECRET_KEY,
|
||||||
},
|
},
|
||||||
forcePathStyle: true,
|
forcePathStyle: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Check bucket exists, and create if needed
|
||||||
|
try {
|
||||||
|
await client.send(new HeadBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
|
||||||
|
log('INFO', `Bucket ${cfg.S3_BUCKET} does not exist, creating...`);
|
||||||
|
await client.send(new CreateBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||||
|
} else {
|
||||||
|
log(`ERROR`, `Failed to verify bucket: ${cfg.S3_BUCKET}`);
|
||||||
|
process.exit(21);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
||||||
|
|
||||||
const upload = new Upload({
|
const upload = new Upload({
|
||||||
client,
|
client,
|
||||||
params: {
|
params: {
|
||||||
Bucket: process.env.S3_BUCKET,
|
Bucket: cfg.S3_BUCKET,
|
||||||
Key: filename,
|
Key: filename,
|
||||||
Body: ndjsonStream,
|
Body: ndjsonStream,
|
||||||
ContentType: "application/x-ndjson",
|
ContentType: "application/x-ndjson",
|
||||||
|
|||||||
Reference in New Issue
Block a user