Compare commits

..

30 Commits

Author SHA1 Message Date
7585453ff0 Add exception catch 2024-10-03 12:40:15 +01:00
69ec6d2732 Additional logging 2024-10-03 12:26:35 +01:00
31f1495833 Set OwlBoard connection check timeout to 10s 2024-10-03 11:54:28 +01:00
87cbd484ce Add fallback where CRS is unable to be obtained 2024-10-03 11:50:03 +01:00
452ce699ee Add source file to YAML output 2024-05-02 12:19:38 +01:00
259c5bc9b7 Attempt adding file name to output 2024-05-02 12:17:46 +01:00
bcac814800 Add config 'toLoad' list 2024-04-16 10:56:46 +01:00
fdb6f73f26 Update to handle MMDD file name format, rather than the manual renaming of files to YYYYMMDD format. 2024-04-16 10:49:54 +01:00
9439a4e251 Update to handle two digit dates in filenames as they are published 2024-04-04 10:48:41 +01:00
38053cf161 Add git folders to gitignore 2024-04-02 14:21:57 +01:00
0a494ad81f Removed exeption causing error in downloaded data 2024-04-02 14:21:40 +01:00
8e6bb25471 Do not publish empty stop services 2024-03-08 21:00:47 +00:00
fddda2063e Use new folder for added PIS Codes 2024-03-08 11:44:18 +00:00
c23baffa36 Fix for failed CRS match with RDNG4AB 2024-02-22 16:18:47 +00:00
99fd2e3e8d Working but needs tidying 2024-02-22 12:30:11 +00:00
676beab6b3 Gitpush needs auth 2024-02-22 00:10:58 +00:00
f5d0877151 Git doesn't work 2024-02-21 23:56:35 +00:00
a98e069b88 Adjust format of review required code 2024-02-21 23:01:05 +00:00
d1728770c3 Kind of there, much tidying to do! 2024-02-21 22:49:40 +00:00
1b658209ad Add DOCX to gitignore 2024-02-21 20:33:51 +00:00
e9a6fcfb66 Output format 2024-02-21 20:23:58 +00:00
3faed4a41c Output formatting 2024-02-21 20:22:58 +00:00
b4fb7211f3 MORE 2024-02-21 19:28:32 +00:00
4d3f7ce342 Even more meating 2024-02-21 13:44:27 +00:00
d5d7b6626b Further meating 2024-02-20 10:17:10 +00:00
ef8b8f1fd2 OB_Connector 2024-02-18 00:02:24 +00:00
82f885466e Add IMAP Connector 2024-02-16 22:01:27 +00:00
de482074e6 Introduce VENV 2024-02-16 21:22:18 +00:00
bb15cf492a Add notes 2024-02-16 21:19:09 +00:00
59f6439872 Changes:
- PIS Finder
 - DOCX Parser
 - LOCAL MODE (DEPRECATED ALREADY)
2024-02-16 21:18:26 +00:00
18 changed files with 590 additions and 140 deletions

9
.gitignore vendored
View File

@ -1,3 +1,10 @@
env_conf
include
*.docx
*.pdf
git
run.sh
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
@ -17,6 +24,8 @@ eggs/
.eggs/ .eggs/
lib/ lib/
lib64/ lib64/
lib64
bin
parts/ parts/
sdist/ sdist/
var/ var/

View File

