Compare commits

..

No commits in common. "Meating-it-out" and "main" have entirely different histories.

18 changed files with 136 additions and 590 deletions

9
.gitignore vendored
View File

@ -1,10 +1,3 @@
env_conf
include
*.docx
*.pdf
git
run.sh
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
@ -24,8 +17,6 @@ eggs/
.eggs/
lib/
lib64/
lib64
bin
parts/
sdist/
var/

View File

@ -4,7 +4,7 @@ This is an experimental project and is not yet used as part of the OwlBoard stac
## Language
It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which could be helpful.
It is so-far undecided what language will be used. Documents for parsing are likely to be a few hundred lines long so searching may become processor intensive meaning Go may be a good candidate, however Python offers an array of libraries which could be helpful.
## File formats
@ -14,25 +14,24 @@ Diagrams are received in DOCX format, however can be easily be converted to ODT,
The aim of diagram-parser is to simplify the addition of PIS codes that are not yet in the OwlBoard data source. The planned implementation is as follows:
- diagram-parser is subscribed to an email inbox (IMAP/POP3)
- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
- Once the program has run and extracted only the relevant details, the email is deleted and the file is closed and not stored.
- The eventual aim is to avoid any manual searching of the files.
- diagram-parser is subscribed to an email inbox (IMAP/POP3)
- Formatted train-crew schedule cards are sent to the inbox and loaded by diagram-parser
- List of existing PIS codes is loaded and a list of non-existent codes is compiled (0000-9999)
- If a code is found both in the diagram and on the list of non-existent codes, a Gitea issue is opened providing details of the code.
- Once the program has run and extracted only the relevant details, the email is deleted and the file is closed and not stored.
- The eventual aim is to avoid any manual searching of the files.
The current process of adding new codes involves being made aware of them face to face, or finding them myself and manually finding and adding them to the data source.
## Points to Remember
- Emails received should be verified.
- A pre-authorised key in the subject field, any emails not matching the key should be discarded.
- Attachment formats may vary slightly.
- The format of the attachment should be checked and any errors handled gracefully.
- Avoid duplicate issues
- Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
- Emails received should be verified.
- A pre-authorised key in the subject field, any emails not matching the key should be discarded.
- Attachment formats may vary slightly.
- The format of the attachment should be checked and any errors handled gracefully.
- Avoid duplicate issues
- Issues opened should contain the missing PIS code in their title, this application should check for any open issues containing the missing code to avoid duplicated issues.
## Main external dependencies (Expected)
- imaplib
- email
- imaplib
- email

View File

@ -1,5 +0,0 @@
home = /usr/bin
include-system-site-packages = false
version = 3.12.3
executable = /usr/bin/python3.12
command = /usr/bin/python -m venv /home/fred.boniface/Desktop/diagrams-to-parse/diagram-parser

View File

@ -1,13 +0,0 @@
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
gitdb==4.0.11
GitPython==3.1.42
idna==3.6
lxml==5.1.0
pycparser==2.21
python-docx-2023==0.2.17
PyYAML==6.0.1
requests==2.31.0
smmap==5.0.1
urllib3==2.2.1

View File

@ -1,63 +0,0 @@
# Load configuration from file/env variables
import os
def load():
    """Build the runtime configuration dictionary.

    Each setting is resolved in priority order: a mounted secrets file
    under /owlboard/..., then an environment variable of the same name,
    then an optional hard-coded default. Returns a dict keyed by the
    environment-variable name; a value may be None when the setting is
    found nowhere and has no default.
    """
    # (env var name, secrets file path, default or None)
    sources = [
        ("DG_IMAP_HOST", "/owlboard/dgp/imap/host", None),
        ("DG_IMAP_PORT", "/owlboard/dgp/imap/port", "unk"),
        ("DG_IMAP_USER", "/owlboard/dgp/imap/user", None),
        ("DG_IMAP_PASS", "/owlboard/dgp/imap/pass", None),
        ("DG_OWL_UUID", "/owlboard/dgp/api/uuid", None),
        ("DG_GITEA_KEY", "/owlboard/dgp/gitea/key", None),
        ("DG_GITEA_HOST", "/owlboard/dgp/gitea/host", None),
        ("DG_GITEA_SSHPORT", "/owlboard/dgp/gitea/sshport", None),
    ]
    cfg = {}
    for envname, filepath, default in sources:
        try:
            # A mounted secrets file takes priority over the environment.
            with open(filepath, "r") as fh:
                value = fh.read().strip()
        except FileNotFoundError:
            # No secrets file: fall back to the environment variable.
            value = os.environ.get(envname)
        if value is None and default is not None:
            value = default
        cfg[envname] = value
    return cfg

