Coverage for src/pyrsslocal/rss/rss_stream.py: 74%

132 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-02-02 02:59 +0100

1""" 

2@file 

3@brief Description of an RSS stream. 

4""" 

5import datetime 

6 

7from ..xmlhelper.xmlfilewalk import xml_filter_iterator 

8from .rss_blogpost import BlogPost 

9from ..helper.download_helper import get_url_content_timeout 

10 

11 

class StreamRSS:
    """
    Description of an :epkg:`RSS` stream.
    Parsing the posts of a stream (@see me enumerate_post)
    requires :epkg:`feedparser`.

    A stream is usually described by an XML entry such as:

    ::

        <outline text="Freakonometrics" title="Freakonometrics"
                 type="rss"
                 xmlUrl="http://freakonometrics.hypotheses.org/feed"
                 htmlUrl="http://freakonometrics.hypotheses.org" />

    @var    titleb      title of the stream
    @var    type        type
    @var    xmlUrl      url of the rss stream
    @var    htmlUrl     main page of the blog
    @var    keywordsb   list of keywords
    @var    id          identifier (-1 by default)
    @var    stat        dictionary of statistics, key ``"nb"`` is set
                        when *nb* is given to the constructor
    """

    def __init__(self, titleb, type, xmlUrl, htmlUrl,  # pylint: disable=W0622
                 keywordsb, id=-1, nb=None):  # pylint: disable=W0622
        """
        @param      titleb      title of the stream
        @param      type        type
        @param      xmlUrl      url of the rss stream
        @param      htmlUrl     main page of the blog
        @param      keywordsb   keywords
        @param      id          an id
        @param      nb          not included in the database, statistic
                                stored in ``self.stat["nb"]`` if not None
        """
        self.titleb = titleb
        self.type = type
        self.xmlUrl = xmlUrl
        self.htmlUrl = htmlUrl
        self.keywordsb = keywordsb
        self.id = id
        self.stat = {}
        if nb is not None:
            self.stat["nb"] = nb

    def __str__(self):
        """
        Returns ``"<type>: <titleb> (<xmlUrl>)"``.
        """
        return "%s: %s (%s)" % (self.type, self.titleb, self.xmlUrl)

    def __lt__(self, o):
        """
        Orders two streams on their lower-cased string representation.

        @param      o       other object
        @return             boolean
        """
        # The right-hand side must be built from *o*: comparing self with
        # itself always returns False and makes sorting meaningless.
        return str(self).lower() < str(o).lower()

    @property
    def index(self):
        """
        Defines the column to use as an index.
        """
        return "xmlUrl"

    @property
    def asdict(self):
        """
        Returns the members stored in the database as a dictionary
        (``id`` and ``stat`` are not included).

        @return     dictionary
        """
        return {"titleb": self.titleb,
                "type": self.type,
                "xmlUrl": self.xmlUrl,
                "htmlUrl": self.htmlUrl,
                "keywordsb": self.keywordsb}

    @staticmethod
    def schema_database_read():
        """
        Returns all members names and types as a dictionary,
        the column ``id`` is at position 5.

        @return     dictionary
        """
        return {0: ("titleb", str),
                1: ("type", str),
                2: ("xmlUrl", str),
                3: ("htmlUrl", str),
                4: ("keywordsb", str),
                5: ("id", int, "PRIMARYKEY", "AUTOINCREMENT")}

    @property
    def schema_database(self):
        """
        Returns all members names and types as a dictionary,
        the column ``id`` is registered with key -1.

        @return     dictionary
        """
        return {0: ("titleb", str),
                1: ("type", str),
                2: ("xmlUrl", str),
                3: ("htmlUrl", str),
                4: ("keywordsb", str),
                -1: ("id", int, "PRIMARYKEY", "AUTOINCREMENT")}

    @property
    def asrow(self):
        """
        Returns all the values as a row
        (following the schema given by @see me schema_database).
        Keywords are merged into one comma separated string.

        @return     list of values
        """
        keywords = self.keywordsb
        # The schema stores keywords as a single string: joining a string
        # would insert a comma between every character, so keep it as is.
        if not isinstance(keywords, str):
            keywords = ",".join(keywords)
        return [self.titleb, self.type, self.xmlUrl, self.htmlUrl, keywords]

    @staticmethod
    def enumerate_stream_from_google_list(file, encoding="utf8", fLOG=None):
        """
        Enumerates the :epkg:`RSS` streams found in
        a dump made with Google Reader.

        @param      file        filename
        @param      encoding    encoding
        @param      fLOG        logging function
        @return                 generator of @see cl StreamRSS
        @raise      ValueError  if an ``outline`` item is returned as a tuple

        Only ``outline`` nodes with an empty ``other`` member and
        a non empty attribute ``xmlUrl`` produce a stream.
        The format is the following:

        .. exref::
            :title: An entry in the XML config file

            ::

                <outline text="Freakonometrics"
                    title="Freakonometrics"
                    type="rss"
                    xmlUrl="http://freakonometrics.hypotheses.org/feed"
                    htmlUrl="http://freakonometrics.hypotheses.org" />
        """
        with open(file, "r", encoding=encoding) as ff:
            for o in xml_filter_iterator(ff, lambda f: True, log=True,
                                         xmlformat=False, fLOG=fLOG):
                for oo in o.enumerate_on_tag("outline", recursive=True):
                    if isinstance(oo, tuple):
                        raise ValueError(
                            "wrong format file: {}".format(file))
                    if len(oo.other) != 0 or "xmlUrl" not in oo:
                        continue
                    if len(oo["xmlUrl"]) == 0:
                        continue
                    yield StreamRSS(titleb=oo["title"],
                                    type=oo["type"],
                                    xmlUrl=oo["xmlUrl"],
                                    htmlUrl=oo["htmlUrl"],
                                    keywordsb=[])

    @staticmethod
    def fill_table(db, tablename, iterator_on):
        """
        Fills a table of a database by calling
        ``db.fill_table_with_objects(..., check_existence=True)``.

        @param      db              database object (@see cl Database)
        @param      tablename       name of a table
        @param      iterator_on     iterator on StreamRSS objects

        Example:

        ::

            res = list(StreamRSS.enumerate_stream_from_google_list(file))
            StreamRSS.fill_table(db, "blogs", res)
        """
        db.fill_table_with_objects(
            tablename,
            iterator_on,
            check_existence=True)

    @staticmethod
    def _post_date(post):
        """
        Computes the date of a post.

        @param      post    entry returned by :epkg:`feedparser`
        @return             datetime, never in the future

        The function looks into ``published_parsed``, then
        ``updated_parsed`` if the first key is missing, and falls back
        to the current time if none is available or if
        ``published_parsed`` is None.
        """
        now = datetime.datetime.now()
        try:
            date = datetime.datetime(*post["published_parsed"][:6])
        except KeyError:
            try:
                date = datetime.datetime(*post["updated_parsed"][:6])
            except KeyError:
                date = now
        except TypeError:
            # Slicing None raises TypeError: a missing date is accepted,
            # any other malformed value is still reported to the caller.
            if post["published_parsed"] is not None:
                raise
            date = now
        return min(date, now)

    def enumerate_post(self, path=None, fLOG=None):
        """
        Parses a :epkg:`RSS` stream.

        @param      path    if None, use self.xmlUrl, otherwise,
                            uses this path (url or local file)
        @param      fLOG    logging function
        @return             generator of @see cl BlogPost

        A path starting with ``http://`` or ``https://`` is downloaded
        first, any other path is given to :epkg:`feedparser` as is.
        Nothing is enumerated if the download fails or if the parser
        raises ``RuntimeError``. Every entry is expected to look like:

        ::

            {'summary_detail': {'base': '', 'value': '<p> ... </p>',
                                'language': None, 'type': 'text/html'},
             'published': '2013-06-24 00:00:00',
             'published_parsed': time.struct_time(tm_year=2013, ...),
             'link': 'http://www.xavierdupre.fr/blog/xd_blog.html?date=2013-06-24',
             'summary': '<p> ... </p>',
             'guidislink': False,
             'title': 'Installer pip pour Python',
             'id': 'http://www.xavierdupre.fr/blog/xd_blog.html?date=2013-06-24'}

        If there is no date, the function will give the date of today
        (assuming you fetch posts from this blog everyday).
        If ``id`` or ``guidislink`` is missing, or if ``guidislink``
        is true, the guid is the url, otherwise it is the id.
        """
        import feedparser  # pylint: disable=C0415
        if path is None:
            path = self.xmlUrl

        if path.startswith(("http://", "https://")):
            cont = get_url_content_timeout(path)
            if cont is None:
                if fLOG:
                    fLOG("[enumerate_post] unable to retrieve content "
                         "for url: '{}'.".format(path))
                return
        else:
            cont = path

        # Only a warning: the content is still given to the parser.
        if "<title>" not in cont:
            if fLOG:
                fLOG("unable to parse content from " + self.xmlUrl)

        try:
            d = feedparser.parse(cont)
        except RuntimeError:
            if fLOG:
                fLOG("[enumerate_post] cannot enumerate post in "
                     "'{}'.".format(path))
            return

        if d is None:
            return

        entries = d["entries"]
        if len(entries) == 0 and fLOG:
            fLOG("[enumerate_post] no post for ", path)

        for post in entries:
            titleb = post.get("title", "-")
            url = post.get("link", "")

            try:
                post_id = post["id"]
                guid = url if post["guidislink"] else post_id
            except KeyError:
                guid = url

            try:
                desc = post["summary_detail"]["value"]
            except KeyError:
                desc = post.get("summary", "")

            isPermaLink = True
            date = StreamRSS._post_date(post)
            yield BlogPost(self, titleb, guid, isPermaLink, url, desc, date)

    @staticmethod
    def enumerate_post_from_rsslist(list_rss_stream, fLOG=None):
        """
        Enumerates all posts found in all rss_streams given as a list.

        @param      list_rss_stream     list of rss streams
        @param      fLOG                logging function
        @return                         enumeration of blog post
        """
        for rss in list_rss_stream:
            try:
                if fLOG:
                    fLOG("reading post from", rss)
            except UnicodeEncodeError:
                if fLOG:
                    fLOG("reading post from", [rss], "encoding issue")
            # The logging function is forwarded so that issues met while
            # parsing a stream are reported as well.
            for post in rss.enumerate_post(fLOG=fLOG):
                yield post

    @property
    def stat_nb(self):
        """
        Returns the statistics nb: ``self.stat.get("nb", 0)``.

        @return     number
        """
        return self.stat.get("nb", 0)

    # HTML templates used by method html: the two ``%s`` receive the style
    # and the action, ``{0.member}`` is replaced by a member of the stream.
    # Lines are written without indentation so that no space has to be
    # removed afterwards (removing every space would break the markup).
    templates = {
        "default": "\n".join([
            "",
            '<p class="%s"><a href="%s" '
            'onmousedown="sendlog(\'blog/{0.id}/in\')">{0.titleb}</a>',
            '<a href="{0.htmlUrl}" target="_blank" '
            'onmousedown="sendlog(\'blog/{0.id}/outimg\')">',
            '<img src="/arrowi.png" width="12px" /></a></p>',
            "",
        ]),
        "default_stat": "\n".join([
            "",
            '<tr class="%s"><td>',
            '<a href="%s" '
            'onmousedown="sendlog(\'blog/{0.id}/in\')">{0.titleb}</a>',
            '<a href="{0.htmlUrl}" target="_blank" '
            'onmousedown="sendlog(\'blog/{0.id}/outimg\')">',
            '<img src="/arrowi.png" width="12px" /></a>',
            '</td><td>{0.stat_nb}</td></tr>',
            "",
        ]),
    }

    def html(self, template=None,
             action="{0.htmlUrl}",
             style="blogtitle",
             addlog=True):
        """
        Displays the blog in HTML format. The template must contain
        two ``%s`` (replaced by *style* then *action*) and may contain:

        - ``{0.member}``: this string will be replaced by the member
        - ``__id__``: this string will be replaced by the id

        @param      template    html template, None for the default one,
                                it can also be the name of a predefined
                                template:
                                    - default
                                    - default_stat
        @param      action      url to use when clicking on a blog
        @param      style       style of the paragraph containing the url
        @param      addlog      kept for compatibility, this parameter is
                                not used by the current implementation
        @return                 html string
        """
        key = "default" if template is None else template
        # A name which is not a predefined template is the template itself.
        pattern = StreamRSS.templates.get(key, key) % (style, action)
        pattern = pattern.replace("__id__", str(self.id))
        return pattern.format(self)

380 return res