PIS Update based on hash rather than time
This commit is contained in:
parent
71dbf5df52
commit
7122233ac3
@ -4,6 +4,7 @@ Dockerfile
|
|||||||
.dockerignore
|
.dockerignore
|
||||||
.gitignore
|
.gitignore
|
||||||
LICENSE
|
LICENSE
|
||||||
|
dbman-log
|
||||||
|
|
||||||
# ---> Python
|
# ---> Python
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,5 +1,6 @@
|
|||||||
run.sh
|
run.sh
|
||||||
env
|
env
|
||||||
|
dbman-log
|
||||||
|
|
||||||
# ---> Python
|
# ---> Python
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
|
@ -9,10 +9,11 @@ import mongo
|
|||||||
CORPUS_URL = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"
|
CORPUS_URL = "https://publicdatafeeds.networkrail.co.uk/ntrod/SupportingFileAuthenticate?type=CORPUS"
|
||||||
|
|
||||||
#Fetch Configuration
|
#Fetch Configuration
|
||||||
log.out("corpus.py: Fetching CORPUS Configuration", "INFO")
|
|
||||||
CORPUS_USER = os.getenv('OWL_LDB_CORPUSUSER')
|
CORPUS_USER = os.getenv('OWL_LDB_CORPUSUSER')
|
||||||
CORPUS_PASS = os.getenv('OWL_LDB_CORPUSPASS')
|
CORPUS_PASS = os.getenv('OWL_LDB_CORPUSPASS')
|
||||||
|
|
||||||
|
log.out("corpus.py: CORPUS Module Loaded", "DBUG")
|
||||||
|
|
||||||
def fetch():
|
def fetch():
|
||||||
log.out("corpus.fetch: Fetching CORPUS Data from Network Rail", "INFO")
|
log.out("corpus.fetch: Fetching CORPUS Data from Network Rail", "INFO")
|
||||||
response = requests.get(CORPUS_URL, auth=(CORPUS_USER, CORPUS_PASS))
|
response = requests.get(CORPUS_URL, auth=(CORPUS_USER, CORPUS_PASS))
|
||||||
|
@ -15,6 +15,9 @@
|
|||||||
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
||||||
|
|
||||||
import smtplib, ssl, os
|
import smtplib, ssl, os
|
||||||
|
import logger as log
|
||||||
|
|
||||||
|
log.out("mailer.py: Mailer module loaded", "DBUG")
|
||||||
|
|
||||||
def submitLogs():
|
def submitLogs():
|
||||||
text :str = fetchLogs()
|
text :str = fetchLogs()
|
||||||
|
25
src/main.py
25
src/main.py
@ -14,21 +14,21 @@
|
|||||||
# program. If not, see
|
# program. If not, see
|
||||||
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
||||||
|
|
||||||
version = "2023.5.7"
|
version = "2023.5.9"
|
||||||
print(f"main.py: Initialising db-manager v{version}")
|
print(f"main.py: Initialising db-manager v{version}")
|
||||||
|
|
||||||
#Third Party Imports
|
#Third Party Imports
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
# Import logger
|
||||||
|
import logger as log
|
||||||
|
log.out(f"main.py: db-manager {version} Initialised", "INFO")
|
||||||
|
|
||||||
#Local Imports
|
#Local Imports
|
||||||
import corpus, mongo, pis, mailer, timetable
|
import corpus, mongo, pis, mailer, timetable
|
||||||
import logger as log
|
|
||||||
|
|
||||||
log.out("main.py: db-manager Initialised", "INFO")
|
# Ensure count document exists in meta, wrap in while loop to prevent crashing if the DB is not ready:
|
||||||
|
|
||||||
|
|
||||||
#Ensure count document exists in meta, wrap in while look to prevent crashing if the DB is not ready:
|
|
||||||
dbReady = False
|
dbReady = False
|
||||||
while dbReady is False:
|
while dbReady is False:
|
||||||
try:
|
try:
|
||||||
@ -59,17 +59,8 @@ if corpusAge > 1036800:
|
|||||||
else:
|
else:
|
||||||
log.out('main.py: Not updating stations data until it is 1036800s old.', "INFO")
|
log.out('main.py: Not updating stations data until it is 1036800s old.', "INFO")
|
||||||
|
|
||||||
#Check & Update pis data:
|
## Run PIS Update
|
||||||
# If older than 2 days then update
|
pis.runUpdate()
|
||||||
pisAge = int(time.time()) - mongo.metaCheckTime("pis")
|
|
||||||
log.out(f'main.py: PIS Data is {pisAge}s old', "INFO")
|
|
||||||
if pisAge > 43200: # Temporarily set to 15 minutes
|
|
||||||
log.out('main.py: Updating PIS data', "INFO")
|
|
||||||
pisData = pis.load()
|
|
||||||
pisParsed = pis.parse(pisData)
|
|
||||||
mongo.putBulkPis(pisParsed)
|
|
||||||
else:
|
|
||||||
log.out('main.py: Not updating PIS data until is it 1036800s old', "INFO")
|
|
||||||
|
|
||||||
## Run Timetable Update
|
## Run Timetable Update
|
||||||
timetable.runUpdate()
|
timetable.runUpdate()
|
||||||
|
42
src/mongo.py
42
src/mongo.py
@ -4,17 +4,18 @@ import time
|
|||||||
import urllib.parse
|
import urllib.parse
|
||||||
import logger as log
|
import logger as log
|
||||||
|
|
||||||
log.out("mongo.py: Fetching configuration", "INFO")
|
|
||||||
db_host = os.getenv('OWL_DB_HOST', 'localhost')
|
db_host = os.getenv('OWL_DB_HOST', 'localhost')
|
||||||
db_port = os.getenv('OWL_DB_PORT', 27017)
|
db_port = os.getenv('OWL_DB_PORT', 27017)
|
||||||
db_user = urllib.parse.quote_plus(os.getenv('OWL_DB_USER', "owl"))
|
db_user = urllib.parse.quote_plus(os.getenv('OWL_DB_USER', "owl"))
|
||||||
db_pass = urllib.parse.quote_plus(os.getenv('OWL_DB_PASS', "twittwoo"))
|
db_pass = urllib.parse.quote_plus(os.getenv('OWL_DB_PASS', "twittwoo"))
|
||||||
db_name = os.getenv('OWL_DB_NAME', "owlboard")
|
db_name = os.getenv('OWL_DB_NAME', "owlboard")
|
||||||
|
|
||||||
log.out(f"mongo.py: Connecting to database at {db_host}:{db_port}", "DBUG")
|
log.out(f"mongo.py: Database URI: {db_host}:{db_port}", "DBUG")
|
||||||
client = MongoClient(f"mongodb://{db_user}:{db_pass}@{db_host}:{db_port}")
|
client = MongoClient(f"mongodb://{db_user}:{db_pass}@{db_host}:{db_port}")
|
||||||
db = client[db_name]
|
db = client[db_name]
|
||||||
|
|
||||||
|
log.out("mongo.py: Mongo module loaded", "DBUG")
|
||||||
|
|
||||||
def metaCheckTime(target):
|
def metaCheckTime(target):
|
||||||
col = db["meta"]
|
col = db["meta"]
|
||||||
res = col.find_one({"target": target, "type": "collection"})
|
res = col.find_one({"target": target, "type": "collection"})
|
||||||
@ -104,6 +105,13 @@ def putBulkPis(data):
|
|||||||
metaUpdateTime(collection)
|
metaUpdateTime(collection)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def putMany(collection :str, data :list):
    """Insert every document in ``data`` into the named collection.

    On a non-empty ``data`` this calls ``incrementCounter(collection)``,
    performs the bulk insert, then calls ``metaUpdateTime(collection)``.

    Args:
        collection: Name of the MongoDB collection to write to.
        data: Documents to insert.

    An empty ``data`` is logged and skipped without touching the counter,
    the collection or its meta record. PyMongo's ``insert_many`` rejects an
    empty document list with ``TypeError``, which would otherwise abort the
    caller part-way through an update.
    """
    log.out(f"mongo.putMany: Inserting many documents to: {collection}")
    if not data:
        log.out(f"mongo.putMany: No documents supplied for: {collection}, skipping insert", "INFO")
        return
    col = db[collection]
    incrementCounter(collection)
    col.insert_many(data)
    metaUpdateTime(collection)
