From 33bd1cd320e8d05b53aa3d20f714afb93e8f0e62 Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Wed, 18 Feb 2026 21:54:05 +0000 Subject: [PATCH] Update NATS connection to handle using a KV store to track last update --- src/index.ts | 47 +++++++++++---- src/nats.ts | 160 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 153 insertions(+), 54 deletions(-) diff --git a/src/index.ts b/src/index.ts index 008c636..b774525 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,29 +5,55 @@ import { processAndStore } from './sss.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { processPisStream } from './process.js' import { ConfigLoader } from './config.js' -import { sendFileUpdateMessage } from './nats.js' +import { natsManager } from './nats.js' + +async function exit(exitCode: string | number=0): Promise { + log("INFO", `Exiting with code: ${exitCode}`); + + try { + await natsManager.close(); + } catch (err) { + log("ERROR", `Error during cleanup: ${err}`); + process.exit(1) + } + + process.exit(exitCode); +} async function main() { const SERVICE_NAME = process.env.SERVICE_NAME; if (!SERVICE_NAME) { log('ERROR', "SERVICE_NAME env variable must be set"); - process.exit(1); + process.exitCode = 1; + return; } + const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`; + const config = ConfigLoader(); + try { + log('INFO', `Initialising NATS`); + await natsManager.connect(config.Mq); + } catch (err) { + log('ERROR', `Unable to connect to NATS: ${err}`); + } + try { const packageInfo = await getLatestPackageName(); log('INFO', `Latest PIS Package: ${packageInfo.name}`); if (!packageInfo.assets[0]?.browser_download_url) { log('ERROR', `No attachments found for release ${packageInfo.name}`); - process.exit(9); + process.exitCode = 1; + return; } - // Check if package already processed HERE... - log('WARN', 'Version check disabled, downloading data from source'); - // exit(0) if done, else continue + const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY); + if (lastAppliedVersion === packageInfo.name) { + log('INFO', "No new data, exiting"); + return; + } const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const objectGenerator = processPisStream(config.General, inputStream); @@ -40,12 +66,13 @@ async function main() { log('DEBUG', 'Done'); log('DEBUG', "Sending message to NATS"); - sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME); - log('DEBUG', "Done, exiting") - return + await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME); + await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name); } catch (err) { log('ERROR', 'Fatal error in pipeline: ', err); - process.exit(7); + process.exitCode = 1; + } finally { + await exit(process.exitCode || 0); } } diff --git a/src/nats.ts b/src/nats.ts index 58f8755..84412e1 100644 --- a/src/nats.ts +++ b/src/nats.ts @@ -1,60 +1,132 @@ -import { connect, JSONCodec } from "nats"; -import type { ConnectionOptions, NatsConnection, Payload } from "nats"; +import { connect, JSONCodec, StringCodec } from "nats"; +import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats"; import { log } from "./logger.js"; import { hostname } from "node:os"; import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update"; import type { Mq } from "./config.js"; const jc = JSONCodec(); +const sc = StringCodec(); -async function getNatsConnection(cfg: Mq): Promise { - const options: ConnectionOptions = { - servers: cfg.MQ_URL, - name: hostname(), - reconnect: true, - maxReconnectAttempts: -1, - }; +class NatsManager { + private nc: NatsConnection | null = null; + private js: JetStreamClient | null = null; + private kv: KV | null = null; + private bucketName = "INGRESS_STATES"; - if (cfg.MQ_USER && cfg.MQ_PASS) { - options.user = cfg.MQ_USER; - options.pass = cfg.MQ_PASS; - log("INFO", "NATS: Using username/password authentication"); - } else { - log("INFO", "NATS: Connecting without authentication"); + /** + * Opens connection to NATS + */ + async connect(cfg: Mq): Promise { + if (this.nc) return; + + const options: ConnectionOptions = { + servers: cfg.MQ_URL, + name: hostname(), + reconnect: true, + maxReconnectAttempts: -1, + waitOnFirstConnect: true, + }; + + if (cfg.MQ_USER && cfg.MQ_PASS) { + options.user = cfg.MQ_USER; + options.pass = cfg.MQ_PASS; + log("INFO", "NATS: Using auth credentials"); + } + + try { + this.nc = await connect(options); + this.js = this.nc.jetstream(); + log("INFO", `NATS: Connected to ${cfg.MQ_URL}`); + + // Handle connection close events + this.nc.closed().then((err) => { + log("ERROR", `NATS: Connection closed: ${err}`); + this.nc = null; + this.js = null; + this.kv = null; + }); + } catch (err) { + log("ERROR", `NATS: Initial connection failed: ${err}`); + throw err; + } } - return await connect(options) -} - -// Send Message Function here to send the message to NATS -export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise { - const serviceId: string = hostname(); - const message: MQFileUpdate = { - service_name: serviceName, - service_id: serviceId, - sent_timestamp: Math.floor(Date.now() / 1000), - data_type: "file", - data_kind: "pis", - payload: { - version: version, - filepath: path, + /** + * Accessor for the KV store + */ + private async getKV(): Promise { + if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first."); + if (!this.kv) { + this.kv = await this.js.views.kv(this.bucketName); } - }; - - let nats: NatsConnection | undefined; + return this.kv; + } - try { - const nats: NatsConnection = await getNatsConnection(cfg); + /** + * Get the last recorded state/hash for a service + */ + async getState(key: string): Promise { + try { + const store = await this.getKV(); + const entry = await store.get(key); + return entry ? sc.decode(entry.value) : null; + } catch (err) { + log("ERROR", `NATS: KV Get failed for ${key}: ${err}`); + return null; + } + } + + /** + * Set the current state/hash for a service + */ + async setState(key: string, value: string): Promise { + try { + const store = await this.getKV(); + await store.put(key, sc.encode(value)); + } catch (err) { + log("ERROR", `NATS: KV Set failed for ${key}: ${err}`); + throw err; + } + } + + /** + * Publishes message to the JetStream + */ + async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise { + if (!this.js) throw new Error("NATS: JetStream not initialized"); + + const message: MQFileUpdate = { + service_name: serviceName, + service_id: hostname(), + sent_timestamp: Math.floor(Date.now() / 1000), + data_type: "file", + data_kind: "pis", + payload: { + version: version, + filepath: path, + } + }; const subject = `ingress.file.${message.data_kind}`; - nats.publish(subject, jc.encode(message)); - - await nats.drain(); - return true - } catch (err) { - log("ERROR", `NATS: Failed to send message: ${err}`); - if (nats) {nats.close()} - return false; + try { + await this.js.publish(subject, jc.encode(message)); + log("INFO", `NATS: Message published to ${subject}`); + return true; + } catch (err) { + log("ERROR", `NATS: Failed to publish to JetStream: ${err}`); + return false; + } } -} \ No newline at end of file + + async close() { + if (this.nc) { + await this.nc.drain(); + this.nc = null; + } + } +} + +// Export instance +export const natsManager = new NatsManager(); \ No newline at end of file