Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0c934a748f | |||
| 33bd1cd320 | |||
| e115edd79e | |||
| 1f07951b88 | |||
| b0a45f4000 | |||
| b42e37c569 | |||
| 2cb9c320cf |
@@ -20,7 +20,7 @@ WORKDIR /app
|
|||||||
COPY package*.json ./
|
COPY package*.json ./
|
||||||
RUN npm ci --omit=dev
|
RUN npm ci --omit=dev
|
||||||
|
|
||||||
COPY --from=builder /app/dist ./dist
|
COPY --from=builder /app/dist /app/dist
|
||||||
|
|
||||||
USER node
|
USER node
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import { readFileSync, existsSync } from "node:fs"
|
import { readFileSync, existsSync } from "node:fs"
|
||||||
import { join } from "node:path"
|
import { join } from "node:path"
|
||||||
import { log } from "./logger";
|
import { log } from "./logger.js";
|
||||||
|
|
||||||
interface Config {
|
interface Config {
|
||||||
Mq: Mq,
|
Mq: Mq,
|
||||||
|
|||||||
@@ -1,62 +0,0 @@
|
|||||||
// Functions to check the current applied version
|
|
||||||
// updating will be handled by the processing service
|
|
||||||
|
|
||||||
import { MongoClient } from "mongodb";
|
|
||||||
import { log } from "./logger";
|
|
||||||
import type { Mongo } from "./config";
|
|
||||||
|
|
||||||
const collection = "data_ingress_meta";
|
|
||||||
|
|
||||||
|
|
||||||
function uriBuild(cfg: Mongo): string {
|
|
||||||
if(!cfg.MONGO_URI || !cfg.MONGO_DB || !cfg.MONGO_USER || !cfg.MONGO_PASS) {
|
|
||||||
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
|
|
||||||
process.exit(35);
|
|
||||||
} else {
|
|
||||||
log("DEBUG", `MongoDB Connection`, {
|
|
||||||
uri: cfg.MONGO_URI,
|
|
||||||
db: cfg.MONGO_DB,
|
|
||||||
collection: collection,
|
|
||||||
user: cfg.MONGO_USER,
|
|
||||||
pass: "****",
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
return `mongodb://${encodeURIComponent(cfg.MONGO_USER)}:${encodeURIComponent(cfg.MONGO_PASS)}@${cfg.MONGO_URI}`
|
|
||||||
}
|
|
||||||
|
|
||||||
let mongoClient: MongoClient | null = null;
|
|
||||||
|
|
||||||
async function getMongoClient(cfg: Mongo) {
|
|
||||||
if (mongoClient) return mongoClient;
|
|
||||||
|
|
||||||
mongoClient = new MongoClient(uriBuild(cfg));
|
|
||||||
await mongoClient.connect();
|
|
||||||
return mongoClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function isPackageProcessed(cfg: Mongo, serviceName: string, packageName: string): Promise<boolean> {
|
|
||||||
try {
|
|
||||||
const client = await getMongoClient(cfg);
|
|
||||||
const database = client.db(cfg.MONGO_DB);
|
|
||||||
const coll = database.collection(collection);
|
|
||||||
|
|
||||||
const result = await coll.findOne({ service_name: serviceName });
|
|
||||||
|
|
||||||
if (!result) {
|
|
||||||
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result.latest_package === packageName) {
|
|
||||||
log('INFO', 'No update needed');
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
|
|
||||||
return false;
|
|
||||||
} catch (err) {
|
|
||||||
log('ERROR', 'Failed to check Mongo for version state:', err);
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
47
src/index.ts
47
src/index.ts
@@ -4,31 +4,55 @@ import { processAndStore } from './sss.js'
|
|||||||
|
|
||||||
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
||||||
import { processPisStream } from './process.js'
|
import { processPisStream } from './process.js'
|
||||||
import { isPackageProcessed } from './database.js'
|
|
||||||
import { ConfigLoader } from './config.js'
|
import { ConfigLoader } from './config.js'
|
||||||
import { sendFileUpdateMessage } from './nats.js'
|
import { natsManager } from './nats.js'
|
||||||
|
|
||||||
|
async function exit(exitCode: string | number=0): Promise<void> {
|
||||||
|
log("INFO", `Exiting with code: ${exitCode}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await natsManager.close();
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `Error during cleanup: ${err}`);
|
||||||
|
process.exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
process.exit(exitCode);
|
||||||
|
}
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
const SERVICE_NAME = process.env.SERVICE_NAME;
|
const SERVICE_NAME = process.env.SERVICE_NAME;
|
||||||
if (!SERVICE_NAME) {
|
if (!SERVICE_NAME) {
|
||||||
log('ERROR', "SERVICE_NAME env variable must be set");
|
log('ERROR', "SERVICE_NAME env variable must be set");
|
||||||
process.exit(1);
|
process.exitCode = 1;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const CURRENT_VERSION_KEY: string = `${SERVICE_NAME}-current-version`;
|
||||||
|
|
||||||
const config = ConfigLoader();
|
const config = ConfigLoader();
|
||||||
|
|
||||||
|
try {
|
||||||
|
log('INFO', `Initialising NATS`);
|
||||||
|
await natsManager.connect(config.Mq);
|
||||||
|
} catch (err) {
|
||||||
|
log('ERROR', `Unable to connect to NATS: ${err}`);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const packageInfo = await getLatestPackageName();
|
const packageInfo = await getLatestPackageName();
|
||||||
log('INFO', `Latest PIS Package: ${packageInfo.name}`);
|
log('INFO', `Latest PIS Package: ${packageInfo.name}`);
|
||||||
|
|
||||||
if (!packageInfo.assets[0]?.browser_download_url) {
|
if (!packageInfo.assets[0]?.browser_download_url) {
|
||||||
log('ERROR', `No attachments found for release ${packageInfo.name}`);
|
log('ERROR', `No attachments found for release ${packageInfo.name}`);
|
||||||
process.exit(9);
|
process.exitCode = 1;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (await isPackageProcessed(config.Mongo, SERVICE_NAME, packageInfo.name)) {
|
const lastAppliedVersion: string | null = await natsManager.getState(CURRENT_VERSION_KEY);
|
||||||
log('INFO', `Database matches latest release. Exiting`);
|
if (lastAppliedVersion === packageInfo.name) {
|
||||||
process.exit(0);
|
log('INFO', "No new data, exiting");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
||||||
@@ -42,12 +66,13 @@ async function main() {
|
|||||||
log('DEBUG', 'Done');
|
log('DEBUG', 'Done');
|
||||||
|
|
||||||
log('DEBUG', "Sending message to NATS");
|
log('DEBUG', "Sending message to NATS");
|
||||||
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME);
|
await natsManager.sendFileUpdateMessage(filename, packageInfo.name, SERVICE_NAME);
|
||||||
log('DEBUG', "Done, exiting")
|
await natsManager.setState(CURRENT_VERSION_KEY, packageInfo.name);
|
||||||
return
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log('ERROR', 'Fatal error in pipeline: ', err);
|
log('ERROR', 'Fatal error in pipeline: ', err);
|
||||||
process.exit(7);
|
process.exitCode = 1;
|
||||||
|
} finally {
|
||||||
|
await exit(process.exitCode || 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
122
src/nats.ts
122
src/nats.ts
@@ -1,37 +1,108 @@
|
|||||||
import { connect, JSONCodec } from "nats";
|
import { connect, JSONCodec, StringCodec } from "nats";
|
||||||
import type { ConnectionOptions, NatsConnection, Payload } from "nats";
|
import type { ConnectionOptions, NatsConnection, JetStreamClient, KV } from "nats";
|
||||||
import { log } from "./logger";
|
import { log } from "./logger.js";
|
||||||
import { hostname } from "node:os";
|
import { hostname } from "node:os";
|
||||||
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
|
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
|
||||||
import type { Mq } from "./config";
|
import type { Mq } from "./config.js";
|
||||||
|
|
||||||
const jc = JSONCodec();
|
const jc = JSONCodec();
|
||||||
|
const sc = StringCodec();
|
||||||
|
|
||||||
|
class NatsManager {
|
||||||
|
private nc: NatsConnection | null = null;
|
||||||
|
private js: JetStreamClient | null = null;
|
||||||
|
private kv: KV | null = null;
|
||||||
|
private bucketName = "INGRESS_STATES";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opens connection to NATS
|
||||||
|
*/
|
||||||
|
async connect(cfg: Mq): Promise<void> {
|
||||||
|
if (this.nc) return;
|
||||||
|
|
||||||
async function getNatsConnection(cfg: Mq): Promise<NatsConnection> {
|
|
||||||
const options: ConnectionOptions = {
|
const options: ConnectionOptions = {
|
||||||
servers: cfg.MQ_URL,
|
servers: cfg.MQ_URL,
|
||||||
name: hostname(),
|
name: hostname(),
|
||||||
reconnect: true,
|
reconnect: true,
|
||||||
maxReconnectAttempts: -1,
|
maxReconnectAttempts: -1,
|
||||||
|
waitOnFirstConnect: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (cfg.MQ_USER && cfg.MQ_PASS) {
|
if (cfg.MQ_USER && cfg.MQ_PASS) {
|
||||||
options.user = cfg.MQ_USER;
|
options.user = cfg.MQ_USER;
|
||||||
options.pass = cfg.MQ_PASS;
|
options.pass = cfg.MQ_PASS;
|
||||||
log("INFO", "NATS: Using username/password authentication");
|
log("INFO", "NATS: Using auth credentials");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.nc = await connect(options);
|
||||||
|
this.js = this.nc.jetstream();
|
||||||
|
log("INFO", `NATS: Connected to ${cfg.MQ_URL}`);
|
||||||
|
|
||||||
|
// Handle connection close events
|
||||||
|
this.nc.closed().then((err) => {
|
||||||
|
if (err) {
|
||||||
|
log("ERROR", `NATS: Connection closed: ${err}`);
|
||||||
} else {
|
} else {
|
||||||
log("INFO", "NATS: Connecting without authentication");
|
log("INFO", "NATS: Connection ended")
|
||||||
|
}
|
||||||
|
this.nc = null;
|
||||||
|
this.js = null;
|
||||||
|
this.kv = null;
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: Initial connection failed: ${err}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return await connect(options)
|
/**
|
||||||
|
* Accessor for the KV store
|
||||||
|
*/
|
||||||
|
private async getKV(): Promise<KV> {
|
||||||
|
if (!this.js) throw new Error("NATS: JetStream not initialized. Call connect() first.");
|
||||||
|
if (!this.kv) {
|
||||||
|
this.kv = await this.js.views.kv(this.bucketName);
|
||||||
|
}
|
||||||
|
return this.kv;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send Message Function here to send the message to NATS
|
/**
|
||||||
export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise<boolean> {
|
* Get the last recorded state/hash for a service
|
||||||
const serviceId: string = hostname();
|
*/
|
||||||
|
async getState(key: string): Promise<string | null> {
|
||||||
|
try {
|
||||||
|
const store = await this.getKV();
|
||||||
|
const entry = await store.get(key);
|
||||||
|
return entry ? sc.decode(entry.value) : null;
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: KV Get failed for ${key}: ${err}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the current state/hash for a service
|
||||||
|
*/
|
||||||
|
async setState(key: string, value: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
const store = await this.getKV();
|
||||||
|
await store.put(key, sc.encode(value));
|
||||||
|
} catch (err) {
|
||||||
|
log("ERROR", `NATS: KV Set failed for ${key}: ${err}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publishes message to the JetStream
|
||||||
|
*/
|
||||||
|
async sendFileUpdateMessage(path: string, version: string, serviceName: string): Promise<boolean> {
|
||||||
|
if (!this.js) throw new Error("NATS: JetStream not initialized");
|
||||||
|
|
||||||
const message: MQFileUpdate = {
|
const message: MQFileUpdate = {
|
||||||
service_name: serviceName,
|
service_name: serviceName,
|
||||||
service_id: serviceId,
|
service_id: hostname(),
|
||||||
sent_timestamp: Math.floor(Date.now() / 1000),
|
sent_timestamp: Math.floor(Date.now() / 1000),
|
||||||
data_type: "file",
|
data_type: "file",
|
||||||
data_kind: "pis",
|
data_kind: "pis",
|
||||||
@@ -41,20 +112,25 @@ export async function sendFileUpdateMessage(cfg: Mq, path: string, version: stri
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let nats: NatsConnection | undefined;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const nats: NatsConnection = await getNatsConnection(cfg);
|
|
||||||
|
|
||||||
const subject = `ingress.file.${message.data_kind}`;
|
const subject = `ingress.file.${message.data_kind}`;
|
||||||
|
|
||||||
nats.publish(subject, jc.encode(message));
|
try {
|
||||||
|
await this.js.publish(subject, jc.encode(message));
|
||||||
await nats.drain();
|
log("INFO", `NATS: Message published to ${subject}`);
|
||||||
return true
|
return true;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log("ERROR", `NATS: Failed to send message: ${err}`);
|
log("ERROR", `NATS: Failed to publish to JetStream: ${err}`);
|
||||||
if (nats) {nats.close()}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async close() {
|
||||||
|
if (this.nc) {
|
||||||
|
await this.nc.drain();
|
||||||
|
this.nc = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export instance
|
||||||
|
export const natsManager = new NatsManager();
|
||||||
@@ -36,7 +36,7 @@ export async function* processPisStream(cfg: GeneralConfig, inputStream: Readabl
|
|||||||
if (!line.trim()) continue;
|
if (!line.trim()) continue;
|
||||||
|
|
||||||
const record = JSON.parse(line) as InputRecord;
|
const record = JSON.parse(line) as InputRecord;
|
||||||
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
// log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
||||||
|
|
||||||
const crsHash = XXH.h64(record.stops.join('|'), SEED);
|
const crsHash = XXH.h64(record.stops.join('|'), SEED);
|
||||||
const tiplocStops = await mapStopsToTiploc(record.stops);
|
const tiplocStops = await mapStopsToTiploc(record.stops);
|
||||||
|
|||||||
20
src/sss.ts
20
src/sss.ts
@@ -1,4 +1,4 @@
|
|||||||
import { S3Client } from "@aws-sdk/client-s3";
|
import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
|
||||||
import { Upload } from "@aws-sdk/lib-storage";
|
import { Upload } from "@aws-sdk/lib-storage";
|
||||||
import { createWriteStream } from "node:fs";
|
import { createWriteStream } from "node:fs";
|
||||||
import { Readable } from "node:stream";
|
import { Readable } from "node:stream";
|
||||||
@@ -24,7 +24,8 @@ export async function processAndStore(
|
|||||||
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
||||||
process.exit(24);
|
process.exit(24);
|
||||||
}
|
}
|
||||||
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
|
||||||
|
log("DEBUG", "Opening connection to S3 Server");
|
||||||
|
|
||||||
const client = new S3Client({
|
const client = new S3Client({
|
||||||
endpoint: cfg.S3_ENDPOINT,
|
endpoint: cfg.S3_ENDPOINT,
|
||||||
@@ -36,6 +37,21 @@ export async function processAndStore(
|
|||||||
forcePathStyle: true,
|
forcePathStyle: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Check bucket exists, and create if needed
|
||||||
|
try {
|
||||||
|
await client.send(new HeadBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
|
||||||
|
log('INFO', `Bucket ${cfg.S3_BUCKET} does not exist, creating...`);
|
||||||
|
await client.send(new CreateBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||||
|
} else {
|
||||||
|
log(`ERROR`, `Failed to verify bucket: ${cfg.S3_BUCKET}`);
|
||||||
|
process.exit(21);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
||||||
|
|
||||||
const upload = new Upload({
|
const upload = new Upload({
|
||||||
client,
|
client,
|
||||||
params: {
|
params: {
|
||||||
|
|||||||
Reference in New Issue
Block a user