View File

@ -0,0 +1,34 @@
### This uses the 'python-docx-2023' module
from docx import Document
def extract_table(file_path):
    """Read table index 4 from a DOCX file and print it as a list of dicts.

    The first row of the table is treated as the header; every following
    row becomes a dict keyed by those header cells.
    NOTE(review): assumes the document contains at least six tables — the
    debug prints below index tables 1-5 directly; confirm for real inputs.
    """
    document = Document(file_path)
    table = document.tables[4]
    # Debug output while the document format is being explored.
    for idx in range(1, 6):
        print(document.tables[idx])
    data = []
    header = None
    for i, row in enumerate(table.rows):
        cells = [cell.text for cell in row.cells]
        if i == 0:
            # Header row supplies the dict keys for every later row.
            header = tuple(cells)
        else:
            data.append(dict(zip(header, cells)))
    print(data)
if __name__ == "__main__":
    extract_table("./file.docx")
### This can parse each table. What needs to happen next
### is to parse all tables, then check for a PIS code.
### If PIS code exists, then find the associated headcode,
### Then an API request can be made to OwlBoard to try
### and find a service with valid stopping pattern,
### then the PIS codes can be generated for review.

View File

@ -0,0 +1,87 @@
import os, sys, json, subprocess, re, yaml, requests
report_file_path = "./report.txt"  # where the missing-code report is written
# Canonical list of existing GW PIS codes on the data repo's main branch.
code_store_list_url = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/pis/gw.yaml"
def is_pdfgrep_installed():
    """Return True when the pdfgrep binary is available on PATH.

    Runs `pdfgrep --version` and treats any failure — including the binary
    being absent entirely — as "not installed".
    """
    try:
        subprocess.check_call(["pdfgrep", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return True
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: the binary ran but exited non-zero.
        # OSError (incl. FileNotFoundError): the binary is not installed —
        # the original only caught CalledProcessError and crashed here.
        return False
def fetch_and_parse_yaml(url):
    """Download a YAML document from *url* and return it parsed.

    Exits the process with status 1 on any download or parse failure.
    """
    try:
        response = requests.get(url)
        response.raise_for_status()
        return yaml.safe_load(response.text)
    except Exception as e:
        print(f"Error downloading and parsing codes: {e}")
        sys.exit(1)
def main():
    """Scan a directory of PDFs for 4-digit PIS codes and report missing ones.

    Usage: python pdf_code_extraction.py <directory_path>
    Requires the external `pdfgrep` tool. Writes a summary to
    report_file_path and echoes it to stdout. Exits with status 1 on bad
    arguments or a missing pdfgrep installation.
    """
    import shlex  # local import: only needed to quote the shell argument
    if len(sys.argv) != 2:
        print("Usage: python pdf_code_extraction.py <directory_path>")
        sys.exit(1)
    pdf_directory = sys.argv[1]
    if not os.path.isdir(pdf_directory):
        print(f"'{pdf_directory}' is not a valid directory.")
        sys.exit(1)
    if not is_pdfgrep_installed():
        print("pdfgrep is not installed on your system.")
        sys.exit(1)
    code_list = []
    # SECURITY FIX: the directory is user-supplied and interpolated into a
    # shell command line — quote it so spaces/metacharacters cannot break
    # (or be executed as part of) the command.
    pdfgrep_cmd = f"pdfgrep -Prno 'code\\s*:\\s*\\d{{4}}' {shlex.quote(pdf_directory)}"
    pdfgrep_output = subprocess.getoutput(pdfgrep_cmd)
    for line in pdfgrep_output.splitlines():
        # Each hit looks like "<file>: code : NNNN".
        match = re.search(r"^(.*?):\s*code\s*:\s*(\d{4})", line)
        if match:
            filename, code = match.groups()
            code_list.append({"file": filename, "code": str(code)})
    # Codes already present in the OwlBoard data store.
    existing_codes = fetch_and_parse_yaml(code_store_list_url)['pis']
    existing_set = {str(item['code']) for item in existing_codes}
    unique_codes = set()
    missing_codes = []
    for item in code_list:
        code = item['code']
        if code not in unique_codes:
            unique_codes.add(code)
            # Report each missing code once, from its first file.
            if code not in existing_set:
                missing_codes.append(item)
    report = f"""
Number of missing codes found: {len(missing_codes)}

Missing Codes:
"""
    for item in missing_codes:
        report += f"\n - code: {item['code']}\n stops: (File: {item['file']})"
    print(f"Saving report to {report_file_path}")
    with open(report_file_path, 'w') as report_file:
        report_file.write(report)
    print(report)
if __name__ == "__main__":
    main()

View File

@ -1,3 +0,0 @@
## Uses the HEADCODE to guess at the service the PIS code matches
## Where there are multiple matches both are prepared and
## await human review.

View File

@ -1,34 +0,0 @@
import owlboard_connector
import sys
def humanYaml(pis_list):
additional_pis = ''
manual_review = ''
for pis in pis_list:
if len(pis['services']) == 1:
crs = []
try:
if (len(pis['services'][0]['stops']) > 0) :
for stop in pis['services'][0]['stops']:
crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
additional_pis += f' - code: "{pis["pis"]}"\n'
additional_pis += f' #headcode: {pis["headcode"]}\n'
additional_pis += f' #date: {pis["date"]}\n'
additional_pis += f' #source_file: {pis["diagram_file"]}\n'
additional_pis += f' stops: [{",".join(crs)}]\n'
except Exception as err:
print(err)
elif len(pis['services']) > 1:
manual_review += f'## THIS CODE REQUIRES MANUAL VERIFICATION\n'
manual_review += f' - code: "{pis["pis"]}"\n'
manual_review += f' #headcode: {pis["headcode"]}\n'
manual_review += f' #date: {pis["date"]}\n'
manual_review += f' #source_file: {pis["diagram_file"]}\n'
for service in pis["services"]:
crs = []
if service and service['stops']:
for stop in service['stops']:
crs.append(owlboard_connector.convert_tiploc_to_crs(stop))
manual_review += f' stops: [{",".join(crs)}]\n'
return "FOR REVIEW\n" + additional_pis + manual_review

View File

@ -1,42 +0,0 @@
import requests, os, git
from datetime import datetime
BASE_URL = "https://git.fjla.uk/"  # Gitea instance hosting the data repo
REPO_URL = f"{BASE_URL}owlboard/data"
REPO_PATH = "./git/clone/data"  # local clone destination
USER = 'owlbot'  # bot account used for pushes
TOKEN = os.environ.get('DGP_GITEA_TOK')  # access token; may be None if unset
HEADERS = {
    'Content-Type': 'application/json',
    'accept': 'application/json',
}
# Branch and file names are timestamped so each run writes a fresh file
# on a fresh branch.
BRANCH_NAME = 'auto-' + datetime.now().strftime("%Y%m%d-%H%M%S")
FILE_NAME = 'dg_parser_' + datetime.now().strftime("%Y%m%d-%H%M%S")
'''
I need a way here to get the original file from the 'main' branch and
append the generated PIS codes. Then push to a new generated branch.
Then a pull request should be created but can probably be done with actions.
In reality this program should just take in DOCX files and spit out formatted
PIS data to the repo, everything else can be handled at the repo level??
None of this currently works...
'''
def clone_repository():
    """Clone the OwlBoard data repository into REPO_PATH."""
    git.Repo.clone_from(REPO_URL, REPO_PATH)
def commit_and_push_changes(text_to_append, commit_message):
    """Write generated PIS YAML to a new timestamped file and push it.

    Creates branch BRANCH_NAME in the local clone, writes *text_to_append*
    to pis/<FILE_NAME>.yaml, commits with *commit_message*, then pushes the
    branch to origin with USER/TOKEN embedded in the remote URL.
    """
    repo = git.Repo(REPO_PATH)
    repo.git.checkout("-b", BRANCH_NAME)
    with open(REPO_PATH + f"/pis/{FILE_NAME}.yaml", 'w') as file:
        file.write(text_to_append)
    repo.index.add([f"pis/{FILE_NAME}.yaml"])
    repo.index.commit(commit_message)
    origin = repo.remote(name='origin')
    # NOTE(review): this embeds the token in the remote URL and persists it
    # via set_url — the credential ends up stored in the clone's config.
    origin_url_credentials = REPO_URL.replace('https://', f'https://{USER}:{TOKEN}@')
    origin.set_url(origin_url_credentials)
    origin.push(refspec=BRANCH_NAME)

View File

@ -1,44 +0,0 @@
import imaplib, email, os
class IMAPConnector:
    """Singleton wrapper around an authenticated IMAP4_SSL connection.

    Connection parameters come from the DGP_EML_HOST / DGP_EML_USER /
    DGP_EML_PASS environment variables; INBOX is selected on first
    construction. Later instantiations reuse the same instance and its
    live connection.
    """
    _instance = None  # the single shared instance
    def __new__(cls, *args, **kwargs):
        # Classic singleton: create once, always return the same object.
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance
    def __init__(self):
        # __init__ runs on every construction; the hasattr guard makes the
        # IMAP login happen only the first time.
        if not hasattr(self, 'imap_connection'):
            IMAP_SERVER = os.environ.get('DGP_EML_HOST')
            IMAP_USER = os.environ.get('DGP_EML_USER')
            IMAP_PASS = os.environ.get('DGP_EML_PASS')
            if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
                raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")
            self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
            self.imap_connection.login(IMAP_USER, IMAP_PASS)
            self.imap_connection.select('INBOX')
    def fetch_filtered_emails(self, sender_email):
        """Return every INBOX message from *sender_email* as an email.message object."""
        filtered_emails = []
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                result, email_data = self.imap_connection.fetch(num, '(RFC822)')
                if result == 'OK':
                    raw_email = email_data[0][1]
                    email_message = email.message_from_bytes(raw_email)
                    filtered_emails.append(email_message)
        return filtered_emails
    def delete_emails_from_sender(self, sender_email):
        """Flag all messages from *sender_email* as deleted, then expunge."""
        result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
        if result == 'OK':
            for num in data[0].split():
                self.imap_connection.store(num, '+FLAGS', '\\Deleted')
            self.imap_connection.expunge()
            print(f"All messages from {sender_email} deleted successfully")

View File

@ -1,76 +0,0 @@
import parse_docx, pis_find, owlboard_connector, formatter, gitea_connector
import os, sys
def start():
    """Run the parser against DOCX diagrams in the current directory.

    Checks OwlBoard connectivity, parses every *.docx file found, finds
    which discovered PIS codes are missing from OwlBoard, fetches service
    details for them, writes the formatted YAML to ./pis_output and pushes
    it to the data repository. Exits with status 1 if OwlBoard is
    unreachable.
    """
    print("Running OwlBoard Diagram Parser in local mode")
    if not owlboard_connector.check_connection():
        print("Exiting")
        sys.exit(1)
    else:
        print("OwlBoard connection successful")
    working_directory = os.getcwd()
    print("Working directory: ", working_directory)
    ## Get all files in directory
    files = [f for f in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, f))]
    docx_files = [f for f in files if f.endswith(".docx")]
    results = []
    if docx_files:
        print(f"Found {len(docx_files)} DOCX files in directory")
        for file in docx_files:
            print(file)
            items = parse_docx.extract_tables(file)
            results.extend(items)
    else:
        print("No DOCX files found")
    print(f"Found {len(results)} PIS Codes in documents")
    missing_pis = pis_find.run(results)
    print(missing_pis)
    get_detail = []
    for code in missing_pis:
        print(f"Fetching services with code: {code}")
        services = owlboard_connector.get_services(code['headcode'], code['date'])
        get_detail.append({
            'pis': code['pis'],
            'services': services,
            'diagram_file': code['source_file'],
            'date': code['date'],
            'headcode': code['headcode'],
        })
    details = []
    for item in get_detail:
        detail = {
            'pis': item['pis'],
            'headcode': item['headcode'],
            'date': item['date'],
            'services': [],
            'diagram_file': item['diagram_file']
        }
        for service in item['services']:
            service_detail = owlboard_connector.get_service_detail(service['trainUid'], item['date'])
            detail['services'].append(service_detail)
        details.append(detail)
    formatted_additions = formatter.humanYaml(details)
    print(formatted_additions)
    # BUG FIX: the original opened the file as `out` but then called
    # f.write()/f.close() on an undefined name `f` (NameError at runtime).
    # A context manager also guarantees the file is closed.
    with open("pis_output", "a") as out:
        out.write('\n---\n')
        out.write(formatted_additions)
    gitea_connector.clone_repository()
    gitea_connector.commit_and_push_changes(formatted_additions, "From owlbot diagram-parser")
