Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e115edd79e | |||
| 1f07951b88 | |||
| b0a45f4000 | |||
| b42e37c569 | |||
| 2cb9c320cf |
@@ -20,7 +20,7 @@ WORKDIR /app
|
||||
COPY package*.json ./
|
||||
RUN npm ci --omit=dev
|
||||
|
||||
COPY --from=builder /app/dist ./dist
|
||||
COPY --from=builder /app/dist /app/dist
|
||||
|
||||
USER node
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { readFileSync, existsSync } from "node:fs"
|
||||
import { join } from "node:path"
|
||||
import { log } from "./logger";
|
||||
import { log } from "./logger.js";
|
||||
|
||||
interface Config {
|
||||
Mq: Mq,
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
// Functions to check the current applied version
|
||||
// updating will be handled by the processing service
|
||||
|
||||
import { MongoClient } from "mongodb";
|
||||
import { log } from "./logger";
|
||||
import type { Mongo } from "./config";
|
||||
|
||||
const collection = "data_ingress_meta";
|
||||
|
||||
|
||||
function uriBuild(cfg: Mongo): string {
|
||||
if(!cfg.MONGO_URI || !cfg.MONGO_DB || !cfg.MONGO_USER || !cfg.MONGO_PASS) {
|
||||
log('ERROR', "Missing MONGO Configuration - EXIT CODE: 35");
|
||||
process.exit(35);
|
||||
} else {
|
||||
log("DEBUG", `MongoDB Connection`, {
|
||||
uri: cfg.MONGO_URI,
|
||||
db: cfg.MONGO_DB,
|
||||
collection: collection,
|
||||
user: cfg.MONGO_USER,
|
||||
pass: "****",
|
||||
});
|
||||
};
|
||||
|
||||
return `mongodb://${encodeURIComponent(cfg.MONGO_USER)}:${encodeURIComponent(cfg.MONGO_PASS)}@${cfg.MONGO_URI}`
|
||||
}
|
||||
|
||||
let mongoClient: MongoClient | null = null;
|
||||
|
||||
async function getMongoClient(cfg: Mongo) {
|
||||
if (mongoClient) return mongoClient;
|
||||
|
||||
mongoClient = new MongoClient(uriBuild(cfg));
|
||||
await mongoClient.connect();
|
||||
return mongoClient;
|
||||
}
|
||||
|
||||
export async function isPackageProcessed(cfg: Mongo, serviceName: string, packageName: string): Promise<boolean> {
|
||||
try {
|
||||
const client = await getMongoClient(cfg);
|
||||
const database = client.db(cfg.MONGO_DB);
|
||||
const coll = database.collection(collection);
|
||||
|
||||
const result = await coll.findOne({ service_name: serviceName });
|
||||
|
||||
if (!result) {
|
||||
log('INFO', `No metadata found for ${serviceName}. Fetching PIS Data`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (result.latest_package === packageName) {
|
||||
log('INFO', 'No update needed');
|
||||
return true;
|
||||
}
|
||||
|
||||
log('INFO', `Version mismatch. DB: ${result.latest_package}, Current: ${packageName}. Update required`)
|
||||
return false;
|
||||
} catch (err) {
|
||||
log('ERROR', 'Failed to check Mongo for version state:', err);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import { processAndStore } from './sss.js'
|
||||
|
||||
import { getLatestPackageName, getRequestStream } from './sources/gitea.js'
|
||||
import { processPisStream } from './process.js'
|
||||
import { isPackageProcessed } from './database.js'
|
||||
import { ConfigLoader } from './config.js'
|
||||
import { sendFileUpdateMessage } from './nats.js'
|
||||
|
||||
@@ -26,10 +25,9 @@ async function main() {
|
||||
process.exit(9);
|
||||
}
|
||||
|
||||
if (await isPackageProcessed(config.Mongo, SERVICE_NAME, packageInfo.name)) {
|
||||
log('INFO', `Database matches latest release. Exiting`);
|
||||
process.exit(0);
|
||||
}
|
||||
// Check if package already processed HERE...
|
||||
log('WARN', 'Version check disabled, downloading data from source');
|
||||
// exit(0) if done, else continue
|
||||
|
||||
const inputStream: Readable = await getRequestStream(packageInfo.assets[0].browser_download_url);
|
||||
const objectGenerator = processPisStream(config.General, inputStream);
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { connect, JSONCodec } from "nats";
|
||||
import type { ConnectionOptions, NatsConnection, Payload } from "nats";
|
||||
import { log } from "./logger";
|
||||
import { log } from "./logger.js";
|
||||
import { hostname } from "node:os";
|
||||
import type { MQFileUpdate } from "@owlboard/backend-data-contracts/dist/data-ingress_mq-file-update";
|
||||
import type { Mq } from "./config";
|
||||
import type { Mq } from "./config.js";
|
||||
|
||||
const jc = JSONCodec();
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ export async function* processPisStream(cfg: GeneralConfig, inputStream: Readabl
|
||||
if (!line.trim()) continue;
|
||||
|
||||
const record = JSON.parse(line) as InputRecord;
|
||||
log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
||||
// log('DEBUG', `JSON Line Parsed: ${JSON.stringify(record)}`)
|
||||
|
||||
const crsHash = XXH.h64(record.stops.join('|'), SEED);
|
||||
const tiplocStops = await mapStopsToTiploc(record.stops);
|
||||
|
||||
20
src/sss.ts
20
src/sss.ts
@@ -1,4 +1,4 @@
|
||||
import { S3Client } from "@aws-sdk/client-s3";
|
||||
import { S3Client, CreateBucketCommand, HeadBucketCommand, S3ServiceException } from "@aws-sdk/client-s3";
|
||||
import { Upload } from "@aws-sdk/lib-storage";
|
||||
import { createWriteStream } from "node:fs";
|
||||
import { Readable } from "node:stream";
|
||||
@@ -24,7 +24,8 @@ export async function processAndStore(
|
||||
log("DEBUG", "Missing required variables for S3 Support - EXIT CODE 24");
|
||||
process.exit(24);
|
||||
}
|
||||
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
||||
|
||||
log("DEBUG", "Opening connection to S3 Server");
|
||||
|
||||
const client = new S3Client({
|
||||
endpoint: cfg.S3_ENDPOINT,
|
||||
@@ -36,6 +37,21 @@ export async function processAndStore(
|
||||
forcePathStyle: true,
|
||||
});
|
||||
|
||||
// Check bucket exists, and create if needed
|
||||
try {
|
||||
await client.send(new HeadBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||
} catch (err) {
|
||||
if (err instanceof S3ServiceException && err.$metadata.httpStatusCode === 404) {
|
||||
log('INFO', `Bucket ${cfg.S3_BUCKET} does not exist, creating...`);
|
||||
await client.send(new CreateBucketCommand({ Bucket: cfg.S3_BUCKET }));
|
||||
} else {
|
||||
log(`ERROR`, `Failed to verify bucket: ${cfg.S3_BUCKET}`);
|
||||
process.exit(21);
|
||||
}
|
||||
}
|
||||
|
||||
log('INFO', `Streaming to S3: ${cfg.S3_BUCKET}/${filename}`);
|
||||
|
||||
const upload = new Upload({
|
||||
client,
|
||||
params: {
|
||||
|
||||
Reference in New Issue
Block a user