Changes:
- PIS Finder - DOCX Parser - LOCAL MODE (DEPRECATED ALREADY)
This commit is contained in:
parent
d3e24c8a03
commit
59f6439872
BIN
src/0222 THO SC CNDR.docx
Normal file
BIN
src/0222 THO SC CNDR.docx
Normal file
Binary file not shown.
@ -1,34 +0,0 @@
|
||||
### This uses the 'python-docx-2023' module
|
||||
from docx import Document
|
||||
|
||||
def extract_table(file_path):
|
||||
document = Document(file_path)
|
||||
|
||||
table = document.tables[4]
|
||||
print(document.tables[1])
|
||||
print(document.tables[2])
|
||||
print(document.tables[3])
|
||||
print(document.tables[4])
|
||||
print(document.tables[5])
|
||||
|
||||
data = []
|
||||
keys = None
|
||||
for i, row in enumerate(table.rows):
|
||||
text = (cell.text for cell in row.cells)
|
||||
if i == 0:
|
||||
keys = tuple(text)
|
||||
continue
|
||||
row_data = dict(zip(keys, text))
|
||||
data.append(row_data)
|
||||
|
||||
print(data)
|
||||
|
||||
if __name__ == "__main__":
|
||||
extract_table("./file.docx")
|
||||
|
||||
### This can parse each table. What needs to happen next
|
||||
### is to parse all tables, then check for a PIS code.
|
||||
### If PIS code exists, then find the associated headcode,
|
||||
### Then an API request can be made to OwlBoard to try
|
||||
### and find a service with valid stopping pattern,
|
||||
### then the PIS codes can be generated for review.
|
@ -1,87 +0,0 @@
|
||||
import os, sys, json, subprocess, re, yaml, requests
|
||||
|
||||
report_file_path = "./report.txt"
|
||||
code_store_list_url = "https://git.fjla.uk/OwlBoard/data/raw/branch/main/pis/gw.yaml"
|
||||
|
||||
def is_pdfgrep_installed():
|
||||
try:
|
||||
subprocess.check_call(["pdfgrep", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def fetch_and_parse_yaml(url):
|
||||
try:
|
||||
response = requests.get(url)
|
||||
response.raise_for_status()
|
||||
existing_codes = yaml.safe_load(response.text)
|
||||
return existing_codes
|
||||
except Exception as e:
|
||||
print(f"Error downloading and parsing codes: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def main():
|
||||
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python pdf_code_extraction.py <directory_path>")
|
||||
sys.exit(1)
|
||||
|
||||
pdf_directory = sys.argv[1]
|
||||
|
||||
if not os.path.isdir(pdf_directory):
|
||||
print(f"'{pdf_directory}' is not a valid directory.")
|
||||
sys.exit(1)
|
||||
|
||||
if not is_pdfgrep_installed():
|
||||
print("pdfgrep is not installed on your system.")
|
||||
sys.exit(1)
|
||||
|
||||
code_list = []
|
||||
|
||||
pdfgrep_cmd = f"pdfgrep -Prno 'code\\s*:\\s*\\d{{4}}' {pdf_directory}"
|
||||
pdfgrep_output = subprocess.getoutput(pdfgrep_cmd)
|
||||
|
||||
|
||||
for line in pdfgrep_output.splitlines():
|
||||
match = re.search(r"^(.*?):\s*code\s*:\s*(\d{4})", line)
|
||||
if match:
|
||||
filename, code = match.groups()
|
||||
code_list.append({"file":filename, "code":str(code)})
|
||||
|
||||
existing_codes = fetch_and_parse_yaml(code_store_list_url)['pis']
|
||||
existing_set = set()
|
||||
for item in existing_codes:
|
||||
code = item['code']
|
||||
existing_set.add(str(code))
|
||||
|
||||
unique_codes = set()
|
||||
unique_code_list = []
|
||||
missing_codes = []
|
||||
for item in code_list:
|
||||
code = item['code']
|
||||
if code not in unique_codes:
|
||||
unique_codes.add(code)
|
||||
unique_code_list.append(item)
|
||||
if code not in existing_set:
|
||||
missing_codes.append(item)
|
||||
|
||||
#print(missing_codes)
|
||||
|
||||
report = f"""
|
||||
Number of missing codes found: {len(missing_codes)}
|
||||
|
||||
Missing Codes:
|
||||
"""
|
||||
|
||||
for item in missing_codes:
|
||||
report += f"\n - code: {item['code']}\n stops: (File: {item['file']})"
|
||||
|
||||
|
||||
print(f"Saving report to {report_file_path}")
|
||||
with open(report_file_path, 'w') as report_file:
|
||||
report_file.write(report)
|
||||
|
||||
print(report)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
3
src/find_service.py
Normal file
3
src/find_service.py
Normal file
@ -0,0 +1,3 @@
|
||||
## Uses the HEADCODE to guess at the service the PIS code matches
|
||||
## Where there are multiple matches both are prepared and
|
||||
## await human review.
|
0
src/gitea_connector.py
Normal file
0
src/gitea_connector.py
Normal file
30
src/local_mode.py
Normal file
30
src/local_mode.py
Normal file
@ -0,0 +1,30 @@
|
||||
import parse_docx, pis_find
|
||||
|
||||
import os
|
||||
|
||||
def start():
|
||||
print("Local mode activated")
|
||||
working_directory = os.getcwd()
|
||||
print("Working directory: ", working_directory)
|
||||
|
||||
## Get all files in directory
|
||||
files = [f for f in os.listdir(working_directory) if os.path.isfile(os.path.join(working_directory, f))]
|
||||
docx_files = [f for f in files if f.endswith(".docx")]
|
||||
|
||||
results = []
|
||||
|
||||
if docx_files:
|
||||
print(f"Found {len(docx_files)} DOCX files in directory")
|
||||
for file in docx_files:
|
||||
print(file)
|
||||
items = parse_docx.extract_tables(file)
|
||||
results.extend(items)
|
||||
else:
|
||||
print("No DOCX files found")
|
||||
|
||||
print(f"Found {len(results)} PIS Codes in documents")
|
||||
pis_find.run(results)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
start()
|
0
src/mailbox_mode.py
Normal file
0
src/mailbox_mode.py
Normal file
16
src/main.py
Normal file
16
src/main.py
Normal file
@ -0,0 +1,16 @@
|
||||
import sys
|
||||
|
||||
def main():
|
||||
mode = sys.argv[1] if len(sys.argv) > 1 else "local"
|
||||
|
||||
if mode == "local":
|
||||
import local_mode
|
||||
local_mode.start()
|
||||
elif mode == "mailbox":
|
||||
print("MailBox mode not available yet")
|
||||
pass
|
||||
else:
|
||||
print("Invalid mode. Please specify 'local' or 'mailbox'")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
5
src/owlboard_connector.py
Normal file
5
src/owlboard_connector.py
Normal file
@ -0,0 +1,5 @@
|
||||
### API REQUESTS HERE
|
||||
|
||||
### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY
|
||||
### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS
|
||||
### MAILBOX ACCESS IS NEEDED FOR REGISTRATION
|
72
src/parse_docx.py
Normal file
72
src/parse_docx.py
Normal file
@ -0,0 +1,72 @@
|
||||
### This uses the 'python-docx-2023' module
|
||||
from docx import Document
|
||||
import re
|
||||
|
||||
### This can parse each table. What needs to happen next
|
||||
### is to parse all tables, then check for a PIS code.
|
||||
### If PIS code exists, then find the associated headcode,
|
||||
### Then an API request can be made to OwlBoard to try
|
||||
### and find a service with valid stopping pattern,
|
||||
### then the PIS codes can be generated for review.
|
||||
|
||||
PIS_PATTERN = re.compile(r'PIS code\s*:\s*(\d{4})')
|
||||
HEADCODE_PATTERN = re.compile(r'(\d{1}[A-Z]\d{2})')
|
||||
|
||||
def extract_tables(file_path):
|
||||
document = Document(file_path)
|
||||
print(f"Reading {len(document.tables)} tables from {file_path}")
|
||||
|
||||
pis_info = []
|
||||
|
||||
for table in document.tables:
|
||||
data = []
|
||||
for i, row in enumerate(table.rows):
|
||||
text = (cell.text for cell in row.cells)
|
||||
if i == 0:
|
||||
keys = tuple(text)
|
||||
continue
|
||||
row_data = dict(zip(keys, text))
|
||||
data.append(row_data)
|
||||
pis_and_headcode = match_pis_and_headcode(data)
|
||||
if pis_and_headcode:
|
||||
pis_and_headcode['source_file'] = file_path
|
||||
pis_info.append(pis_and_headcode)
|
||||
|
||||
return(pis_info)
|
||||
|
||||
|
||||
def match_pis_and_headcode(table_data):
|
||||
pis_code = None
|
||||
headcode = None
|
||||
job_head = None
|
||||
|
||||
for item in table_data:
|
||||
for key, value in item.items():
|
||||
match = PIS_PATTERN.search(value)
|
||||
if match:
|
||||
pis_code = match.group(1)
|
||||
job_head = key.strip()
|
||||
break
|
||||
if pis_code:
|
||||
break
|
||||
|
||||
if pis_code:
|
||||
for item in table_data:
|
||||
for key in item:
|
||||
match = HEADCODE_PATTERN.search(key)
|
||||
if match:
|
||||
headcode = match.group()
|
||||
break
|
||||
if headcode:
|
||||
break
|
||||
|
||||
if pis_code and headcode:
|
||||
return {'job_head': job_head, 'headcode': headcode, 'pis': pis_code}
|
||||
else:
|
||||
return None
|
||||
|
||||
def solo_run():
|
||||
print(extract_tables("./file.docx"))
|
||||
|
||||
if __name__ == "__main__":
|
||||
solo_run()
|
33
src/pis_find.py
Normal file
33
src/pis_find.py
Normal file
@ -0,0 +1,33 @@
|
||||
import requests
|
||||
|
||||
def run(data_list):
|
||||
deduplicated_data = dedup(data_list)
|
||||
print(f"Removed {len(data_list) - len(deduplicated_data)} duplicate codes")
|
||||
print(f"Searching for {len(deduplicated_data)} PIS codes")
|
||||
missing_data = find_missing(deduplicated_data)
|
||||
print(f"{missing_data} missing PIS codes in OwlBoard data")
|
||||
|
||||
|
||||
def dedup(data_list):
|
||||
unique_dicts = {d['pis']: d for d in data_list}.values()
|
||||
unique_list_of_dicts = list(unique_dicts)
|
||||
return unique_list_of_dicts
|
||||
|
||||
|
||||
## AUTH REQUIRED!!!
|
||||
def find_missing(data_list):
|
||||
BASEURL = 'http://localhost:8460/api/v2/pis/byCode/'
|
||||
#BASEURL = 'https://owlboard.info/api/v2/pis/byCode/'
|
||||
missing_data = []
|
||||
|
||||
for item in data_list:
|
||||
pis_code = item.get('pis')
|
||||
if pis_code:
|
||||
url = BASEURL + pis_code
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
json_response = response.json()
|
||||
if json_response and isinstance(json_response, list):
|
||||
missing_data.append(item)
|
||||
else:
|
||||
print(f"Request failed for PIS {pis_code}. Status: {response.status_code}")
|
Reference in New Issue
Block a user