if __name__ == "__main__":
    print("To use local mode, please call `main.py local`")

View File

@ -1,2 +0,0 @@
import imaplib
import email

View File

@ -1,16 +0,0 @@
import sys
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "local"
if mode == "local":
import local_mode
local_mode.start()
elif mode == "mailbox":
print("MailBox mode not available yet")
pass
else:
print("Invalid mode. Please specify 'local' or 'mailbox'")
if __name__ == "__main__":
main()

View File

@ -1,104 +0,0 @@
### API requests live here.
### Authentication must be completed, registering for the API if necessary.
### NOTE: mailbox-based registration is NOT required after all — a
### pre-registered UUID can simply be passed in to the program, so local
### mode remains usable.
import requests, os
OB_PIS_BASE_URL = "https://owlboard.info/api/v2/pis/byCode/"  # PIS lookup
OB_TRN_BASE_URL = "https://owlboard.info/api/v2/timetable/train/"  # timetable queries
OB_TIP_BASE_URL = "https://owlboard.info/api/v2/ref/locationCode/tiploc/"  # TIPLOC -> CRS
# Local-development endpoints, kept for convenience:
#OB_PIS_BASE_URL = "http://localhost:8460/api/v2/pis/byCode/"
#OB_TRN_BASE_URL = "http://localhost:8460/api/v2/timetable/train/"
#OB_TIP_BASE_URL = "http://localhost:8460/api/v2/ref/locationCode/tiploc/"
OB_TEST_URL = OB_PIS_BASE_URL + "5001"  # fixed code used as a connectivity probe
UUID = os.environ.get('DGP_OB_UUID')  # API key; validated in check_connection()
HEADERS = {
    'user-agent': 'owlboard-diagram-parser',
    'uuid': UUID
}
def check_connection():
    """Verify the OwlBoard API is reachable and the UUID is accepted.

    Returns True on HTTP 200 from the test endpoint; False when the UUID
    is unset, rejected (401), or any other non-200 status is returned.
    """
    if not UUID:
        print("'DGP_OB_UUID' must be set in the environment")
        return False
    res = requests.get(OB_TEST_URL, headers=HEADERS, timeout=10)
    status = res.status_code
    if status == 401:
        print("Error - Unauthorised. The UUID is not valid. STATUS: ", status, "UUID: ", UUID)
        return False
    if status != 200:
        print("Error - Unable to reach OwlBoard. STATUS: ", status)
        return False
    return True