@ -1,14 +1,10 @@
# DEPRECATED - DIAGRAMS ARE NOW IN PDF FORMAT.
https://git.fjla.uk/owlboard/dgp2 supports new PDF format schedule cards and offers some automated validation of codes. This project will not be maintained.
# diagram-parser # diagram-parser
This is an experimental project and is not yet used as part of the OwlBoard stack. This is an experimental project and is not yet used as part of the OwlBoard stack.
## Language ## Language
It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which coule be helpful. It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which could be helpful.
## File formats ## File formats
@ -18,24 +14,25 @@ Diagrams are received in DOCX format, however can be easily be converted to ODT,
The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows: The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows:
- diagram-parser is subscribed to an email inbox (IMAP/POP3) - diagram-parser is subscribed to an email inbox (IMAP/POP3)
- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser - Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999) - List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code. - If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
- Once the program has run and extracted only the relavent details, the email is deleted and the file is closed and not stored. - Once the program has run and extracted only the relavent details, the email is deleted and the file is closed and not stored.
- The evantual aim is to avoid any manual searching of the files. - The evantual aim is to avoid any manual searching of the files.
The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source. The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source.
## Points to Remember ## Points to Remember
- Emails received should be verified. - Emails received should be verified.
- A pre-authorised key in the subject field, any emails not matching the key should be discarded. - A pre-authorised key in the subject field, any emails not matching the key should be discarded.
- Attachment formats may vary slightly. - Attachment formats may vary slightly.
- The format of the attachment should be checked and any errors handled gracefully. - The format of the attachment should be checked and any errors handled gracefully.
- Avoid duplicate issues - Avoid duplicate issues
- Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues. - Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
## Main external dependencies (Expected) ## Main external dependencies (Expected)
- imaplib
- email - imaplib
- email

5
pyvenv.cfg Normal file
View File

@ -0,0 +1,5 @@
home = /usr/bin
include-system-site-packages = false
version = 3.12.3
executable = /usr/bin/python3.12
command = /usr/bin/python -m venv /home/fred.boniface/Desktop/diagrams-to-parse/diagram-parser

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
gitdb==4.0.11
GitPython==3.1.42
idna==3.6
lxml==5.1.0
pycparser==2.21
python-docx-2023==0.2.17
PyYAML==6.0.1
requests==2.31.0
smmap==5.0.1
urllib3==2.2.1

63
src/config.py Normal file
View File

@ -0,0 +1,63 @@
# Load configuration from file/env variables
import os
def load():
cfg = {}
toLoad = [
{
"envname": "DG_IMAP_HOST",
"filepath": "/owlboard/dgp/imap/host"
},
{
"envname": "DG_IMAP_PORT",
"filepath": "/owlboard/dgp/imap/port",
"default": "unk"
},
{
"envname": "DG_IMAP_USER",
"filepath": "/owlboard/dgp/imap/user",
},
{
"envname": "DG_IMAP_PASS",
"filepath": "/owlboard/dgp/imap/pass",
},
{
"envname": "DG_OWL_UUID",
"filepath": "/owlboard/dgp/api/uuid",
},
{
"envname": "DG_GITEA_KEY",
"filepath": "/owlboard/dgp/gitea/key"
},
{
"envname": "DG_GITEA_HOST",
"filepath": "/owlboard/dgp/gitea/host"
},
{
"envname": "DG_GITEA_SSHPORT",
"filepath": "/owlboard/dgp/gitea/sshport"
}
]
for item in toLoad:
filepath = item["filepath"]
envname = item["envname"]
default = item.get("default")
# Try to load value from file
try:
with open(filepath, "r") as file:
value = file.read().strip()
except FileNotFoundError:
# If file doesn't exist, try to get value from environment variable
value = os.environ.get(envname)
# If value is still not found, use the default if provided
if value is None and default is not None:
value = default
# Add the value to the cfg dictionary
cfg[envname] = value
return cfg

View File

@ -1,34 +0,0 @@
### This uses the 'python-docx-2023' module
from docx import Document
def extract_table(file_path):
document = Document(file_path)
table = document.tables[4]
print(document.tables[1])
print(document.tables[2])
print(document.tables[3])
print(document.tables[4])
print(document.tables[5])
data = []
keys = None
for i, row in enumerate(table.rows):
text = (cell.text for cell in row.cells)
if i == 0:
keys = tuple(text)
continue
row_data = dict(zip(keys, text))
data.append(row_data)
print(data)
if __name__ == "__main__":
extract_table("./file.docx")
### This can parse each table. What needs to happen next
### is to parse all tables, then check for a PIS code.
### If PIS code exists, then find the associated headcode,
### Then an API request can be made to OwlBoard to try
### and find a service with valid stopping pattern,
### then the PIS codes can be generated for review.

View File