|
||||||
|
|
||||||
|
|
||||||
def incrementCounter(target):
|
def incrementCounter(target):
|
||||||
collection = "meta"
|
collection = "meta"
|
||||||
@ -116,7 +124,6 @@ def metaCounters():
|
|||||||
collection = "meta"
|
collection = "meta"
|
||||||
col = db[collection]
|
col = db[collection]
|
||||||
res = col.find_one({"target": "counters","type": "count"})
|
res = col.find_one({"target": "counters","type": "count"})
|
||||||
log.out(f'mongo.metaCounters: Query returned `{res}`', "DEBG")
|
|
||||||
if type(res) is dict:
|
if type(res) is dict:
|
||||||
if 'since' in res:
|
if 'since' in res:
|
||||||
log.out('mongo.metaCounters: counters already exists, skipping', "INFO")
|
log.out('mongo.metaCounters: counters already exists, skipping', "INFO")
|
||||||
@ -139,6 +146,7 @@ def putTimetable(data):
|
|||||||
res = col.insert_many(data)
|
res = col.insert_many(data)
|
||||||
|
|
||||||
def dropCollection(collection):
|
def dropCollection(collection):
|
||||||
|
log.out(f"mongo.dropCollection: Dropping collection '{collection}'")
|
||||||
col = db[collection]
|
col = db[collection]
|
||||||
res = col.drop()
|
res = col.drop()
|
||||||
|
|
||||||
@ -146,3 +154,31 @@ def deleteTimetableData(query):
|
|||||||
collection = "timetable"
|
collection = "timetable"
|
||||||
col = db[collection]
|
col = db[collection]
|
||||||
res = col.delete_one(query)
|
res = col.delete_one(query)
|
||||||
|
|
||||||
|
def getMetaHash(target):
    """Return the hash recorded in the ``meta`` collection for ``target``.

    Looks up the document matching
    ``{"target": target, "type": "collection"}`` and calls
    ``incrementCounter("meta")`` for the query.

    Args:
        target: Value of the ``target`` field to look up.

    Returns:
        The document's ``hash`` value, or ``None`` when no document matches,
        when the document has no ``updated`` field, or when it has no
        ``hash`` field.
    """
    col = db["meta"]
    res = col.find_one({"target": target, "type": "collection"})
    incrementCounter("meta")

    # NOTE(review): a stored hash is only trusted when the document also has
    # an 'updated' field - presumably written by metaUpdateTime; confirm.
    if not isinstance(res, dict) or 'updated' not in res:
        return None

    # Explicit key check instead of a bare `except:` around the lookup, so
    # unrelated errors are no longer silently swallowed.
    if "hash" not in res:
        return None

    log.out(f'mongo.getMetaHash: {target} hash is {res["hash"]}', 'INFO')
    return res["hash"]
