Coverage for src/pymmails/grabber/mailboximap.py: 18%
116 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-04 01:15 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-04 01:15 +0200
1"""
2@file
3@brief Defines a mailbox using IMAP
4"""
6import imaplib
7import re
8import email
9import email.message
10from pyquickhelper.loghelper import noLOG
11from .mail_exception import MailException
12from .email_message import EmailMessage
class MailBoxImap:
    """
    Defines a mail box with :epkg:`IMAP` interface.

    .. exref::
        :title: Fetch mails from a gmail account

        ::

            user = "address no domain"
            pwd = "password"
            server = "imap.gmail.com"

            box = MailBoxImap(user, pwd, server, ssl=True)
            box.login()

            # ... fetch emails

            box.logout()
    """

    # Extracts every double-quoted token of a line. Kept as a public class
    # attribute for backward compatibility and used as a parsing fallback.
    expFolderName = re.compile('\\"(.*?)\\"')

    # One line of a LIST response: ``(flags) delimiter name`` where the
    # delimiter is a quoted string or NIL and the name may or may not be quoted.
    _expListLine = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.*)',
        re.IGNORECASE)

    def __init__(self, user, pwd, server, ssl=False, fLOG=noLOG):
        """
        Opens the connection to the server (no authentication yet,
        see @see me login).

        @param  user    user
        @param  pwd     password
        @param  server  server something like ``imap.domain.ext``
        @param  ssl     select ``IMAP4_SSL`` or ``IMAP4``
        @param  fLOG    logging function

        For gmail, it is ``imap.gmail.com`` and ssl must be true.
        """
        self.M = imaplib.IMAP4_SSL(server) if ssl else imaplib.IMAP4(server)
        self._user = user
        self._password = pwd
        self.fLOG = fLOG

    def login(self):
        """
        Authenticates on the server with the user and password
        given to the constructor.
        """
        self.M.login(self._user, self._password)

    def logout(self):
        """
        Closes the connection to the server.
        """
        self.M.logout()

    @staticmethod
    def _parse_list_line(line):
        """
        Extracts the folder name from one decoded line of a ``LIST`` response.

        @param  line    decoded line
        @return         folder name or None if the folder cannot be selected
        """
        # line looks like this: (\HasNoChildren) "/" "INBOX/Something"
        # but some servers do not quote the name: (\HasNoChildren) "." INBOX
        match = MailBoxImap._expListLine.match(line)
        if match is None:
            # Unexpected layout: fall back on the last quoted token.
            if r"\Noselect" in line:
                return None
            quoted = MailBoxImap.expFolderName.findall(line)
            if not quoted:
                raise MailException(
                    "unable to parse folder description '{0}'".format(line))
            return quoted[-1]

        # Flags are case-insensitive in IMAP.
        if r"\noselect" in match.group("flags").lower():
            return None
        name = match.group("name").strip()
        if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
            # Quoted string: drop the quotes and undo the escaping of
            # backslashes and double quotes.
            name = re.sub(r'\\(.)', r'\1', name[1:-1])
        return name

    def folders(self):
        """
        Returns the list of folder of the mail box.
        Folders flagged as not selectable are skipped.

        @return         list of folder names
        """
        folders = self.M.list()
        if folders[0] != "OK":
            raise MailException(
                "unable to retrieve the folder list for " +
                self._user)
        res = []
        for f in folders[1]:
            if f is None:
                # imaplib returns [None] when the server lists nothing.
                continue
            name = MailBoxImap._parse_list_line(f.decode("utf8"))
            if name is not None:
                res.append(name)
        return res

    def _close_quietly(self):
        """
        Closes the selected folder, ignores failures.
        Only used while another exception is being propagated
        so that it does not get replaced by a cleanup error.
        """
        try:
            self.M.close()
        except (imaplib.IMAP4.error, OSError):
            pass

    def _fetch_mail(self, num, parts):
        """
        Fetches one message and parses it.

        @param  num     message number returned by a search
        @param  parts   message parts, ``'(RFC822)'`` or ``'(BODY[HEADER])'``
        @return         @see cl EmailMessage
        """
        data = self.M.fetch(num, parts)[1]
        return email.message_from_bytes(data[0][1], _class=EmailMessage)

    def _search(self, folder, qfold, pattern, date):
        """
        Runs the search in the currently selected folder.

        @param  folder  folder name (for messages)
        @param  qfold   quoted folder name (for messages)
        @param  pattern search pattern
        @param  date    added to the pattern as ``SINCE date`` if not None
        @return         tuple (message numbers, pattern really used)
        """
        if date is not None:
            pdat = 'SINCE {0}'.format(date)
            pattern = pdat if pattern == "ALL" else pattern + " " + pdat

        try:
            pattern.encode('ascii')
            charset = None
        except UnicodeEncodeError:
            charset = 'UTF8'
            # One character per UTF-8 byte.
            pattern = pattern.encode('utf-8').decode('latin-1')

        try:
            try:
                _, data = self.M.search(charset, pattern)
            except UnicodeEncodeError:
                # The pattern could not be sent as is: drop non ascii characters.
                charset = None
                pattern = pattern.encode(
                    'ascii', errors='ignore').decode("ascii")
                _, data = self.M.search(None, pattern)
        except Exception as e:
            if "SEARCH => got more " in str(e):
                # Too many results: restrict the search to recent mails.
                if pattern == "ALL":
                    pattern = "RECENT"
                else:
                    pattern += " RECENT"
                pattern = pattern.strip()
                self.fLOG("[MailBoxImap.enumerate_mails_in_folder] limit email "
                          "search for folder '{0}' to recent emails with "
                          "pattern '{1}'".format(folder, pattern))
                data = self.M.search(charset, pattern)[1]
            else:
                raise MailException(
                    "Unable to search for pattern: '{0}' "
                    "(charset='{1}')\nin subfolder {2}\n"
                    "check the folder you search for is right."
                    .format(pattern, charset, qfold)) from e

        return data[0].split(), pattern

    def _enumerate_selected(self, folder, qfold, skip_function, date, pattern, body):
        """
        Enumerates the mails of the currently selected folder,
        see @see me enumerate_mails_in_folder for the parameters.
        """
        spl, pattern = self._search(folder, qfold, pattern, date)
        self.fLOG("MailBoxImap.enumerate_mails_in_folder [folder={0} nbm={1} body={2} pattern={3}]".format(
            folder, len(spl), body, pattern))

        for num in spl:
            mail = None
            if skip_function is not None:
                # Only the header is loaded to decide whether to skip.
                mail = self._fetch_mail(num, '(BODY[HEADER])')
                if skip_function(mail):
                    continue
            if body:
                mail = self._fetch_mail(num, '(RFC822)')
            elif skip_function is None:
                mail = self._fetch_mail(num, '(BODY[HEADER])')
            yield mail

    def enumerate_mails_in_folder(
            self, folder, skip_function=None, date=None, pattern="ALL", body=True):
        """
        Enumerates all mails in folder folder.

        @param  folder          folder name or list of folder names
        @param  skip_function   if not None, use this function on the header to avoid
                                loading the entire message (and skip it)
        @param  date            add a date to the pattern (``SINCE date``)
        @param  pattern         search pattern (see below)
        @param  body            add body, only the header is loaded otherwise
        @return                 iterator on (message)

        The search pattern can be used to look for a subset of email.
        It follows these `specifications
        <http://tools.ietf.org/html/rfc3501#page-49>`_.
        If a folder is a subfolder, the syntax should be
        ``folder/subfolder``.

        .. exref::
            :title: Search pattern

            ::

                pattern='FROM "xavier" SINCE 1-Feb-2013'
                pattern='FROM "xavier" SINCE 1-Feb-2013 BEFORE 5-May-2013'
                pattern='FROM "xavier" SINCE 1-Feb-2013 BEFORE 5-May-2013 (UNANSWERED)'
                pattern='CC "jacques" FROM "xavier" (DELETED)'
                pattern='TEXT "github"'
                pattern='LARGER 10000 SMALLER 1000000'
                pattern='SUBJECT "programmation"'
                pattern='TO "student" (FLAGGED)'
                pattern='(UNSEEN)'

        If the function generates an error such as::

            imaplib.error: command: SEARCH => got more than 10000 bytes

        The keyword RECENT will be added to the search pattern
        in order to retrieve the newest mails.

        The folder is closed once every mail was enumerated or if the
        search or a fetch fails. It remains selected if the iteration
        is abandoned before the end.
        """
        if isinstance(folder, list):
            for fold in folder:
                yield from self.enumerate_mails_in_folder(
                    folder=fold, skip_function=skip_function, date=date,
                    pattern=pattern, body=body)
            return

        qfold = self.M._quote(folder)
        self.M.select(qfold, readonly=True)
        try:
            yield from self._enumerate_selected(
                folder, qfold, skip_function, date, pattern, body)
        except Exception:
            # Do not leave the folder selected after a failure.
            self._close_quietly()
            raise
        self.M.close()

    def enumerate_search_person(self, person, folder, skip_function=None,
                                date=None, max_dest=5, body=True):
        """
        Enumerates all mails in folder folder from a user
        or sent to a user.

        @param  person          person to look for or persons to look for
        @param  folder          folder name
        @param  skip_function   if not None, use this function on the header to avoid
                                loading the entire message (and skip it)
        @param  date            if not None, only mails since this date
        @param  max_dest        maximum number of receivers (only applied to
                                mails sent to the person), no limit if <= 0
        @param  body            get the body
        @return                 iterator on (message)

        If *person* is a list, the function iterates on the list of
        persons to look for. It returns only unique mails.
        """
        if isinstance(person, list):
            unique_id = set()
            for p in person:
                mail_set = self.enumerate_search_person(
                    p, folder=folder, skip_function=skip_function, date=date,
                    max_dest=max_dest, body=body)
                for mail in mail_set:
                    uid = mail.UniqueID
                    if uid not in unique_id:
                        unique_id.add(uid)
                        yield mail
            return

        since = '' if date is None else ' SINCE {0}'.format(date)

        pat1 = 'FROM "{0}"'.format(person) + since
        yield from self.enumerate_mails_in_folder(
            folder, skip_function=skip_function, pattern=pat1, body=body)

        pat2 = 'TO "{0}"'.format(person) + since
        # body is forwarded here as well so that body=False never
        # downloads full messages.
        for mail in self.enumerate_mails_in_folder(
                folder, skip_function=skip_function, pattern=pat2, body=body):
            if max_dest > 0:
                tos = mail.get_to()
                if tos and len(tos) <= max_dest:
                    yield mail
            else:
                yield mail

    def enumerate_search_subject(self,
                                 subject,
                                 folder,
                                 skip_function=None,
                                 date=None,
                                 max_dest=5):
        """
        Enumerates all mails in folder folder whose subject
        matches the IMAP criterion ``SUBJECT "subject"``.

        @param  subject         subject to look for
        @param  folder          folder name
        @param  skip_function   if not None, use this function on the header to avoid
                                loading the entire message (and skip it)
        @param  date            if not None, only mails since this date
        @param  max_dest        unused, kept for backward compatibility
        @return                 iterator on (message)
        """
        pat1 = 'SUBJECT "{0}"'.format(subject)
        yield from self.enumerate_mails_in_folder(
            folder, skip_function=skip_function, date=date, pattern=pat1)