@ -1,87 +0,0 @@
import os, sys, json, subprocess, re, yaml, requests
report_file_path = "./report.txt"
code_store_list_url = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/pis/gw.yaml"
def is_pdfgrep_installed():
try:
subprocess.check_call(["pdfgrep", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except subprocess.CalledProcessError:
return False
def fetch_and_parse_yaml(url):
try:
response = requests.get(url)
response.raise_for_status()
existing_codes = yaml.safe_load(response.text)
return existing_codes
except Exception as e:
print(f"Error downloading and parsing codes: {e}")
sys.exit(1)
def main():
if len(sys.argv) != 2:
print("Usage: python pdf_code_extraction.py <directory_path>")
sys.exit(1)
pdf_directory = sys.argv[1]
if not os.path.isdir(pdf_directory):
print(f"'{pdf_directory}' is not a valid directory.")
sys.exit(1)
if not is_pdfgrep_installed():
print("pdfgrep is not installed on your system.")
sys.exit(1)
code_list = []
pdfgrep_cmd = f"pdfgrep -Prno 'code\\s*:\\s*\\d{{4}}' {pdf_directory}"
pdfgrep_output = subprocess.getoutput(pdfgrep_cmd)
for line in pdfgrep_output.splitlines():
match = re.search(r"^(.*?):\s*code\s*:\s*(\d{4})", line)
if match:
filename, code = match.groups()
code_list.append({"file":filename, "code":str(code)})
existing_codes = fetch_and_parse_yaml(code_store_list_url)['pis']
existing_set = set()
for item in existing_codes:
code = item['code']
existing_set.add(str(code))
unique_codes = set()
unique_code_list = []
missing_codes = []
for item in code_list:
code = item['code']
if code not in unique_codes:
unique_codes.add(code)
unique_code_list.append(item)
if code not in existing_set:
missing_codes.append(item)
#print(missing_codes)
report = f"""
Number of missing codes found: {len(missing_codes)}
Missing Codes:
"""
for item in missing_codes:
report += f"\n - code: {item['code']}\n stops: (File: {item['file']})"
print(f"Saving report to {report_file_path}")
with open(report_file_path, 'w') as report_file:
report_file.write(report)
print(report)
if __name__ == "__main__":
main()

3
src/find_service.py Normal file
View File

@ -0,0 +1,3 @@
## Uses the HEADCODE to guess at the service the PIS code matches
## Where there are multiple matches both are prepared and
## await human review.

34
src/formatter.py Normal file
View File

@ -0,0 +1,34 @@
import owlboard_connector
import sys
def humanYaml(pis_list):
additional_pis = ''
manual_review = ''
for pis in pis_list:
if len(pis['services']) == 1:
crs = []
try:
if (len(pis['services'][0]['stops']) > 0) :
for stop in pis['services'][0]['stops']:
crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
additional_pis += f' - code: "{pis["pis"]}"\n'
additional_pis += f' #headcode: {pis["headcode"]}\n'
additional_pis += f' #date: {pis["date"]}\n'
additional_pis += f' #source_file: {pis["diagram_file"]}\n'
additional_pis += f' stops: [{",".join(crs)}]\n'
except Exception as err:
print(err)
elif len(pis['services']) > 1:
manual_review += f'## THIS CODE REQUIRES MANUAL VERIFICATION\n'
manual_review += f' - code: "{pis["pis"]}"\n'
manual_review += f' #headcode: {pis["headcode"]}\n'
manual_review += f' #date: {pis["date"]}\n'
manual_review += f' #source_file: {pis["diagram_file"]}\n'
for service in pis["services"]:
crs = []
if service and service['stops']:
for stop in service['stops']:
crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
manual_review += f' stops: [{",".join(crs)}]\n'
return "FOR REVIEW\n" + additional_pis + manual_review

42
src/gitea_connector.py Normal file
View File

@ -0,0 +1,42 @@
import requests, os, git
from datetime import datetime
BASE_URL = "https://git.fjla.uk/"
REPO_URL = f"{BASE_URL}owlboard/data"
REPO_PATH = "./git/clone/data"
USER = 'owlbot'
TOKEN = os.environ.get('DGP_GITEA_TOK')
HEADERS = {
'Content-Type': 'application/json',
'accept': 'application/json',
}
BRANCH_NAME = 'auto-' + datetime.now().strftime("%Y%m%d-%H%M%S")
FILE_NAME = 'dg_parser_' + datetime.now().strftime("%Y%m%d-%H%M%S")
'''
I need a way here to get the original file from the 'main' branch and
append the generated PIS codes. Then push to a new generated branch.
Then a pull request should be created but can probably be done with actions.
In reality this program should just take in DOCX files and spit out formatted
PIS data to the repo, everything else can be handled at the repo level??
None of this currently works...
'''
def clone_repository():
git.Repo.clone_from(REPO_URL, REPO_PATH)
def commit_and_push_changes(text_to_append, commit_message):
repo = git.Repo(REPO_PATH)
repo.git.checkout("-b", BRANCH_NAME)
with open(REPO_PATH + f"/pis/{FILE_NAME}.yaml", 'w') as file:
file.write(text_to_append)
repo.index.add([f"pis/{FILE_NAME}.yaml"])
repo.index.commit(commit_message)
origin = repo.remote(name='origin')
origin_url_credentials = REPO_URL.replace('https://', f'https://{USER}:{TOKEN}@')
origin.set_url(origin_url_credentials)
origin.push(refspec=BRANCH_NAME)

44
src/imap_connector.py Normal file
View File

@ -0,0 +1,44 @@
import imaplib, email, os
class IMAPConnector:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'imap_connection'):
IMAP_SERVER = os.environ.get('DGP_EML_HOST')
IMAP_USER = os.environ.get('DGP_EML_USER')
IMAP_PASS = os.environ.get('DGP_EML_PASS')
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")
self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
self.imap_connection.login(IMAP_USER, IMAP_PASS)
self.imap_connection.select('INBOX')
def fetch_filtered_emails(self, sender_email):
filtered_emails = []
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
result, email_data = self.imap_connection.fetch(num, '(RFC822)')
if result == 'OK':
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
filtered_emails.append(email_message)
return filtered_emails
def delete_emails_from_sender(self, sender_email):
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
self.imap_connection.store(num, '+FLAGS', '\\Deleted')
self.imap_connection.expunge()
print(f"All messages from {sender_email} deleted successfully")

76
src/local_mode.py Normal file
View File

@ -0,0 +1,76 @@
import parse_docx, pis_find, owlboard_connector, formatter, gitea_connector
import os, sys
def start():
print("Running OwlBoard Diagram Parser in local mode")
if not owlboard_connector.check_connection():
print("Exiting")
sys.exit(1)
else:
print("OwlBoard connection successful")
working_directory = os.getcwd()
print("Working directory: ", working_directory)
## Get all files in directory
files = [f for f in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, f))]
docx_files = [f for f in files if f.endswith(".docx")]
results = []
if docx_files:
print(f"Found {len(docx_files)} DOCX files in directory")
for file in docx_files:
print(file)
items = parse_docx.extract_tables(file)
results.extend(items)
else:
print("No DOCX files found")
print(f"Found {len(results)} PIS Codes in documents")
missing_pis = pis_find.run(results)
print(missing_pis)
get_detail = []
for code in missing_pis:
print(f"Fetching services with code: {code}")
services = owlboard_connector.get_services(code['headcode'], code['date'])
get_detail.append({
'pis': code['pis'],
'services': services,
'diagram_file': code['source_file'],
'date': code['date'],
'headcode': code['headcode'],
})
details = []
for item in get_detail:
detail = {
'pis': item['pis'],
'headcode': item['headcode'],
'date': item['date'],
'services': [],
'diagram_file': item['diagram_file']
}
for service in item['services']:
service_detail = owlboard_connector.get_service_detail(service['trainUid'], item['date'])
detail['services'].append(service_detail)
details.append(detail)
formatted_additions = formatter.humanYaml(details)
print(formatted_additions)
out = open("pis_output", "a")
f.write('\n---\n')
f.write(formatted_additions)
f.close()
gitea_connector.clone_repository()
gitea_connector.commit_and_push_changes(formatted_additions,"From owlbot diagram-parser")
if __name__ == "__main__":
print("To use local mode, please call `main.py local`")