|
||||||
|
|
||||||
|
def putMetaHash(target :str, hash :str):
    """Record ``hash`` on the ``meta`` document for ``target``.

    The document is matched on ``target`` and ``type == "collection"``; the
    write is an upsert, so the document is created when none matches.

    Args:
        target: Value of the ``target`` field identifying the meta document.
        hash: Hash string to store in the document's ``hash`` field.
    """
    selector = {
        "target": target,
        "type": "collection"
    }
    # The $set payload repeats the selector fields and adds the hash.
    db["meta"].update_one(
        selector,
        {"$set": {**selector, "hash": hash}},
        upsert=True,
    )
|
48
src/pis.py
48
src/pis.py
@ -1,9 +1,34 @@
|
|||||||
import yaml
|
import yaml, hashlib
|
||||||
|
import logger as log
|
||||||
|
import mongo
|
||||||
|
|
||||||
print("PIS Module imported")
|
# Report module load through the shared logger; print() would emit the
# "DBUG" level as literal text instead of treating it as a log level.
log.out("pis.py: PIS Module Loaded", "DBUG")
|
||||||
|
file_location :str = "/app/data/pis/gwr.yaml"
|
||||||
|
|
||||||
|
def _fileHash():
    """Return the MD5 hex digest of the text in the PIS source file."""
    with open(file_location, "r") as f:
        text = f.read()
    # MD5 is used here only to detect changes to the file.
    return hashlib.md5(text.encode()).hexdigest()


