class GiteaConnector:
    """Small client for the Gitea repository API calls this project makes.

    ``repo_url`` is the API base of a single repository, e.g.
    ``https://host/api/v1/repos/<owner>/<repo>``; a trailing slash is
    optional. Every method reports its outcome with ``print`` and returns a
    plain success value instead of raising on HTTP error statuses.
    """

    def __init__(self, repo_url, user, key, timeout=30):
        """Store connection details and build the shared request headers.

        Args:
            repo_url: API base URL of the repository.
            user: Account name (stored, not otherwise used by this class).
            key: API token sent in the ``Authorization`` header.
            timeout: Seconds to wait on each HTTP request before giving up,
                so a stalled server cannot hang the caller indefinitely.
        """
        self.repo_url = repo_url
        self.user = user
        self.key = key
        self.timeout = timeout

        self.header = {
            "accept": "application/json",
            "Authorization": f"token {self.key}",
            "Content-Type": "application/json",
        }

    def _url(self, suffix):
        """Return the absolute URL for ``suffix`` below the repository base.

        ``urljoin`` replaces the last path segment of a base that does not
        end in ``/``, which would silently drop the repository name, so the
        base is normalised to end in exactly one slash first.
        """
        base = self.repo_url.rstrip("/") + "/"
        return urljoin(base, suffix.lstrip("/"))

    @staticmethod
    def _encode_content(file_content):
        """Return ``file_content`` (str) as a base64 ASCII string."""
        # b64encode returns bytes; decode so the value is JSON-serialisable.
        return base64.b64encode(file_content.encode('utf-8')).decode('ascii')

    ## Create an issue in the repo
    def create_issue(self, title, content):
        """Open an issue with the given title and body.

        Returns:
            True when the server answers 201, otherwise False.
        """
        # NOTE(review): Gitea's issue API documents `labels` as numeric label
        # IDs - confirm that label names are accepted by the target server.
        issue_data = {
            "title": title,
            "body": content,
            "assignees": ["fred.boniface"],
            "labels": ["bug", "urgent"],
        }

        response = requests.post(
            self._url("issues"),
            headers=self.header,
            json=issue_data,
            timeout=self.timeout,
        )
        if response.status_code == 201:
            print("Succesfully creates issue: ", response.json())
            return True

        print("Failed to create issue: ", response.status_code, response.text)
        return False

    ## Download the 'gw' pis file from the repo
    def download_pis_file(self):
        """Fetch ``pis/gw.yaml`` and return its decoded text.

        Returns:
            The file content as a str, or "" when the request fails.
        """
        response = requests.get(
            self._url('contents/pis/gw.yaml'),
            headers=self.header,
            timeout=self.timeout,
        )
        if response.status_code == 200:
            # The contents endpoint delivers the file body base64-encoded.
            file_content = base64.b64decode(response.json()['content']).decode('utf-8')
            print("File downloaded successfully")
            return file_content

        print("Failed to download file: ", response.status_code, response.text)
        return ""

    ## Create new branch in the repo
    def create_branch(self, branch_name):
        """Create ``branch_name`` from the ``main`` branch.

        Returns:
            True when the branch was created (201), otherwise False.
        """
        BASE_BRANCH = "main"

        # Confirm the base branch exists first so a missing base is reported
        # separately from a failed creation.
        branch_response = requests.get(
            self._url(f"branches/{BASE_BRANCH}"),
            headers=self.header,
            timeout=self.timeout,
        )
        if branch_response.status_code != 200:
            print("Failed to retreive branch SHA: ", branch_response.status_code, branch_response.text)
            return False

        # Gitea creates branches via POST .../branches, naming the new branch
        # and the branch to fork from (there is no writable git/refs endpoint).
        branch_data = {
            "new_branch_name": branch_name,
            "old_branch_name": BASE_BRANCH,
        }
        create_branch_response = requests.post(
            self._url("branches"),
            headers=self.header,
            json=branch_data,
            timeout=self.timeout,
        )
        if create_branch_response.status_code == 201:
            print(f"Branch {branch_name} created successfully")
            return True

        print("Failed to create branch: ", branch_name, create_branch_response.status_code, create_branch_response.text)
        return False

    ## Create a new file in the repo
    def create_pis_file(self, branch, filename, file_content):
        """Create ``pis/<filename>`` on ``branch`` containing ``file_content``.

        Args:
            branch: Name of the branch to commit to.
            filename: File name inside the ``pis/`` directory.
            file_content: Text content of the new file.

        Returns:
            True when the file was created (201), otherwise False.
        """
        file_path = f"pis/{filename}"

        file_data = {
            "content": self._encode_content(file_content),
            "message": "Create new PIS File",
            "branch": branch,
        }

        create_file_response = requests.post(
            self._url(f'contents/{file_path}'),
            headers=self.header,
            json=file_data,
            timeout=self.timeout,
        )

        if create_file_response.status_code == 201:
            print("File created successfully")
            return True

        print("File creation failed.", create_file_response.status_code, create_file_response.text)
        return False