def get_services(headcode, date):
    """Return GWR (operator 'GW') services matching *headcode* on *date*.

    Queries the OwlBoard timetable API and filters to operator 'GW'.
    Returns a list (possibly empty). Exits the process on request errors.
    """
    import sys  # BUG FIX: module header never imported sys, so the
                # original error path raised NameError instead of exiting
    print("Finding GWR service: ", headcode, ", ", date)
    results = []
    url = OB_TRN_BASE_URL + f"{date.strftime('%Y-%m-%d')}/headcode/{headcode.lower()}"
    print(url)
    try:
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            for item in json_res:
                if item['operator'] == 'GW':
                    results.append(item)
            print(f"Found {len(results)} valid GWR Service")
        # BUG FIX: a non-200 response previously fell off the end and
        # returned None, crashing callers that iterate the result.
        return results
    except Exception as e:
        print(e)
        sys.exit()
def get_service_detail(trainUid, date):
    """Fetch one service by trainUid/date and return its organised stops.

    Returns organise_svc()'s {'stops': [...], 'vstp': bool} on success;
    exits the process when the service is not found or the request fails.
    Returns None on a non-200 response (unchanged behaviour).
    """
    import sys  # BUG FIX: `sys` was never imported at module level, so
                # every sys.exit() below raised NameError instead
    try:
        print("Getting GWR service details: ", trainUid, ", ", date)
        url = OB_TRN_BASE_URL + f"{date.isoformat()}/byTrainUid/{trainUid}"
        print(url)
        res = requests.get(url, headers=HEADERS)
        if res.status_code == 200:
            json_res = res.json()
            if json_res:
                svc_detail = {
                    'stops': json_res['stops'],
                    'vstp': json_res.get('vstp', False)
                }
                organised = organise_svc(svc_detail)
                print("Service Details Found")
                return organised
            else:
                print("Service Not Found")
                sys.exit()
    except Exception as e:
        print(e)
        sys.exit()
