Add IMAP Connector

This commit is contained in:
Fred Boniface 2024-02-16 22:01:27 +00:00
parent de482074e6
commit 82f885466e
3 changed files with 52 additions and 1 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
env_conf
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

44
src/imap_connector.py Normal file
View File

@ -0,0 +1,44 @@
import imaplib, email, os
class IMAPConnector:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'imap_connection'):
IMAP_SERVER = os.environ.get('DGP_EML_HOST')
IMAP_USER = os.environ.get('DGP_EML_USER')
IMAP_PASS = os.environ.get('DGP_EML_PASS')
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASS]):
raise ValueError("Please ensure DGP_EML_HOST, DGP_EML_USER and DGP_EML_PASS are defined in the environment")
self.imap_connection = imaplib.IMAP4_SSL(IMAP_SERVER)
self.imap_connection.login(IMAP_USER, IMAP_PASS)
self.imap_connection.select('INBOX')
def fetch_filtered_emails(self, sender_email):
filtered_emails = []
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
result, email_data = self.imap_connection.fetch(num, '(RFC822)')
if result == 'OK':
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
filtered_emails.append(email_message)
return filtered_emails
def delete_emails_from_sender(self, sender_email):
result, data = self.imap_connection.search(None, f'(FROM "{sender_email}")')
if result == 'OK':
for num in data[0].split():
self.imap_connection.store(num, '+FLAGS', '\\Deleted')
self.imap_connection.expunge()
print(f"All messages from {sender_email} deleted successfully")

View File

@ -2,4 +2,9 @@
### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY ### AUTHENTICATION MUST BE COMPLETED, REGISTERING FOR THE API IF NECCESSARY
### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS ### THIS NEGATES THE ABILITY TO USE LOCAL MODE - MAILBOX MODE ONLY AS
### MAILBOX ACCESS IS NEEDED FOR REGISTRATION ### MAILBOX ACCESS IS NEEDED FOR REGISTRATION
from imap_connector import IMAPConnector
imap_connector = IMAPConnector()
imap_connector.fetch_filtered_emails("no-reply@owlboard.info")