b/src/main.py @@ -71,6 +71,8 @@ def main(): else: print(f"Found {len(pdf_files)} PDF files") + + # Find schedule card files schedule_cards = [] for pdf_file in pdf_files: schedule_cards.append(get_schedule_card_data(pdf_file)) @@ -116,11 +118,19 @@ def main(): auto_matched = validate.filter_timetable_entries(organised_for_processing) + validated = validate.check_and_validate_against_owlboard(auto_matched) + # print(trains) - out = open("output.txt", "w") + out = open("organised_for_processing.txt", "w") + out.write(json.dumps(organised_for_processing, indent=4, default=str)) + out.close() + out = open("auto_matched.txt", "w") out.write(json.dumps(auto_matched, indent=4, default=str)) out.close() + out = open("validated.txt", "w") + out.write(json.dumps(validated, indent=4, default=str)) + out.close() if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/train_detail.py b/src/train_detail.py index 50ecb77..6a2599e 100644 --- a/src/train_detail.py +++ b/src/train_detail.py @@ -1,6 +1,7 @@ from pyOwlBoard import client +import os -ob_client = client.OwlBoardClient('https://owlboard.info', 'x') +ob_client = client.OwlBoardClient('https://owlboard.info', os.getenv("OWLBOARD_KEY")) def find_gw_trains_by_headcode(headcode, date): train_list = ob_client.get_trains_by_headcode(headcode, date) @@ -38,6 +39,7 @@ def organise_trains(trains): def rationalise_timetable_entry(timetable_entries): rationalised_entries = [] for timetable_entry in timetable_entries: + if timetable_entry['stpIndicator'] == "C": continue entry = { 'stpIndicator': timetable_entry['stpIndicator'], 'operator': timetable_entry['operator'], diff --git a/src/validate.py b/src/validate.py index 071102d..1d4e5c9 100644 --- a/src/validate.py +++ b/src/validate.py @@ -1,3 +1,8 @@ +from pyOwlBoard import client +import os + +from gitea import GiteaConnector + ## Validates and filters input based on whether diagram time matches schedule start time def 
## Checks OwlBoard API for existing PIS codes and whether they match
def check_and_validate_against_owlboard(train_entries, ob_client=None, gitea_client=None):
    """Filter train entries against the PIS codes already held by OwlBoard.

    For each entry the stops stored for its ``diagram_pis_code`` are looked
    up and compared (case-insensitively) with the entry's timetable stops:

    * exactly one timetable entry and the stops match -> entry is dropped;
    * the code has no stops in the database -> entry is kept;
    * exactly one timetable entry and the stops differ -> a Gitea issue is
      opened and the entry is dropped;
    * anything else (several candidate timetable entries) -> entry is kept.

    Args:
        train_entries: Iterable of dicts with the keys ``diagram_pis_code``,
            ``diagram_date`` and ``timetable_entries`` (a list of dicts that
            each carry a ``stops`` list of strings).
        ob_client: Object exposing ``get_stops_by_pis(code)``. Defaults to an
            ``OwlBoardClient`` configured from ``OWLBOARD_KEY``.
        gitea_client: Object exposing ``create_issue(title, content)``.
            Defaults to a ``GiteaConnector`` configured from ``GITEA_PASS``.

    Returns:
        List of the entries that still need processing, in input order.
    """
    if ob_client is None:
        ob_client = client.OwlBoardClient("https://owlboard.info", os.getenv("OWLBOARD_KEY"))
    if gitea_client is None:
        gitea_client = GiteaConnector(
            "https://git.fjla.uk/api/v1/repos/owlboard/data",
            "fred.boniface",
            os.getenv("GITEA_PASS"),
        )

    stops_by_code = {}   # PIS code -> upper-cased database stops (one lookup per code)
    reported = set()     # (code, diagram stops) pairs that already produced an issue
    output = []

    for train_entry in train_entries:
        pis_code = train_entry['diagram_pis_code']

        if pis_code not in stops_by_code:
            existing_pis = ob_client.get_stops_by_pis(pis_code)
            if existing_pis:
                stops_by_code[pis_code] = [stop.upper() for stop in existing_pis[0].get("stops", [])]
            else:
                stops_by_code[pis_code] = []
        database_stops = stops_by_code[pis_code]

        candidates = train_entry['timetable_entries']
        single_candidate = len(candidates) == 1
        diagram_stops = candidates[0]["stops"] if single_candidate else None

        ## If only one possible entry, and it matches stops in Database, continue
        if single_candidate and database_stops == [stop.upper() for stop in diagram_stops]:
            continue

        ## Not in database, or more than one possible entry: keep for processing
        if not database_stops or not single_candidate:
            output.append(train_entry)
            continue

        ## Single entry that disagrees with the database: open an issue, but
        ## only once per distinct mismatch so repeated trains do not spam Gitea
        mismatch = (pis_code, tuple(diagram_stops))
        if mismatch in reported:
            continue
        reported.add(mismatch)

        issue_title = f"PIS Error | Code: {pis_code}"
        # Lines are kept flush-left so the text is not indented in the issue.
        issue_content = "\n".join([
            "",
            f"PIS Code {pis_code}.",
            f"Diagram dated {train_entry['diagram_date']} has been parsed and produced a mismatch with the OwlBoard database.",
            "",
            f"Diagram Stops: {','.join(diagram_stops)}",
            "",
            f"Database Stops: {','.join(database_stops)}",
            "",
            "This requires a manual check of the PIS file to identify mistakes or changes.",
            "",
        ])
        print(issue_content)
        gitea_client.create_issue(issue_title, issue_content)

    return output