Coverage for src/pyrsslocal/rss/rss_database.py: 78%

185 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-02-02 02:59 +0100

1""" 

2@file 

3@brief Description of a :epkg:`RSS` stream. 

4""" 

5import datetime 

6import os 

7 

8from pyquickhelper.loghelper import fLOG 

9from pyensae.sql.database_main import Database 

10from .rss_blogpost import BlogPost 

11from .rss_stream import StreamRSS 

12 

13 

class DatabaseRSS(Database):
    """
    Database specific to :epkg:`RSS`.

    The database file must already contain a table of blogs and a table of
    posts; the statistics and events tables are created when missing.
    Every public ``enumerate_*`` method opens the connection on first
    iteration and closes it when the iteration ends, whether it runs to
    completion, is abandoned by the caller or fails.
    """

    @staticmethod
    def schema_table(table):
        """
        Returns the schema for a specific table.

        @param      table   name (in ["stats", "event"])
        @return             dictionary ``{column index: (name, type, ...)}``
        @raise      Exception if *table* is not one of the expected names
        """
        if table == "stats":
            return {0: ("id_post", int),
                    1: ("dtime", datetime.datetime),
                    2: ("status", str),
                    3: ("rate", int),
                    4: ("comment", str),
                    }
        if table == "event":
            return {-1: ("id_event", int, "PRIMARYKEY", "AUTOINCREMENT"),
                    0: ("dtime", datetime.datetime),
                    1: ("uuid", str),
                    2: ("type1", str),
                    3: ("type2", str),
                    4: ("args", str),
                    }
        raise Exception("unexpected table name")

    def __init__(self, dbfile,
                 table_blogs="blogs",
                 table_posts="posts",
                 table_stats="posts_stat",
                 table_event="events",
                 LOG=fLOG):
        """
        @param      dbfile          file database
        @param      table_blogs     table name for the blogs
        @param      table_posts     table name for the posts
        @param      table_stats     table name for the posts stats
        @param      table_event     table name for the events
        @param      LOG             logging function
        @raise      FileNotFoundError if *dbfile* does not exist
        @raise      Exception       if the blogs or posts table is missing
        """
        if not os.path.exists(dbfile):
            raise FileNotFoundError(dbfile)
        Database.__init__(self, dbfile, LOG=LOG)
        self.dbfile = dbfile
        self.table_blogs = table_blogs
        self.table_posts = table_posts
        self.table_stats = table_stats
        self.table_event = table_event

        self.connect()
        try:
            for tbl in (table_blogs, table_posts):
                if not self.has_table(tbl):
                    raise Exception(
                        "table %s not found in %s" % (tbl, dbfile))
            self.create_missing_table()
        finally:
            # Release the connection even when a required table is missing.
            self.close()

    def create_missing_table(self):
        """
        Creates the missing tables.

        The statistics table is dropped and rebuilt when its number of
        columns differs from the current schema. The connection must already
        be open.
        """
        stats_schema = DatabaseRSS.schema_table("stats")

        if self.has_table(self.table_stats):
            existing = self.get_table_columns(self.table_stats)
            if len(existing) != len(stats_schema):
                self.remove_table(self.table_stats)

        if not self.has_table(self.table_stats):
            self.create_table(self.table_stats, stats_schema)
            self.commit()
            self.create_index(
                "id_post_" + self.table_stats,
                self.table_stats,
                "id_post",
                False)
            self.commit()

        if not self.has_table(self.table_event):
            self.create_table(
                self.table_event, DatabaseRSS.schema_table("event"))
            self.commit()

    def __str__(self):
        """
        usual
        """
        return "file:%s, t-blogs:%s, t-posts:%s" % (
            self.dbfile, self.table_blogs, self.table_posts)

    # Sub-queries selecting blog ids, formatted by ``enumerate_blogs`` with:
    # {0} posts table, {1} now - 1 day, {2} now - 2 days, {3} now - 7 days,
    # {4} now - 180 days, {5} daily frequency threshold.
    specific_search = {
        "today": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{1}'",
        "twoday": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{2}'",
        "week": "SELECT DISTINCT id_rss FROM {0} WHERE pubDate >= '{3}'",
        "frequent": """SELECT id_rss FROM (
                            SELECT id_rss, SUM(nb)*1.0/ (MAX(day) - MIN(day)+1) AS avg_nb FROM (
                                SELECT id_rss, day, COUNT(*) AS nb FROM (
                                    SELECT id_rss, getdayn(pubDate) AS day FROM {0} WHERE pubDate >= '{4}'
                                ) GROUP BY id_rss, day
                            ) GROUP BY id_rss
                        ) WHERE avg_nb >= {5}""",
        "notfrequent": """SELECT id_rss FROM (
                            SELECT id_rss, SUM(nb)*1.0/ (MAX(day) - MIN(day)+1) AS avg_nb FROM (
                                SELECT id_rss, day, COUNT(*) AS nb FROM (
                                    SELECT id_rss, getdayn(pubDate) AS day FROM {0} WHERE pubDate >= '{4}'
                                ) GROUP BY id_rss, day
                            ) GROUP BY id_rss
                        ) WHERE avg_nb < {5}""",
    }

    @staticmethod
    def getday(dt):
        """
        Removes the time from a date.

        @param      dt      datetime or string ``'<date> <time>'``
        @return             for a string, the part before the first space
                            (still a string); otherwise a datetime at
                            midnight of the same day
        """
        if isinstance(dt, str):
            return dt.split(" ")[0]
        return datetime.datetime(dt.year, dt.month, dt.day)

    @staticmethod
    def getdayn(dt):
        """
        Converts a date into a number of days since 2000-01-01.

        @param      dt      datetime or string starting with ``YYYY-MM-DD``
        @return             number of days (integer) between 2000-01-01
                            and the day of *dt*
        """
        if isinstance(dt, str):
            year, month, day = dt.split()[0].split("-")[:3]
            midnight = datetime.datetime(int(year), int(month), int(day))
        else:
            midnight = datetime.datetime(dt.year, dt.month, dt.day)
        return (midnight - datetime.datetime(2000, 1, 1)).days

    def enumerate_blogs(self, sorted_=True, specific=None, daily_freq=1.5,
                        now=None, addstat=False):
        """
        Enumerates all the blogs from the database.

        @param      sorted_         sorted by title (or by decreasing number
                                    of posts if *addstat* is True)
        @param      specific        specific search

                                    - None: all blogs
                                    - today: get all blogs for today
                                    - twoday: get all blogs for today and yesterday
                                    - week: get all blogs for last week
                                    - notfrequent: get all blogs publishing less posts in a day than ``daily_freq``
                                    - frequent: get all blogs publishing more posts in a day than ``daily_freq``
        @param      daily_freq      see parameter specific
        @param      now             if None, today means today, if not None, ``now`` will have the meaning of today
        @param      addstat         if True, the function will add a field corresponding
                                    to the number of posts from this blog
        @return                     enumeration of @see cl StreamRSS
        @raise      TypeError       if *specific* is a list with more or less than one element
        @raise      ValueError      if *specific* is not a known search
        """
        if addstat:
            sqlstatjoinA = "SELECT A.*, nbpost FROM ("
            sqlstatjoinB = """) AS A INNER JOIN (SELECT id_rss, COUNT(*) AS nbpost FROM {0}
                            GROUP BY id_rss) ON id_rss == A.id""".format(self.table_posts)
            orderby = "nbpost DESC"
        else:
            sqlstatjoinA = ""
            sqlstatjoinB = ""
            orderby = "titleb"

        if isinstance(specific, list):
            if len(specific) != 1:
                raise TypeError(
                    "unable to process if specific is a list:" +
                    str(specific))
            specific = specific[0]

        if specific in (None, ""):
            needs_getdayn = False
            sql = "%sSELECT titleb, type, xmlUrl, htmlUrl, keywordsb, id FROM %s%s" % (
                sqlstatjoinA, self.table_blogs, sqlstatjoinB)
        elif specific in DatabaseRSS.specific_search:
            needs_getdayn = True
            today = datetime.datetime.now() if now is None else now
            one_day = datetime.timedelta(days=1)
            yesday = today - one_day
            yes2 = yesday - one_day
            yesweek = today - (one_day * 7)
            yeshalf = today - (one_day * 180)
            subquery = DatabaseRSS.specific_search[specific].format(
                self.table_posts, yesday, yes2, yesweek, yeshalf, daily_freq)
            sql = "%sSELECT titleb, type, xmlUrl, htmlUrl, keywordsb, id FROM %s WHERE id IN (%s)%s" % (
                sqlstatjoinA, self.table_blogs, subquery, sqlstatjoinB)
        else:
            raise ValueError(
                "unable to interpret value %s for parameter specific" %
                specific)

        if sorted_:
            sql += " ORDER BY " + orderby

        self.connect()
        try:
            if needs_getdayn:
                # The frequency queries call getdayn from SQL.
                self.add_function("getdayn", 1, DatabaseRSS.getdayn)
            for row in self.execute(sql):
                yield StreamRSS(*row)
        finally:
            # Also runs when the caller stops iterating early.
            self.close()

    def enumerate_latest_status(self, postid, nb=1, connect=True):
        """
        Retrieves the latest status for a post.

        @param      postid      post id
        @param      nb          number of desired status
        @param      connect     connect (True) or skip connection (False),
                                in that case the connection is neither opened
                                nor closed by this method
        @return                 enumerate on values from ``table_stats`` ordered by decreasing time,
                                each one is a dictionary ``{column name: value}``
        """
        if connect:
            self.connect()
        try:
            sch = DatabaseRSS.schema_table("stats")
            sql = "SELECT * FROM {0} WHERE id_post=={1} ORDER BY dtime DESC".format(
                self.table_stats,
                postid)
            for count, row in enumerate(self.execute(sql)):
                if count >= nb:
                    break
                yield {sch[i][0]: value for i, value in enumerate(row)}
        finally:
            if connect:
                self.close()

    def private_process_condition(self, blog_selection=None, post_selection=None,
                                  sorted_=True, specific=None, now=None,
                                  searchterm=None):
        """
        Returns a :epkg:`SQL` query corresponding to list of posts.

        @param      blog_selection      list of blogs to consider (or empty for all)
        @param      post_selection      list of posts to consider
        @param      sorted_             sorted by date
        @param      specific            specific search

                                        - None: all posts
                                        - today: get all posts for today
                                        - twoday: get all posts for today and yesterday
                                        - week: get all posts for last week
        @param      searchterm          if not None, filters using a SQL like search (using ``%``),
                                        a term without ``%`` and not starting with ``+``
                                        is surrounded by ``%``
        @param      now                 if None, today means today, if not None, ``now`` will have the meaning of today
        @return                         SQL query

        All filters are combined with ``AND`` under a single ``WHERE``
        clause.
        """
        if blog_selection is None:
            blog_selection = []
        if post_selection is None:
            post_selection = []

        conditions = []

        if searchterm is not None:
            if not searchterm.startswith("+") and "%" not in searchterm:
                searchterm = "%{0}%".format(searchterm)
            # SQL string literals escape a single quote by doubling it;
            # a backslash is not an escape character and double quotes need
            # no escaping inside a single-quoted literal.
            pattern = searchterm.upper().replace("'", "''")
            conditions.append("UPPER(title) LIKE '{0}'".format(pattern))

        if len(blog_selection) > 0:
            conditions.append(
                " id_rss in (%s)" % ",".join(map(str, blog_selection)))

        if len(post_selection) > 0:
            conditions.append("%s.id in (%s)" % (
                self.table_posts, ",".join(map(str, post_selection))))

        if specific in ("today", "week", "twoday"):
            today = datetime.datetime.now() if now is None else now
            nb_days = {"week": 7, "today": 1, "twoday": 2}.get(specific, 7)
            oldest = today - datetime.timedelta(days=1) * nb_days
            conditions.append("pubDate >= '{0}'".format(oldest))

        sql = """SELECT id_rss, title, guid, isPermaLink, link, description, pubDate, keywords, {0}.id AS id,
                        titleb, type, xmlUrl, htmlUrl, keywordsb, {1}.id AS idblog
                    FROM {0}
                    INNER JOIN {1}
                    ON {0}.id_rss == {1}.id
                    """.format(self.table_posts, self.table_blogs)

        if conditions:
            sql += " WHERE " + " AND ".join(conditions)

        if sorted_:
            sql += " ORDER BY pubDate DESC"
        return sql

    def enumerate_posts(self, blog_selection=None, post_selection=None, sorted_=True,
                        first=1000, specific=None, daily_freq=1.5, now=None,
                        addstatus=False, searchterm=None):
        """
        Enumerates all the posts from the database if the blog id
        belongs to a selection (or all if blog_selection is empty).

        @param      blog_selection      list of blogs to consider (or empty for all)
        @param      post_selection      list of posts to consider
        @param      sorted_             sorted by date
        @param      first               we only consider the first ``first``
        @param      specific            specific search

                                        - None: all posts
                                        - today: get all posts for today
                                        - week: get all posts for last week
        @param      daily_freq          not used by this method
        @param      now                 if None, today means today, if not None, ``now`` will have the meaning of today
        @param      addstatus           if True, fetches the latest status of each post
        @param      searchterm          if not None, filters using a SQL like search (using ``%``)
        @return                         enumeration of @see cl BlogPost
        """
        if blog_selection is None:
            blog_selection = []
        if post_selection is None:
            post_selection = []

        sql = self.private_process_condition(
            blog_selection, post_selection, sorted_,
            specific, now, searchterm)
        sql += " LIMIT %d" % first

        self.connect()
        try:
            for row in self.execute(sql):
                row = list(row)
                row[-2] = row[-2].split(",")    # keywordsb
                row[3] = row[3] == 1            # isPermaLink
                # The last six columns describe the blog, the others the post.
                blog = StreamRSS(*row[-6:])
                fields = row[:-6]
                fields[0] = blog                # replaces id_rss

                bl = BlogPost(*fields)

                if addstatus:
                    # The connection is already open: do not reconnect.
                    for st in self.enumerate_latest_status(bl.id, connect=False):
                        bl.add_status(st)
                yield bl
        finally:
            # Also runs when the caller stops iterating early.
            self.close()

    def enumerate_posts_status(self, blog_selection=None, post_selection=None,
                               sorted_=True, specific=None, now=None,
                               searchterm=None):
        """
        Enumerates the posts which received a status.

        @param      blog_selection      list of blogs to consider (or empty for all)
        @param      post_selection      list of posts to consider
        @param      sorted_             sorted by date
        @param      specific            specific search

                                        - None: all posts
                                        - today: get all posts for today
                                        - week: get all posts for last week
        @param      now                 if None, today means today, if not None, ``now`` will have the meaning of today
        @param      searchterm          if not None, filters using a SQL like search (using ``%``)
        @return                         enumeration of @see cl BlogPost, each one received
                                        a status ``{"status": ..., "dtime": ...}``
                                        through ``add_status``
        """
        if blog_selection is None:
            blog_selection = []
        if post_selection is None:
            post_selection = []

        sql_po = self.private_process_condition(
            blog_selection, post_selection, sorted_,
            specific, now, searchterm)

        # NOTE(review): the join below matches on id_post only, not on dtime,
        # so a post with several statuses seems to produce one row per
        # status, all labelled with the latest dtime -- confirm the intent.
        sql_st = """SELECT A.id_post, status, A.dtime FROM (
                        SELECT id_post, MAX(dtime) AS dtime FROM {0}
                        GROUP BY id_post) AS A
                    INNER JOIN {0}
                    ON A.id_post == {0}.id_post""".format(self.table_stats)

        sql = """SELECT DISTINCT id_rss, title, guid, isPermaLink, link, description, pubDate, keywords, id,
                        titleb, type, xmlUrl, htmlUrl, keywordsb, idblog, status, dtime
                    FROM (
                        {0}
                    )
                    AS tA
                    INNER JOIN (
                        {1}
                    ) AS tB
                    ON tA.id == tB.id_post""".format(sql_po, sql_st)

        self.connect()
        try:
            for row in self.execute(sql):
                row = list(row)
                row[-4] = row[-4].split(",")    # keywordsb
                row[3] = row[3] == 1            # isPermaLink
                # Columns: 9 for the post, 6 for the blog, status, dtime.
                blog = StreamRSS(*row[-8:-2])
                st = {"status": row[-2], "dtime": row[-1]}
                fields = row[:-8]
                fields[0] = blog                # replaces id_rss

                bl = BlogPost(*fields)
                bl.add_status(st)
                yield bl
        finally:
            # Also runs when the caller stops iterating early.
            self.close()