2
src/mailbox_mode.py Normal file
View File

@ -0,0 +1,2 @@
import imaplib
import email

16
src/main.py Normal file
View File

@ -0,0 +1,16 @@
import sys
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "local"
if mode == "local":
import local_mode
local_mode.start()
elif mode == "mailbox":
print("MailBox mode not available yet")
pass
else:
print("Invalid mode. Please specify 'local' or 'mailbox'")
if __name__ == "__main__":
main()

104
src/owlboard_connector.py Normal file
View File

@ -0,0 +1,104 @@
### API REQUESTS HERE
### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY
### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS
### MAILBOX ACCESS IS NEEDED FOR REGISTRATION... DON'T BE A FUCKING IDIOT... I CAN JUST PASS
### A UUID IN TO THE PROGRAM!! TWAT.
import requests, os
OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"
OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/"
OB_TIP_BASE_URL = "https://owlboard.info/api/v2/ref/locationCode/tiploc/"
#OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/"
#OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/"
#OB_TIP_BASE_URL = "http://localhost:8460/api/v2/ref/locationCode/tiploc/"
OB_TEST_URL = OB_PIS_BASE_URL + "5001"
UUID = os.environ.get('DGP_OB_UUID')
HEADERS = {
'user-agent': 'owlboard-diagram-parser',
'uuid': UUID
}
def check_connection():
if not UUID:
print("'DGP_OB_UUID' must be set in the environment")
return False
res = requests.get(OB_TEST_URL, headers=HEADERS, timeout=10)
if res.status_code == 401:
print("Error - Unauthorised. The UUID is not valid. STATUS: ", res.status_code, "UUID: ", UUID)
return False
elif res.status_code != 200:
print("Error - Unable to reach OwlBoard. STATUS: ", res.status_code)
return False
return True
def get_services(headcode, date):
print("Finding GWR service: ", headcode, ", ", date)
results = []
url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}"
print(url)
try:
res = requests.get(url, headers=HEADERS)
if res.status_code == 200:
json_res = res.json()
for item in json_res:
if item['operator'] == 'GW':
results.append(item)
print(f"Found {len(results)} valid GWR Service")
return results
except Exception as e:
print(e)
sys.exit()
def get_service_detail(trainUid, date):
try:
print("Getting GWR service details: ", trainUid, ", ", date)
url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}"
print(url)
res = requests.get(url, headers=HEADERS)
if res.status_code == 200:
json_res = res.json()
if json_res:
svc_detail = {
'stops': json_res['stops'],
'vstp': json_res.get('vstp', False)
}
organised = organise_svc(svc_detail)
#print(res.text)
#print(organised)
print("Service Details Found")
return organised
else:
print("Service Not Found")
sys.exit()
except Exception as e:
print(e)
sys.exit()
def organise_svc(input):
stop_tiplocs = []
vstp = input['vstp']
for stop in input['stops']:
if stop['isPublic']:
stop_tiplocs.append(stop['tiploc'])
existingPis = False
if 'pis' in input and input['pis'].get('skipCount', 0) == 0:
existingPis = True
return {'stops': stop_tiplocs, 'vstp': vstp}
def convert_tiploc_to_crs(tiploc):
if tiploc == 'RDNG4AB':
return 'rdg'
res = requests.get(OB_TIP_BASE_URL + tiploc.upper(), headers=HEADERS)
if res.status_code == 200:
json_res = res.json()
if json_res:
crs = json_res[0]['3ALPHA']
return crs.lower()
else:
return "NO_CRS"

