diagram-parser/src/imap_connector.py

45 lines
1.7 KiB
Python

import imaplib, email, os
class IMAPConnector:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'imap_connection'):
IMAP_SERVER = os.environ.get('DGP_EML_HOST')
IMAP_USER = os.environ.get('DGP_EML_USER')
IMAP_PASS = os.environ.get('DGP_EML_PASS')
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")
self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
self.imap_connection.login(IMAP_USER, IMAP_PASS)
self.imap_connection.select('INBOX')
def fetch_filtered_emails(self, sender_email):
filtered_emails = []
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
result, email_data = self.imap_connection.fetch(num, '(RFC822)')
if result == 'OK':
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
filtered_emails.append(email_message)
return filtered_emails
def delete_emails_from_sender(self, sender_email):
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
self.imap_connection.store(num, '+FLAGS', '\\Deleted')
self.imap_connection.expunge()
print(f"All messages from {sender_email} deleted successfully")