def runUpdate():
    """Rebuild the ``pis`` collection when the source file has changed.

    The file's hash is compared with the one stored in the database. When
    they differ the file is loaded and parsed, the ``pis`` collection is
    dropped and re-populated, and only then is the new hash stored. A run
    that fails part-way therefore leaves the old hash in place and is
    retried next time.

    The existing collection is left untouched when the file cannot be
    loaded or yields no codes.
    """
    newHash = _fileHash()
    if not requiresUpdate(newHash):
        log.out('pis.runUpdate: PIS Codes do not need updating', 'INFO')
        return
    log.out("pis.runUpdate: Update required", "INFO")

    pis_data = load()
    # load() returns the YAML error object instead of raising it.
    if isinstance(pis_data, Exception):
        log.out(f"pis.runUpdate: Unable to load PIS data, keeping existing collection: {pis_data}", "INFO")
        return

    pis_parsed = parse(pis_data)
    # Never drop the live collection unless there is something to replace it with.
    if not pis_parsed:
        log.out("pis.runUpdate: No PIS codes parsed, keeping existing collection", "INFO")
        return

    mongo.dropCollection("pis")
    mongo.putMany("pis", pis_parsed)
    # Store the hash last so it only ever describes data that was written.
    mongo.putMetaHash("pis", newHash)


def requiresUpdate(newHash :str = None):
    """Report whether the PIS source file differs from the stored hash.

    Args:
        newHash: Hash of the current file contents. When omitted it is
            computed from the file at ``file_location``.

    Returns:
        ``True`` when no hash is stored for ``pis`` or the stored hash
        differs from ``newHash``; ``False`` otherwise.

    This check does not write to the database; ``runUpdate`` stores the new
    hash once the import has completed.
    """
    if newHash is None:
        newHash = _fileHash()
    currentHash = mongo.getMetaHash("pis")
    if currentHash is None or newHash != currentHash:
        log.out(f"pis.requiresUpdate: currentHash: {currentHash}, newHash: {newHash}", "INFO")
        return True
    return False