70
src/parse_docx.py Normal file
View File

@ -0,0 +1,70 @@
### This uses the 'python-docx-2023' module
from docx import Document
from datetime import datetime
import re
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
def extract_tables(file_path):
document = Document(file_path)
print(f"Reading {len(document.tables)} tables from {file_path}")
pis_info = []
for table in document.tables:
data = []
for i, row in enumerate(table.rows):
text = (cell.text for cell in row.cells)
if i == 0:
keys = tuple(text)
continue
row_data = dict(zip(keys, text))
data.append(row_data)
pis_and_headcode = match_pis_and_headcode(data)
if pis_and_headcode:
pis_and_headcode['source_file'] = file_path
current_year = datetime.now().year
date_string_with_year = f"{current_year}{file_path.split()[0]}"
pis_and_headcode['date'] = datetime.strptime(date_string_with_year, "%Y%m%d")
pis_info.append(pis_and_headcode)
return(pis_info)
def match_pis_and_headcode(table_data):
pis_code = None
headcode = None
job_head = None
for item in table_data:
for key, value in item.items():
match = PIS_PATTERN.search(value)
if match:
pis_code = match.group(1)
job_head = key.strip()
break
if pis_code:
break
if pis_code:
for item in table_data:
for key in item:
match = HEADCODE_PATTERN.search(key)
if match:
headcode = match.group()
break
if headcode:
break
if pis_code and headcode:
return {'job_head': job_head, 'headcode': headcode, 'pis': pis_code}
else:
return None
def solo_run():
print(extract_tables("./file.docx"))
if __name__ == "__main__":
solo_run()