def organise_svc(input):
    """Reduce a service-detail dict to its public calling points.

    Expects {'stops': [{'isPublic': bool, 'tiploc': str}, ...],
    'vstp': bool} and returns {'stops': [tiploc, ...], 'vstp': bool}.
    The parameter name shadows the builtin `input` but is kept for
    interface stability.
    """
    # Keep only public stops, in order.
    stop_tiplocs = [stop['tiploc'] for stop in input['stops'] if stop['isPublic']]
    # Dead code removed: the original computed an `existingPis` flag from
    # input['pis'] that was never returned or used.
    return {'stops': stop_tiplocs, 'vstp': input['vstp']}
def convert_tiploc_to_crs(tiploc):
    """Translate a TIPLOC into a lower-case 3ALPHA (CRS) code.

    'RDNG4AB' is special-cased to 'rdg'. Returns "NO_CRS" when the lookup
    succeeds but is empty; returns None on a non-200 response.
    """
    if tiploc == 'RDNG4AB':
        return 'rdg'
    res = requests.get(OB_TIP_BASE_URL + tiploc.upper(), headers=HEADERS)
    if res.status_code != 200:
        return None
    body = res.json()
    if not body:
        return "NO_CRS"
    return body[0]['3ALPHA'].lower()

View File

@ -1,70 +0,0 @@
### This uses the 'python-docx-2023' module
from docx import Document
from datetime import datetime
import re
# Matches "PIS code : NNNN" and captures the 4-digit code.
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
# Matches a train headcode such as "1A23" (digit, letter, two digits).
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
def extract_tables(file_path):
    """Extract PIS/headcode pairs from every table in a DOCX schedule card.

    For each table, the first row supplies dict keys and each later row
    becomes a dict; match_pis_and_headcode() then scans those rows.
    Returns a list of dicts with keys job_head/headcode/pis plus
    source_file and date added here.
    """
    document = Document(file_path)
    print(f"Reading {len(document.tables)} tables from {file_path}")
    pis_info = []
    for table in document.tables:
        data = []
        for i, row in enumerate(table.rows):
            text = (cell.text for cell in row.cells)
            if i == 0:
                # Header row: becomes the keys for every subsequent row.
                keys = tuple(text)
                continue
            row_data = dict(zip(keys, text))
            data.append(row_data)
        pis_and_headcode = match_pis_and_headcode(data)
        if pis_and_headcode:
            pis_and_headcode['source_file'] = file_path
            current_year = datetime.now().year
            # NOTE(review): assumes the filename starts with an MMDD token
            # followed by a space (e.g. "0412 diagrams.docx"); other names
            # make strptime raise ValueError here — confirm with real input.
            date_string_with_year = f"{current_year}{file_path.split()[0]}"
            pis_and_headcode['date'] = datetime.strptime(date_string_with_year, "%Y%m%d")
            pis_info.append(pis_and_headcode)
    return(pis_info)