|
||||||
|
|
||||||
def load(): # Programatically add a `toc` field to each entry.
|
def load(): # Programatically add a `toc` field to each entry.
|
||||||
with open("/app/data/pis/gwr.yaml", "r") as data:
|
with open(file_location, "r") as data:
|
||||||
try:
|
try:
|
||||||
pis = yaml.safe_load(data)
|
pis = yaml.safe_load(data)
|
||||||
print(pis)
|
print(pis)
|
||||||
@ -12,9 +37,6 @@ def load(): # Programatically add a `toc` field to each entry.
|
|||||||
print(exc)
|
print(exc)
|
||||||
return exc
|
return exc
|
||||||
|
|
||||||
## Do some magic here so that if any pis["pis"]["stops"][0] field contains 'reverse' then get the stops for the code stored in pis["pis"]["stops"][1]
|
|
||||||
## reverse the stops and store that.
|
|
||||||
|
|
||||||
def parse(codeList):
|
def parse(codeList):
|
||||||
StartLen = len(codeList)
|
StartLen = len(codeList)
|
||||||
print(f"pis.parse: codeList starting length: {StartLen}")
|
print(f"pis.parse: codeList starting length: {StartLen}")
|
||||||
@ -27,17 +49,3 @@ def parse(codeList):
|
|||||||
codeList.remove(ii)
|
codeList.remove(ii)
|
||||||
print(f"pis.parse: Removed {StartLen - len(codeList)} duplicates")
|
print(f"pis.parse: Removed {StartLen - len(codeList)} duplicates")
|
||||||
return codeList
|
return codeList
|
||||||
|
|
||||||
def devLoad(): # Programatically add a `toc` field to each entry.
|
|
||||||
with open("/home/fred.boniface/git/owlboard/db-manager/data/pis/gwr.yaml", "r") as data:
|
|
||||||
try:
|
|
||||||
pis = yaml.safe_load(data)
|
|
||||||
print(pis)
|
|
||||||
return pis["pis"]
|
|
||||||
except yaml.YAMLError as exc:
|
|
||||||
print(exc)
|
|
||||||
return exc
|
|
||||||
|
|
||||||
def dev():
|
|
||||||
data = devLoad()
|
|
||||||
parse(data)
|
|
@ -14,6 +14,8 @@
|
|||||||
# program. If not, see
|
# program. If not, see
|
||||||
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
# https://git.fjla.uk/OwlBoard/db-manager/src/branch/main/LICENSE
|
||||||
|
|
||||||
|
REBUILD :bool = False ## Set to true to rebuild database
|
||||||
|
|
||||||
#Imports
|
#Imports
|
||||||
import os
|
import os
|
||||||
import requests
|
import requests
|
||||||
@ -27,20 +29,22 @@ from datetime import datetime, timedelta
|
|||||||
# This module downloads a single TOCs Schedule data
|
# This module downloads a single TOCs Schedule data
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
yesterday = now - timedelta(days=1)
|
yesterday = now - timedelta(days=1)
|
||||||
yesterdayDay = yesterday.strftime("%a").lower()
|
yesterdayDay = yesterday.strftime("%a").lower()
|
||||||
TOC_Code = "EF" # Business code for GWR
|
TOC_Code = "EF" # Business code for GWR
|
||||||
fullDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_FULL_DAILY&day=toc-full"
|
fullDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_FULL_DAILY&day=toc-full"
|
||||||
updateDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_UPDATE_DAILY&day=toc-update-{yesterdayDay}"
|
updateDataUrl = f"https://publicdatafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_{TOC_Code}_TOC_UPDATE_DAILY&day=toc-update-{yesterdayDay}"
|
||||||
CORPUS_USER = os.getenv('OWL_LDB_CORPUSUSER')
|
CORPUS_USER = os.getenv('OWL_LDB_CORPUSUSER')
|
||||||
CORPUS_PASS = os.getenv('OWL_LDB_CORPUSPASS')
|
CORPUS_PASS = os.getenv('OWL_LDB_CORPUSPASS')
|
||||||
|
|
||||||
|
log.out("timetable.py: Timetable module loaded", "DBUG")
|
||||||
|
|
||||||
# Determine state of current Timetable Database
|
# Determine state of current Timetable Database
|
||||||
def isUpdateRequired():
|
def isUpdateRequired():
|
||||||
timetableLength = mongo.getLength("timetable")
|
timetableLength = mongo.getLength("timetable")
|
||||||
log.out(f"timetable.isUpdateRequired: timetable collection contains {timetableLength} documents", "DBUG")
|
log.out(f"timetable.isUpdateRequired: timetable collection contains {timetableLength} documents", "DBUG")
|
||||||
timetableUpdateDate = mongo.metaCheckTime("timetable")
|
timetableUpdateDate = mongo.metaCheckTime("timetable")
|
||||||
log.out(f"timetable.isUpdateRequired: Timetable last updated at {timetableUpdateDate}", "INFO")
|
log.out(f"timetable.isUpdateRequired: Timetable last updated at {timetableUpdateDate}", "INFO")
|
||||||
if (not timetableLength or int(time.time()) > timetableUpdateDate + 172800):
|
if (not timetableLength or int(time.time()) > timetableUpdateDate + 172800 or REBUILD):
|
||||||
log.out(f"timetable.isUpdateRequired: timetable collection requires rebuild", "INFO")
|
log.out(f"timetable.isUpdateRequired: timetable collection requires rebuild", "INFO")
|
||||||
return "full"
|
return "full"
|
||||||
if (int(time.time()) > (timetableUpdateDate + 86400)):
|
if (int(time.time()) > (timetableUpdateDate + 86400)):
|
||||||
@ -99,8 +103,8 @@ def insertSchedule(sch_record):
|
|||||||
schedule = sch_record['JsonScheduleV1']
|
schedule = sch_record['JsonScheduleV1']
|
||||||
scheduleId = schedule['CIF_train_uid']
|
scheduleId = schedule['CIF_train_uid']
|
||||||
transactionType = schedule['transaction_type']
|
transactionType = schedule['transaction_type']
|
||||||
if ('schedule_start_date' in sch_record):
|
if ('schedule_start_date' in schedule):
|
||||||
scheduleStart = _helpParseDate(sch_record['schedule_start_date'])
|
scheduleStart = _helpParseDate(schedule['schedule_start_date'])
|
||||||
else:
|
else:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
scheduleStart = now.replace(hour=0,minute=0,second=0,microsecond=0)
|
scheduleStart = now.replace(hour=0,minute=0,second=0,microsecond=0)
|
||||||
@ -167,7 +171,7 @@ def _helpParseDays(string):
|
|||||||
selectedDays = [daysList[i] for i, value in enumerate(string) if value == "1"]
|
selectedDays = [daysList[i] for i, value in enumerate(string) if value == "1"]
|
||||||
return selectedDays
|
return selectedDays
|
||||||
|
|
||||||
def _helpParseDate(string, time):
|
def _helpParseDate(string :str, time :str = "false"):
|
||||||
# Incoming string contains date in format %Y-%m-%d, if the time signified end of schedule,
|
# Incoming string contains date in format %Y-%m-%d, if the time signified end of schedule,
|
||||||
# append 23:59:59 to the time, else append 00:00:00 to the string.
|
# append 23:59:59 to the time, else append 00:00:00 to the string.
|
||||||
if time == "end":
|
if time == "end":
|
||||||
|
Reference in New Issue
Block a user