import hashlib

import yaml

import logger as log
import mongo

# Set to True to force a rebuild regardless of the stored hash.
REBUILD: bool = False

# NOTE(review): this is a plain print with "DBUG" as a second positional
# argument, so it writes 'pis.py: PIS Module Loaded DBUG' to stdout.  It looks
# like it was meant to be log.out(...); left unchanged to keep import-time
# output identical.
print("pis.py: PIS Module Loaded", "DBUG")

# Location of the PIS YAML source file (Production & Testing).
file_location: str = "/app/data/pis/gwr.yaml"
# Local Development alternative:
# file_location: str = "/home/fred.boniface/git/owlboard/db-manager/data/pis/gwr.yaml"


def runUpdate():
    """Rebuild the "pis" collection if `requiresUpdate()` says it is stale.

    When an update is required the YAML file is loaded and parsed first, and
    only then is the existing "pis" collection dropped, repopulated and
    indexed on "stops" and "tiplocs".  Returns None in every case.
    """
    if not requiresUpdate():
        log.out('pis.runUpdate: PIS Codes do not need updating', 'INFO')
        return

    log.out("pis.runUpdate: Update required", "INFO")
    pis_data = load()
    pis_parsed = parse(pis_data)
    mongo.dropCollection("pis")
    mongo.putMany("pis", pis_parsed)
    mongo.createSingleIndex("pis", "stops")
    mongo.createSingleIndex("pis", "tiplocs")


def requiresUpdate():
    """Return True when the PIS data should be rebuilt.

    True is returned immediately when `REBUILD` is set.  Otherwise the MD5
    hex digest of the file at `file_location` is compared with the value
    returned by `mongo.getMetaHash("pis")`; if there is no stored value or the
    two differ, the new digest is stored with `mongo.putMetaHash` and True is
    returned.  In all other cases False is returned.
    """
    if REBUILD:
        return True

    currentHash = mongo.getMetaHash("pis")
    with open(file_location, "r") as f:
        text = f.read()
    newHash = hashlib.md5(text.encode()).hexdigest()

    if currentHash is None or newHash != currentHash:
        log.out(f"pis.requiresUpdate: currentHash: {currentHash}, newHash: {newHash}", "INFO")
        # NOTE(review): the new hash is stored before the rebuild has run, so
        # a rebuild that fails afterwards will not be retried on the next
        # call.  Kept as-is because other callers may rely on this side effect.
        mongo.putMetaHash("pis", newHash)
        return True
    return False


def load():
    """Read the YAML file at `file_location` and return its top-level "pis" value.

    Raises:
        yaml.YAMLError: if the file cannot be parsed.  The error is printed
            and re-raised; returning it (as was done previously) handed an
            exception object to `parse` as if it were data.
    """
    # NOTE(review): an earlier comment here read "Programatically add a `toc`
    # field to each entry."  Nothing in this function adds such a field.
    with open(file_location, "r") as data:
        try:
            pis = yaml.safe_load(data)
        except yaml.YAMLError as exc:
            print(exc)
            raise
    return pis["pis"]


def parse(codeList):
    """Drop entries with duplicate stopping patterns and attach TIPLOCs.

    Each entry is a mapping with a 'code' and a 'stops' sequence.  An entry is
    dropped when an earlier entry has the same 'stops' but a different 'code'.
    Every surviving entry gains a 'tiplocs' list holding `getTiploc(stop)` for
    each of its stops, in order.

    `codeList` is updated in place and also returned.
    """
    StartLen = len(codeList)
    log.out(f"pis.parse: codeList starting length: {StartLen}", "DBUG")
    # Level passed explicitly, as in every other log.out call in this module.
    log.out("pis.parse: Removing duplicate codes & adding TIPLOCs", "DBUG")

    # The list is not modified while it is being iterated (doing so makes the
    # iterator skip elements); survivors are collected and written back after.
    first_code_for_stops = {}
    tiploc_cache = {}
    kept = []
    for entry in codeList:
        stops = entry['stops']
        code = entry['code']
        pattern = tuple(stops)

        if pattern in first_code_for_stops and first_code_for_stops[pattern] != code:
            print(f"Identical stopping pattern found: {code}")
            continue
        first_code_for_stops.setdefault(pattern, code)

        tiplocs = []
        for crs in stops:
            # Look each distinct stop up once per parse() call.
            if crs not in tiploc_cache:
                tiploc_cache[crs] = getTiploc(crs)
            tiplocs.append(tiploc_cache[crs])
        entry['tiplocs'] = tiplocs
        kept.append(entry)

    codeList[:] = kept
    print(f"pis.parse: Removed {StartLen - len(codeList)} duplicates")
    return codeList


def getTiploc(crs: str):
    """Return the 'TIPLOC' value of the "stations" record matching `crs`.

    `crs` is upper-cased and used as the '3ALPHA' value of the query passed to
    `mongo.query`.  "UNKNOWN" is returned when the query yields None or a
    result without a 'TIPLOC' key.
    """
    CRS = crs.upper()
    query = {
        '3ALPHA': CRS
    }
    res = mongo.query("stations", query)
    # Guard against a None result before the membership test.
    if res is not None and 'TIPLOC' in res:
        return res['TIPLOC']
    return "UNKNOWN"