Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b42e37c569 | |||
| 2cb9c320cf | |||
| c90163cdce |
@@ -20,7 +20,7 @@ WORKDIR /app
|
||||
COPY package*.json ./
|
||||
RUN npm ci --omit=dev
|
||||
|
||||
COPY --from=builder /app/dist ./dist
|
||||
COPY --from=builder /app/dist /app/dist
|
||||
|
||||
USER node
|
||||
|
||||
|
||||
79
src/config.ts
Normal file
79
src/config.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { readFileSync, existsSync } from "node:fs"
|
||||
import { join } from "node:path"
|
||||
import { log } from "./logger.js";
|
||||
|
||||
interface Config {
|
||||
Mq: Mq,
|
||||
S3: S3,
|
||||
Mongo: Mongo,
|
||||
General: GeneralConfig,
|
||||
}
|
||||
|
||||
export interface Mq {
|
||||
MQ_USER: string;
|
||||
MQ_PASS: string;
|
||||
MQ_URL: string;
|
||||
MQ_TOPIC: string;
|
||||
}
|
||||
|
||||
export interface S3 {
|
||||
S3_ENDPOINT: string;
|
||||
S3_BUCKET: string;
|
||||
S3_ACCESS_KEY: string;
|
||||
S3_SECRET_KEY: string;
|
||||
S3_REGION: string;
|
||||
}
|
||||
|
||||
export interface Mongo {
|
||||
MONGO_URI: string;
|
||||
MONGO_DB: string;
|
||||
MONGO_USER: string;
|
||||
MONGO_PASS: string;
|
||||
}
|
||||
|
||||
export interface GeneralConfig {
|
||||
TOC: string;
|
||||
}
|
||||
|
||||
export function ConfigLoader(): Config {
|
||||
const cfg: Config = {
|
||||
Mq: {
|
||||
MQ_URL: getConfig("MQ_URL"),
|
||||
MQ_TOPIC: getConfig("MQ_TOPIC"),
|
||||
MQ_USER: getConfig("MQ_USER"),
|
||||
MQ_PASS: getConfig("MQ_PASS"),
|
||||
},
|
||||
S3: {
|
||||
S3_ENDPOINT: getConfig("S3_ENDPOINT"),
|
||||
S3_BUCKET: getConfig("S3_BUCKET"),
|
||||
S3_ACCESS_KEY: getConfig("S3_ACCESS_KEY"),
|
||||
S3_SECRET_KEY: getConfig("S3_SECRET_KEY"),
|
||||
S3_REGION: getConfig("S3_REGION"),
|
||||
},
|
||||
Mongo: {
|
||||
MONGO_URI: getConfig("MONGO_URI"),
|
||||
MONGO_DB: getConfig("MONGO_DB"),
|
||||
MONGO_USER: getConfig("MONGO_USER"),
|
||||
MONGO_PASS: getConfig("MONGO_PASS"),
|
||||
},
|
||||
General: {
|
||||
TOC: getConfig("TOC"),
|
||||
}
|
||||
};
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
function getConfig(key: string): string {
|
||||
const filePath = join("/etc/secrets", key);
|
||||
if (existsSync(filePath)) {
|
||||
try {
|
||||
return readFileSync(filePath, "utf8").trim();
|
||||
} catch (err) {
|
||||
log("ERROR", `Error reading secret file at ${filePath}: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
return process.env[key] || ""
|
||||
}
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
// Functions to check the current applied version
|
||||
// updating will be handled by the processing service
|
||||
|
||||
import { MongoClient } from "mongodb";
|
||||
import { log } from "./logger";
|
||||
|
||||
const uri = process.env.MONGO_URI || "";
|
||||
const db = process.env.MONGO_DB || "";
|
||||
const user = process.env.MONGO_USER || "";
|
||||
const pass = process.env.MONGO_PASS || "";
|
||||
const collection = "data_ingress_meta";
|
||||
|
||||
if(!uri || !db || !user || !pass) {
|
||||
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
|
||||
process.exit(35);
|
||||
} else {
|
||||
log("DEBUG", `MongoDB Connection`, {
|
||||
uri: uri,
|
||||
db: db,
|
||||
collection: collection,
|
||||
user: user,
|
||||
pass: "****",
|
||||
});
|
||||
};
|
||||
|
||||
const CONNECTION_URI = `mongodb://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${uri}`;
|
||||
|
||||
let mongoClient: MongoClient | null = null;
|
||||
|
||||
async function getMongoClient() {
|
||||
if (mongoClient) return mongoClient;
|
||||
|
||||
mongoClient = new MongoClient(CONNECTION_URI);
|
||||
await mongoClient.connect();
|
||||
return mongoClient;
|
||||
}
|
||||
|
||||
export async function isPackageProcessed(serviceName: string, packageName: string): Promise<boolean> {
|
||||
try {
|
||||
const client = await getMongoClient();
|
||||
const database = client.db(db);
|
||||
const coll = database.collection(collection);
|
||||
|
||||
const result = await coll.findOne({ service_name: serviceName });
|
||||
|
||||
if (!result) {
|
||||
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (result.latest_package === packageName) {
|
||||
log('INFO', 'No update needed');
|
||||
return true;
|
||||
}
|
||||
|
||||
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
|
||||
return false;
|
||||
} catch (err) {
|
||||
log('ERROR', 'Failed to check Mongo for version state:', err);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
20
src/index.ts
20
src/index.ts
@@ -4,7 +4,8 @@ import { processAndStore } from './sss.js'
|
||||
|
||||
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
||||
import { processPisStream } from './process.js'
|
||||
import { isPackageProcessed } from './database.js'
|
||||
import { ConfigLoader } from './config.js'
|
||||
import { sendFileUpdateMessage } from './nats.js'
|
||||
|
||||
async function main() {
|
||||
const SERVICE_NAME = process.env.SERVICE_NAME;
|
||||
@@ -13,6 +14,8 @@ async function main() {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = ConfigLoader();
|
||||
|
||||
try {
|
||||
const packageInfo = await getLatestPackageName();
|
||||
log('INFO', `Latest PIS Package: ${packageInfo.name}`);
|
||||
@@ -22,20 +25,23 @@ async function main() {
|
||||
process.exit(9);
|
||||
}
|
||||
|
||||
if (await isPackageProcessed(SERVICE_NAME, packageInfo.name)) {
|
||||
log('INFO', `Database matches latest release. Exiting`);
|
||||
process.exit(0);
|
||||
}
|
||||
// Check if package already processed HERE...
|
||||
// exit(0) if done, else continue
|
||||
|
||||
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
||||
const objectGenerator = processPisStream(inputStream);
|
||||
const objectGenerator = processPisStream(config.General, inputStream);
|
||||
|
||||
const filename = `${packageInfo.name.replace(/\s+/g, '_')}_pis_data_ndjson`;
|
||||
|
||||
log('DEBUG', `Processing stream to: ${filename}`);
|
||||
|
||||
await processAndStore(objectGenerator, filename);
|
||||
await processAndStore(config.S3, objectGenerator, filename);
|
||||
log('DEBUG', 'Done');
|
||||
|
||||
log('DEBUG', "Sending message to NATS");
|
||||
sendFileUpdateMessage(config.Mq, filename, packageInfo.name, SERVICE_NAME);
|
||||
log('DEBUG', "Done, exiting")
|
||||
return
|
||||
} catch (err) {
|
||||
log('ERROR', 'Fatal error in pipeline: ', err);
|
||||
process.exit(7);
|
||||
|
||||
22
src/nats.ts
22
src/nats.ts
@@ -1,24 +1,23 @@
|
||||
import { connect, JSONCodec } from "nats";
|
||||
import type { ConnectionOptions, NatsConnection, Payload } from "nats";
|
||||
import { log } from "./logger";
|
||||
import { log } from "./logger.js";
|
||||
import { hostname } from "node:os";
|
||||
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
|
||||
import type { Mq } from "./config.js";
|
||||
|
||||
const jc = JSONCodec();
|
||||
|
||||
async function getNatsConnection(): Promise<NatsConnection> {
|
||||
const serverUrl = process.env.MQ_URL || "nats://localhost:4222";
|
||||
|
||||
async function getNatsConnection(cfg: Mq): Promise<NatsConnection> {
|
||||
const options: ConnectionOptions = {
|
||||
servers: serverUrl,
|
||||
servers: cfg.MQ_URL,
|
||||
name: hostname(),
|
||||
reconnect: true,
|
||||
maxReconnectAttempts: -1,
|
||||
};
|
||||
|
||||
if (process.env.MQ_USER && process.env.MQ_PASS) {
|
||||
options.user = process.env.MQ_USER;
|
||||
options.pass = process.env.MQ_PASS;
|
||||
if (cfg.MQ_USER && cfg.MQ_PASS) {
|
||||
options.user = cfg.MQ_USER;
|
||||
options.pass = cfg.MQ_PASS;
|
||||
log("INFO", "NATS: Using username/password authentication");
|
||||
} else {
|
||||
log("INFO", "NATS: Connecting without authentication");
|
||||
@@ -28,11 +27,10 @@ async function getNatsConnection(): Promise<NatsConnection> {
|
||||
}
|
||||
|
||||
// Send Message Function here to send the message to NATS
|
||||
export async function sendFileUpdateMessage(path: string, version: string): Promise<boolean> {
|
||||
const serviceName: string = "pis-data-ingress";
|
||||
export async function sendFileUpdateMessage(cfg: Mq, path: string, version: string, serviceName: string): Promise<boolean> {
|
||||
const serviceId: string = hostname();
|
||||
const message: MQFileUpdate = {
|
||||
service_name: "pis-data-ingress",
|
||||
service_name: serviceName,
|
||||
service_id: serviceId,
|
||||
sent_timestamp: Math.floor(Date.now() / 1000),
|
||||
data_type: "file",
|
||||
@@ -46,7 +44,7 @@ export async function sendFileUpdateMessage(path: string, version: string): Prom
|
||||
let nats: NatsConnection | undefined;
|
||||
|
||||
try {
|
||||
const nats: NatsConnection = await getNatsConnection();
|
||||
const nats: NatsConnection = await getNatsConnection(cfg);
|
||||
|
||||
const subject = `ingress.file.${message.data_kind}`;
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import { createInterface } from 'node:readline';
|
||||
import XXH from 'xxhashjs';
|
||||
import { log } from './logger.js';
|
||||
import { DataIngressPisData } from '@owlboard/backend-data-contracts';
|
||||
import type { GeneralConfig } from './config.js';
|
||||
|
||||
const BASE_URL = process.env.BASEURL || 'https://owlboard.info'
|
||||
|
||||
@@ -18,8 +19,8 @@ interface InputRecord {
|
||||
stops: string[];
|
||||
}
|
||||
|
||||
export async function* processPisStream(inputStream: Readable) {
|
||||
const TOC = process.env.TOC;
|
||||
export async function* processPisStream(cfg: GeneralConfig, inputStream: Readable) {
|
||||
const TOC = cfg.TOC;
|
||||
if (!TOC) {
|
||||
log('ERROR', "TOC not set: Exit code 19");
|
||||
process.exit(19);
|
||||
|
||||
18
src/sss.ts
18
src/sss.ts
@@ -4,8 +4,10 @@ import { createWriteStream } from "node:fs";
|
||||
import { Readable } from "node:stream";
|
||||
import { DataIngressPisData } from "@owlboard/backend-data-contracts";
|
||||
import { log } from "./logger.js";
|
||||
import type { S3 } from "./config.js";
|
||||
|
||||
export async function processAndStore(
|
||||
cfg: S3,
|
||||
generator: AsyncGenerator<DataIngressPisData.PisObjects>,
|
||||
filename: string
|
||||
) {
|
||||
@@ -15,21 +17,21 @@ export async function processAndStore(
|
||||
}
|
||||
})());
|
||||
|
||||
const useS3 = process.env.S3_ENDPOINT && process.env.S3_BUCKET;
|
||||
const useS3 = cfg.S3_ENDPOINT && cfg.S3_BUCKET;
|
||||
|
||||
if (useS3) {
|
||||
if (!process.env.S3_ENDPOINT || process.env.S3_BUCKET || !process.env.S3_REGION || !process.env.S3_ACCESS_KEY || !process.env.S3_SECRET_KEY) {
|
||||
if (!cfg.S3_ENDPOINT || !cfg.S3_BUCKET || !cfg.S3_REGION || !cfg.S3_ACCESS_KEY || !cfg.S3_SECRET_KEY) {
|
||||
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
||||
process.exit(24);
|
||||
}
|
||||
log('INFO', `Streaming to S3: ${process.env.S3_BUCKET}/${filename}`);
|
||||
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
||||
|
||||
const client = new S3Client({
|
||||
endpoint: process.env.S3_ENDPOINT,
|
||||
region: process.env.S3_REGION,
|
||||
endpoint: cfg.S3_ENDPOINT,
|
||||
region: cfg.S3_REGION,
|
||||
credentials: {
|
||||
accessKeyId: process.env.S3_ACCESS_KEY!,
|
||||
secretAccessKey: process.env.S3_SECRET_KEY,
|
||||
accessKeyId: cfg.S3_ACCESS_KEY!,
|
||||
secretAccessKey: cfg.S3_SECRET_KEY,
|
||||
},
|
||||
forcePathStyle: true,
|
||||
});
|
||||
@@ -37,7 +39,7 @@ export async function processAndStore(
|
||||
const upload = new Upload({
|
||||
client,
|
||||
params: {
|
||||
Bucket: process.env.S3_BUCKET,
|
||||
Bucket: cfg.S3_BUCKET,
|
||||
Key: filename,
|
||||
Body: ndjsonStream,
|
||||
ContentType: "application/x-ndjson",
|
||||
|
||||
Reference in New Issue
Block a user