58
src/pis_fetch.py Normal file
View File

@ -0,0 +1,58 @@
## This module downloads and compiles a list of all PIS codes across all branches of the OwlBoard/Data repo.
## The function load_existing_pis() is expected to be used outside of the module.
import os, requests, yaml
## TESTING
GIT_URL = 'https://git.fjla.uk'
GIT_API = GIT_URL + '/api/v1'
def load_existing_pis():
all_pis_data = []
branches = get_branch_list()
for branch in branches:
branch_pis_data = get_branch_pis(branch)
if branch_pis_data is not None:
all_pis_data.append(branch_pis_data)
print(f"Branch: {branch}, PIS Codes: {len(branch_pis_data['pis'])}")
# Merging data and removing duplicates based on 'code' key
merged_pis_data = {} ### THIS BIT DOESN'T COMPARE PROPERLY... PRINT EACH TYPE TO SEE STRUCTURE
for branch_data in all_pis_data:
for item in branch_data['pis']:
code = item['code']
# Only keep the first occurrence of each 'code'
if code not in merged_pis_data:
merged_pis_data[code] = item
# Convert the dictionary back to a list of dictionaries
merged_pis_list = [{'code': code, 'stops': value['stops']} for code, value in merged_pis_data.items()]
print(f"Total unique codes: {len(merged_pis_list)}")
return merged_pis_list
def get_branch_list():
get_branches_endpoint = GIT_API + '/repos/owlboard/data/branches'
res = requests.get(get_branches_endpoint)
branches_json = res.json()
branches = []
for repo in branches_json:
branches.append(repo['name'])
print(branches)
return branches
def get_branch_pis(branch_name):
get_file_url = GIT_API + f'/repos/owlboard/data/raw/%2Fpis%2Fgw.yaml?ref={branch_name}'
res = requests.get(get_file_url)
print(res.status_code)
pis_yaml = res.text
dic = yaml.safe_load(pis_yaml)
return dic
if __name__ == "__main__":
print(load_existing_pis())

35
src/pis_find.py Normal file
View File

@ -0,0 +1,35 @@
## This modile compares discovered PIS codes with existing PIS codes obtained by calling pis_fetch
import pis_fetch
import sys
def run(data_list):
deduplicated_data = dedup(data_list)
print(f"Removed {len(data_list) - len(deduplicated_data)} duplicate codes")
print(f"Searching for {len(deduplicated_data)} PIS codes")
missing_data = find_missing(deduplicated_data)
print(f"{len(missing_data)} missing PIS codes in OwlBoard data")
return missing_data
def dedup(data_list):
unique_dicts = {d['pis']: d for d in data_list}.values()
unique_list_of_dicts = list(unique_dicts)
return unique_list_of_dicts
def find_missing(data_list):
existing_pis_list = pis_fetch.load_existing_pis()
missing_data = []
for item in data_list:
pis_code = item.get('pis')
if pis_code:
code_exists = False
for existing_pis in existing_pis_list:
if str(existing_pis['code']) == pis_code:
code_exists = True
break
if not code_exists:
missing_data.append(item)
return missing_data