This repository has been archived on 2024-11-02. You can view files and clone it, but cannot push or open issues or pull requests.
2023-07-02 20:46:22 +01:00

215 lines
8.7 KiB

# db-manager - Builds and manages an OwlBoard database instance - To be run on a
# cron schedule
# Copyright (C) 2023 Frederick Boniface
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <https://www.gnu.org/licenses/>.
# Manual override: when True, isUpdateRequired() reports "full" regardless of
# the age of the stored timetable data.
REBUILD :bool = False ## Set to true to rebuild database
import os
import requests
import logger as log
import helpers
import zlib
import json
import mongo
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
# This module downloads the schedule data for a single TOC.

# Reference time captured once at import; every day-of-week and time-of-day
# decision in this module is made relative to it (naive, system local time).
now = datetime.now()
yesterday = now - timedelta(days=1)
# Lower-case abbreviated day names, e.g. "mon" (locale dependent via %a).
yesterdayDay = yesterday.strftime("%a").lower()
todayDay = now.strftime("%a").lower()
# Updates are only attempted from 08:00 onwards (see isUpdateRequired).
isAfter0800 = now.hour >= 8
# Local file that receives a copy of the most recently downloaded extract.
filePath = "cif_data"
# NOTE(review): both feed URLs look truncated in this copy of the file - the
# full URL is empty and the update URL contains only the day placeholder.
# Restore the real endpoints before running.
fullDataUrl = ""
updateDataUrl = f"{yesterdayDay}"
log.out(" Timetable module loaded", "DBUG")
# Determine state of current Timetable Database
def isUpdateRequired():
    """Decide whether the timetable collection needs refreshing.

    Logs the current document count and data age, then returns:
        "full"   - data is at least ``helpers.two_day_in_seconds`` old and it
                   is after 08:00, or ``REBUILD`` is set;
        "update" - data is at least ``helpers.twenty_hours`` old and it is
                   after 08:00;
        False    - no refresh is needed.
    """
    doc_count = mongo.getLength("timetable")
    log.out(f"timetable.isUpdateRequired: timetable collection contains {doc_count} documents", "DBUG")

    age_seconds = helpers.getAgeInSeconds(mongo.metaCheckTime("timetable"))
    # Formatting a timedelta yields the same "H:MM:SS" text as str().
    log.out(f"timetable.isUpdateRequired: Timetable data age: {timedelta(seconds=age_seconds)}", "INFO")

    needs_full = age_seconds >= helpers.two_day_in_seconds and isAfter0800
    if needs_full or REBUILD:
        log.out("timetable.isUpdateRequired: timetable collection requires rebuild", "INFO")
        return "full"

    needs_update = age_seconds >= helpers.twenty_hours and isAfter0800
    if needs_update:
        log.out("timetable.isUpdateRequired: timetable collection requires update", "INFO")
        return "update"

    return False
def getTimetable(full :bool = False):
    """Download and decompress a timetable extract.

    Args:
        full: When True fetch ``fullDataUrl``; otherwise fetch the daily
            update from ``updateDataUrl``.

    Returns:
        The decompressed response body as ``bytes``. A copy is also written
        to ``filePath``.

    Raises:
        zlib.error: If the response body is not valid gzip data (for example
            an error page returned for a failed request).
    """
    downloadUrl :str = fullDataUrl if full else updateDataUrl
    # NOTE(review): CORPUS_USER / CORPUS_PASS are not defined in the visible
    # part of this file - confirm they are set at module level.
    response = requests.get(downloadUrl, auth=(CORPUS_USER, CORPUS_PASS))
    log.out(f"timetable.getTimetable: Fetch (Full:{full}) response: {response.status_code}", "DBUG")
    # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
    decompressed = zlib.decompress(response.content, 16+zlib.MAX_WBITS)
    # Keep an on-disk copy of the extract; the open() previously had no body.
    with open(filePath, "wb") as f:
        f.write(decompressed)
    return decompressed