def match_pis_and_headcode(table_data):
    """Find the first PIS code and headcode within one table's rows.

    *table_data* is a list of dicts mapping header-cell text to row-cell
    text. Returns {'job_head', 'headcode', 'pis'} when both a PIS code and
    a headcode are found, otherwise None.
    """
    pis_code = None
    job_head = None
    # First pass: a cell value containing "PIS code: NNNN" fixes the code
    # and remembers the column header it was found under.
    for row in table_data:
        for key, value in row.items():
            found = PIS_PATTERN.search(value)
            if found:
                pis_code = found.group(1)
                job_head = key.strip()
                break
        if pis_code:
            break
    if not pis_code:
        return None
    # Second pass: the headcode (e.g. "1A23") appears in a column header.
    for row in table_data:
        for key in row:
            found = HEADCODE_PATTERN.search(key)
            if found:
                return {'job_head': job_head, 'headcode': found.group(), 'pis': pis_code}
    return None
def solo_run():
    """Manual test hook: parse a fixed local file and print the result."""
    print(extract_tables("./file.docx"))
if __name__ == "__main__":
    solo_run()

View File

@ -1,58 +0,0 @@
## This module downloads and compiles a list of all PIS codes across all branches of the OwlBoard/Data repo.
## The function load_existing_pis() is expected to be used outside of the module.
import os, requests, yaml
## TESTING
GIT_URL = 'https://git.fjla.uk'  # Gitea instance hosting OwlBoard/data
GIT_API = GIT_URL + '/api/v1'  # REST API root
def load_existing_pis():
    """Collect PIS codes from every branch of OwlBoard/data, de-duplicated.

    The first branch seen wins for each code. Returns a list of
    {'code': str, 'stops': [...]} dicts.
    """
    all_pis_data = []
    branches = get_branch_list()
    for branch in branches:
        branch_pis_data = get_branch_pis(branch)
        # ROBUSTNESS: also guard against branches where the file is absent
        # or malformed — yaml.safe_load can return None or a non-dict.
        if branch_pis_data and 'pis' in branch_pis_data:
            all_pis_data.append(branch_pis_data)
            print(f"Branch: {branch}, PIS Codes: {len(branch_pis_data['pis'])}")
    # Merging data and removing duplicates based on 'code' key.
    merged_pis_data = {}
    for branch_data in all_pis_data:
        for item in branch_data['pis']:
            # BUG FIX (the old "DOESN'T COMPARE PROPERLY" note): branches
            # store codes inconsistently as int or str, so compare on the
            # str form to actually catch duplicates.
            code = str(item['code'])
            # Only keep the first occurrence of each 'code'.
            if code not in merged_pis_data:
                merged_pis_data[code] = item
    # Convert the dictionary back to a list of dictionaries.
    merged_pis_list = [{'code': code, 'stops': value['stops']} for code, value in merged_pis_data.items()]
    print(f"Total unique codes: {len(merged_pis_list)}")
    return merged_pis_list
