2 Commits

2 changed files with 157 additions and 54 deletions

View File

@@ -5,29 +5,55 @@ import { processAndStore } from './sss.js'
import { getLatestPackageName, getRequestStream } from './sources/gitea.js' import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
import { processPisStream } from './process.js' import { processPisStream } from './process.js'
import { ConfigLoader } from './config.js' import { ConfigLoader } from './config.js'
import { sendFileUpdateMessage } from './nats.js' import { natsManager } from './nats.js'
async function exit(exitCode: string | number=0): Promise<void> {
log("INFO", `Exiting with code: ${exitCode}`);
try {
await natsManager.close();
} catch (err) {
log("ERROR", `Error during cleanup: ${err}`);
process.exit(1)
}
process.exit(exitCode);
}
async function main() { async function main() {
const SERVICE_NAME = process.env.SERVICE_NAME; const SERVICE_NAME = process.env.SERVICE_NAME;
if (!SERVICE_NAME) { if (!SERVICE_NAME) {
log('ERROR', "SERVICE_NAME env variable must be set"); log('ERROR', "SERVICE_NAME env variable must be set");
process.exit(1); process.exitCode = 1;
return;
} }
const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`;
const config = ConfigLoader(); const config = ConfigLoader();
try {
log('INFO', `Initialising NATS`);
await natsManager.connect(config.Mq);
} catch (err) {
log('ERROR', `Unable to connect to NATS: ${err}`);
}
try { try {
const packageInfo = await getLatestPackageName(); const packageInfo = await getLatestPackageName();
log('INFO', `Latest PIS Package: ${packageInfo.name}`); log('INFO', `Latest PIS Package: ${packageInfo.name}`);
if (!packageInfo.assets[0]?.browser_download_url) { if (!packageInfo.assets[0]?.browser_download_url) {
log('ERROR', `No attachments found for release ${packageInfo.name}`); log('ERROR', `No attachments found for release ${packageInfo.name}`);
process.exit(9); process.exitCode = 1;
return;
} }
// Check if package already processed HERE... const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY);
log('WARN', 'Version check disabled, downloading data from source'); if (lastAppliedVersion === packageInfo.name) {
// exit(0) if done, else continue log('INFO', "No new data, exiting");
return;
}
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url); const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
const objectGenerator = processPisStream(config.General, inputStream); const objectGenerator = processPisStream(config.General, inputStream);
@@ -40,12 +66,13 @@ async function main() {
log('DEBUG', 'Done'); log('DEBUG', 'Done');
log('DEBUG', "Sending message to NATS"); log('DEBUG', "Sending message to NATS");
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME); await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME);
log('DEBUG', "Done, exiting") await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name);
return
} catch (err) { } catch (err) {
log('ERROR', 'Fatal error in pipeline: ', err); log('ERROR', 'Fatal error in pipeline: ', err);
process.exit(7); process.exitCode = 1;
} finally {
await exit(process.exitCode || 0);
} }
} }

View File

@@ -1,37 +1,108 @@
import { connect, JSONCodec } from "nats"; import { connect, JSONCodec, StringCodec } from "nats";
import type { ConnectionOptions, NatsConnection, Payload } from "nats"; import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats";
import { log } from "./logger.js"; import { log } from "./logger.js";
import { hostname } from "node:os"; import { hostname } from "node:os";
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update"; import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
import type { Mq } from "./config.js"; import type { Mq } from "./config.js";
const jc = JSONCodec(); const jc = JSONCodec();
const sc = StringCodec();
class NatsManager {
private nc: NatsConnection | null = null;
private js: JetStreamClient | null = null;
private kv: KV | null = null;
private bucketName = "INGRESS_STATES";
/**
* Opens connection to NATS
*/
async connect(cfg: Mq): Promise<void> {
if (this.nc) return;
async function getNatsConnection(cfg: Mq): Promise<NatsConnection> {
const options: ConnectionOptions = { const options: ConnectionOptions = {
servers: cfg.MQ_URL, servers: cfg.MQ_URL,
name: hostname(), name: hostname(),
reconnect: true, reconnect: true,
maxReconnectAttempts: -1, maxReconnectAttempts: -1,
waitOnFirstConnect: true,
}; };
if (cfg.MQ_USER && cfg.MQ_PASS) { if (cfg.MQ_USER && cfg.MQ_PASS) {
options.user = cfg.MQ_USER; options.user = cfg.MQ_USER;
options.pass = cfg.MQ_PASS; options.pass = cfg.MQ_PASS;
log("INFO", "NATS: Using username/password authentication"); log("INFO", "NATS: Using auth credentials");
} else {
log("INFO", "NATS: Connecting without authentication");
} }
return await connect(options) try {
} this.nc = await connect(options);
this.js = this.nc.jetstream();
log("INFO", `NATS: Connected to ${cfg.MQ_URL}`);
// Handle connection close events
this.nc.closed().then((err) => {
if (err) {
log("ERROR", `NATS: Connection closed: ${err}`);
} else {
log("INFO", "NATS: Connection ended")
}
this.nc = null;
this.js = null;
this.kv = null;
});
} catch (err) {
log("ERROR", `NATS: Initial connection failed: ${err}`);
throw err;
}
}
/**
* Accessor for the KV store
*/
private async getKV(): Promise<KV> {
if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first.");
if (!this.kv) {
this.kv = await this.js.views.kv(this.bucketName);
}
return this.kv;
}
/**
* Get the last recorded state/hash for a service
*/
async getState(key: string): Promise<string | null> {
try {
const store = await this.getKV();
const entry = await store.get(key);
return entry ? sc.decode(entry.value) : null;
} catch (err) {
log("ERROR", `NATS: KV Get failed for ${key}: ${err}`);
return null;
}
}
/**
* Set the current state/hash for a service
*/
async setState(key: string, value: string): Promise<void> {
try {
const store = await this.getKV();
await store.put(key, sc.encode(value));
} catch (err) {
log("ERROR", `NATS: KV Set failed for ${key}: ${err}`);
throw err;
}
}
/**
* Publishes message to the JetStream
*/
async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise<boolean> {
if (!this.js) throw new Error("NATS: JetStream not initialized");
// Send Message Function here to send the message to NATS
export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise<boolean> {
const serviceId: string = hostname();
const message: MQFileUpdate = { const message: MQFileUpdate = {
service_name: serviceName, service_name: serviceName,
service_id: serviceId, service_id: hostname(),
sent_timestamp: Math.floor(Date.now() / 1000), sent_timestamp: Math.floor(Date.now() / 1000),
data_type: "file", data_type: "file",
data_kind: "pis", data_kind: "pis",
@@ -41,20 +112,25 @@ export async function sendFileUpdateMessage(cfg: Mq, path: string, version: stri
} }
}; };
let nats: NatsConnection | undefined;
try {
const nats: NatsConnection = await getNatsConnection(cfg);
const subject = `ingress.file.${message.data_kind}`; const subject = `ingress.file.${message.data_kind}`;
nats.publish(subject, jc.encode(message)); try {
await this.js.publish(subject, jc.encode(message));
await nats.drain(); log("INFO", `NATS: Message published to ${subject}`);
return true return true;
} catch (err) { } catch (err) {
log("ERROR", `NATS: Failed to send message: ${err}`); log("ERROR", `NATS: Failed to publish to JetStream: ${err}`);
if (nats) {nats.close()}
return false; return false;
} }
}
async close() {
if (this.nc) {
await this.nc.drain();
this.nc = null;
}
}
} }
// Export instance
export const natsManager = new NatsManager();