def loopTimetable(data):
    """Parse a newline-delimited JSON timetable extract into schedule documents.

    Args:
        data: The decompressed extract (``bytes`` or ``str``) holding one JSON
            object per line.

    Returns:
        A list with one document, built by ``insertSchedule``, for every
        ``JsonScheduleV1`` record. Other record types are skipped.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    documents :list = []
    for item in data.splitlines():
        # Tolerate blank lines (e.g. a trailing newline) rather than failing
        # inside json.loads.
        if not item.strip():
            continue
        dic = json.loads(item)
        if 'JsonTimetableV1' in dic:
            header = dic['JsonTimetableV1']
            detail = {
                'timestamp': header['timestamp'],
                'sequence': header['Metadata']['sequence'],
            }
            # TODO: Check that timestamp and sequence in `detail` are correct;
            # if not, trigger a full download.
        elif 'TiplocV1' in dic:
            # Not used as TIPLOCs etc. are sourced from CORPUS
            pass
        elif 'JsonAssociationV1' in dic:
            # Associates trains with each other - not planning to use yet.
            pass
        elif 'JsonScheduleV1' in dic:
            documents.append(insertSchedule(dic))
    return documents
def runUpdate():
    """Refresh the timetable collection if ``isUpdateRequired`` says so.

    Fetches either the full extract or the daily update, parses it and hands
    the resulting documents to ``_insertToDb``.

    Returns:
        "done" when no update is needed; otherwise ``None``.
    """
    required = isUpdateRequired()
    if required == "full":
        log.out("timetable.runUpdate: Fetching full timetable data", "INFO")
        data = getTimetable(full = True)
    elif required == "update":
        log.out("timetable.runUpdate: Fetching todays timetable update", "INFO")
        data = getTimetable()
    else:
        # Without this branch the "not needed" return ran unconditionally.
        log.out("timetable.runUpdate: timetable update is not needed", "INFO")
        return "done"
    parsed = loopTimetable(data)
    status = _insertToDb(parsed, required)
    if status:
        # NOTE(review): the body of this branch is missing from the source.
        # Pruning expired schedules is the only post-update step defined in
        # this module; the timestamp read by mongo.metaCheckTime("timetable")
        # presumably also needs refreshing here - confirm against the mongo
        # module.
        _removeOutdatedServices()
def insertSchedule(sch_record):
    """Convert one ``JsonScheduleV1`` record into a timetable document.

    Args:
        sch_record: Parsed JSON object with a ``JsonScheduleV1`` key.

    Returns:
        A dict with the keys transactionType, stpIndicator, operator,
        trainUid, headcode, powerType, planSpeed, scheduleStartDate,
        scheduleEndDate, daysRun and stops.

    Raises:
        KeyError: If ``JsonScheduleV1`` or ``CIF_train_uid`` is missing.
    """
    schedule = sch_record['JsonScheduleV1']
    scheduleId = schedule['CIF_train_uid']
    segment = schedule.get('schedule_segment', {})

    if 'schedule_start_date' in schedule:
        scheduleStart = _helpParseDate(schedule['schedule_start_date'])
    else:
        # No start date supplied: fall back to midnight today (naive local time).
        scheduleStart = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    if 'schedule_end_date' in schedule:
        scheduleEnd = _helpParseDate(schedule['schedule_end_date'], "end")
    else:
        # Stored as the literal string "null", not None.
        scheduleEnd = "null"

    document = {
        'transactionType': schedule.get('transaction_type'),
        'stpIndicator': schedule.get('CIF_stp_indicator'),
        'operator': schedule.get('atoc_code'),
        'trainUid': scheduleId,
        'headcode': segment.get('signalling_id'),
        'powerType': segment.get('CIF_power_type'),
        'planSpeed': segment.get('CIF_speed'),
        'scheduleStartDate': scheduleStart,
        'scheduleEndDate': scheduleEnd,
        'daysRun': _helpParseDays(schedule.get('schedule_days_runs', '0000000')),
    }

    if 'schedule_location' in segment:
        stops = _helpParseStops(segment['schedule_location'])
    else:
        stops = []
    document['stops'] = stops
    return document
# Applies parsed timetable documents to the "timetable" collection.
# `data` is the list of documents produced by loopTimetable(); `type` is the
# mode string ("full" or "update") chosen by isUpdateRequired().
# Returns True on success and False if an exception is caught.
#
# NOTE(review): this block is incomplete as it stands - indentation is lost and
# several lines appear to be missing:
#   * there is no `try:` to pair with the `except` near the end;
#   * the "full" branch only creates an index - nothing writes `data`;
#   * the "Create" branch has no body and `create_transactions` is never
#     filled or written to the database.
# Restore these from the upstream file before relying on this function.
# NOTE(review): the parameter name `type` shadows the builtin of the same name.
def _insertToDb(data :list, type :str):
# Document count before any changes, used for the difference log below.
pre_count = mongo.getLength("timetable")
if type == "full":
mongo.createSingleIndex("timetable", "headcode")
elif type == "update":
create_transactions = []
for item in data:
if item['transactionType'] == "Create":
elif item['transactionType'] == "Delete":
# Deletions are matched on train UID, schedule start date and STP indicator.
mongo.deleteTimetableData({'trainUid': item['trainUid'], 'scheduleStartDate': item['scheduleStartDate'], 'stpIndicator': item['stpIndicator']})
post_count = mongo.getLength("timetable")
# The logged difference is pre minus post, so it is negative when documents were added.
log.out(f"timetable._insertToDb: Document count difference after processing: {pre_count - post_count}", "DBUG")
return True #If Successfuls
except Exception as e:
log.out("timetable._insertToDb: Error inserting timetable data", "ERR")
# NOTE(review): the message below misspells "timetable" and passes no log level.
log.out(f"timteable._insertToDb: {e}")
return False # If error inserting timetable
def _helpParseStops(schedule_segment):
stops = []
for i in schedule_segment:
timing_point = {}
public_departure = i.get("public_departure")
wtt_departure = i.get("departure")
public_arrival = i.get("public_arrival")
wtt_arrival = i.get("arrival")
tiploc_code = i.get("tiploc_code")
isPublic = False
if public_departure and len(public_departure) == 4 and public_departure.isdigit():
isPublic = True
timing_point['publicDeparture'] = public_departure
if public_arrival and len(public_arrival) == 4 and public_arrival.isdigit():
isPublic = True
timing_point['publicArrival'] = public_arrival
if wtt_departure:
timing_point['wttDeparture'] = wtt_departure
if wtt_arrival:
timing_point['wttArrival'] = wtt_arrival
timing_point['isPublic'] = isPublic
timing_point['tiploc'] = tiploc_code
return stops
def _helpParseDays(string):
# Incoming string contains seven numbers, each number from 0-6 representing days Mon-Sun
daysList = ["m", "t", "w", "th", "f", "s", "su"]
selectedDays = [daysList[i] for i, value in enumerate(string) if value == "1"]
return selectedDays
def _helpParseDate(string :str, time :str = "false"):
# Incoming string contains date in format %Y-%m-%d, if the time signified end of schedule,
# append 23:59:59 to the time, else append 00:00:00 to the string.
if time == "end":
string += " 235959"
string += " 000000"
return datetime.strptime(string, "%Y-%m-%d %H%M%S").astimezone(ZoneInfo("Europe/London"))
def _removeOutdatedServices():
    """Delete timetable documents whose ``scheduleEndDate`` is before ``now``.

    ``now`` is the module-level reference time captured at import. The number
    of removed documents is logged at debug level.
    """
    log.out("timetable._removeOutdatedServices: Removing out of date schedules", "INFO")
    pre_count = mongo.getLength("timetable")
    # NOTE(review): documents stored with the string "null" as their end date
    # are presumably never matched by a date comparison - confirm.
    query = {
        "scheduleEndDate": {
            "$lt": now
        }
    }
    mongo.deleteMany("timetable", query)
    post_count = mongo.getLength("timetable")
    log.out(f"timetable._removeOutdatedServices: Removed {pre_count - post_count} out of date services", "DBUG")