def get_branch_list():
    """Return the branch names of the owlboard/data repository."""
    endpoint = GIT_API + '/repos/owlboard/data/branches'
    res = requests.get(endpoint)
    branches = [entry['name'] for entry in res.json()]
    print(branches)
    return branches
def get_branch_pis(branch_name):
    """Download and parse pis/gw.yaml from one branch of owlboard/data."""
    url = GIT_API + f'/repos/owlboard/data/raw/%2Fpis%2Fgw.yaml?ref={branch_name}'
    res = requests.get(url)
    print(res.status_code)
    return yaml.safe_load(res.text)
if __name__ == "__main__":
    print(load_existing_pis())

View File

@ -1,35 +0,0 @@
## This module compares discovered PIS codes with existing PIS codes obtained by calling pis_fetch
import pis_fetch
import sys
def run(data_list):
    """Filter discovered PIS entries down to those missing from OwlBoard.

    De-duplicates *data_list* by PIS code, then checks each remaining code
    against the OwlBoard data set. Returns the missing entries.
    """
    unique = dedup(data_list)
    print(f"Removed {len(data_list) - len(unique)} duplicate codes")
    print(f"Searching for {len(unique)} PIS codes")
    missing = find_missing(unique)
    print(f"{len(missing)} missing PIS codes in OwlBoard data")
    return missing
def dedup(data_list):
    """Drop duplicate entries sharing a 'pis' code (last occurrence wins,
    first-seen order preserved)."""
    by_code = {}
    for entry in data_list:
        by_code[entry['pis']] = entry
    return list(by_code.values())
def find_missing(data_list):
    """Return entries whose 'pis' code is absent from the OwlBoard data.

    Codes are compared as strings against pis_fetch.load_existing_pis().
    Entries without a truthy 'pis' key are skipped.
    """
    existing = {str(entry['code']) for entry in pis_fetch.load_existing_pis()}
    missing_data = []
    for item in data_list:
        pis_code = item.get('pis')
        if pis_code and pis_code not in existing:
            missing_data.append(item)
    return missing_data