Coverage for src/manydataapi/velib/data_jcdecaux.py: 74%

301 statements  

coverage.py v7.2.7, created at 2023-07-02 08:38 +0200

1# -*- coding:utf-8 -*- 

2""" 

3@file 

4@brief The file contains a class which collects data coming from :epkg:`Velib`. 

5 

6""" 

7 

8import os 

9import os.path 

10import datetime 

11import json 

12import time 

13import re 

14import math 

15import random 

16import urllib 

17import urllib.error 

18import urllib.request 

19import pandas 

20import numpy 

21 

22 

23class DataCollectJCDecaux: 

24 

25 """ 

26 This class automates data collecting from :epkg:`JCDecaux`. 

27 The service is provided at `JCDecaux developer <https://developer.jcdecaux.com/#/home>`_. 

28 

29 See also the `notebook on Velib <http://nbviewer.ipython.org/5520933>`_.

30 The list of contracts for :epkg:`JCDecaux` can be obtained at: 

31 `Données statiques (static data) <https://developer.jcdecaux.com/#/opendata/vls?page=static>`_.

32 The API provided by :epkg:`JCDecaux` is described 

33 `here <https://developer.jcdecaux.com/#/opendata/vls?page=dynamic>`_. 

34 

35 .. exref:: 

36 :title: Simple code to fetch velib data 

37 

38 :: 

39 

40 private_key = 'your_key' 

41 

42 from manydataapi.velib import DataCollectJCDecaux 

43 DataCollectJCDecaux.run_collection(private_key, contract="besancon", 

44 delayms=30000, single_file=False, stop_datetime=None, 

45 log_every=1) 

46 """ 

47 

48 #: list of available cities = contract (subset) 

49 _contracts_static = {k: 1 for k in [ 

50 'arcueil', 'besancon', 'lyon', 'nancy']} 

51 

52 # api: two substrings to replace (contract, apiKey)

53 _url_api = "https://api.jcdecaux.com/vls/v1/stations?contract=%s&apiKey=%s" 

54 _url_apic = "https://api.jcdecaux.com/vls/v1/contracts?apiKey=%s" 

55 

56 def __init__(self, apiKey, fetch_contracts=False): 

57 """ 

58 @param apiKey api key 

59 @param fetch_contracts if False, it uses a short static list of known contracts,

60 otherwise, the list is retrieved through the website API

61 """ 

62 self.apiKey = apiKey 

63 self.contracts = DataCollectJCDecaux._contracts_static if not fetch_contracts else self.get_contracts() 

64 

65 # sometimes, lng and lat are null, check if a past retrieval

66 # returned non-null coordinates

67 self.memoGeoStation = {} 

68 

69 def get_contracts(self): 

70 """ 

71 Returns the list of contracts. 

72 

73 @return dictionary mapping contract names to 1, e.g. ``{'besancon': 1}``
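
A minimal usage sketch, assuming ``'your_key'`` is replaced with a valid API key::

from manydataapi.velib.data_jcdecaux import DataCollectJCDecaux

collector = DataCollectJCDecaux('your_key', fetch_contracts=False)

contracts = collector.get_contracts()  # queries the JCDecaux API

print(sorted(contracts))  # e.g. ['arcueil', 'besancon', 'lyon', 'nancy', ...]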

74 """ 

75 url = DataCollectJCDecaux._url_apic % (self.apiKey) 

76 try: 

77 with urllib.request.urlopen(url) as u: 

78 js = u.read() 

79 except (urllib.error.HTTPError, urllib.error.URLError): # pragma: no cover 

80 # there was probably a mistake 

81 # We try again after a given amount of time 

82 time.sleep(0.5) 

83 try: 

84 with urllib.request.urlopen(url) as u: 

85 js = u.read() 

86 except (urllib.error.HTTPError, urllib.error.URLError) as exc2: 

87 # there was probably a mistake, we stop 

88 raise RuntimeError("Unable to access url %r." % url) from exc2 

89 

90 js = str(js, encoding="utf8") 

91 js = json.loads(js) 

92 cont = {k["name"]: 1 for k in js} 

93 return cont 

94 

95 def get_json(self, contract): 

96 """ 

97 Returns the data associated to a contract. 

98 

99 @param contract contract name, see attribute *contracts*

100 @return list of dictionaries, the parsed :epkg:`json` answer
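
A short sketch of the returned structure, assuming ``'your_key'`` is a valid API key

(field values below are only illustrative)::

collector = DataCollectJCDecaux('your_key', fetch_contracts=False)

stations = collector.get_json('besancon')

# every element of the list is a dictionary such as

# {'number': 1, 'name': '...', 'available_bikes': 3, 'available_bike_stands': 12,

#  'bike_stands': 15, 'lat': 47.24, 'lng': 6.02, 'banking': 1, 'bonus': 0,

#  'status': 'OPEN', 'last_update': datetime(...), 'collect_date': datetime(...)}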

101 """ 

102 if contract not in self.contracts: 

103 raise RuntimeError( # pragma: no cover 

104 "Unable to find contract '{0}' in:\n{1}".format(contract, "\n".join( 

105 self.contracts.keys()))) 

106 url = DataCollectJCDecaux._url_api % (contract, self.apiKey) 

107 

108 try: 

109 with urllib.request.urlopen(url) as u: 

110 js = u.read() 

111 except (urllib.error.HTTPError, urllib.error.URLError): # pragma: no cover 

112 # there was probably a mistake 

113 # We try again after a given amount of time 

114 time.sleep(0.5) 

115 try: 

116 with urllib.request.urlopen(url) as u: 

117 js = u.read() 

118 except (urllib.error.HTTPError, urllib.error.URLError): 

119 # there was probably a mistake 

120 # we stop 

121 return json.loads("[]") 

122 

123 js = str(js, encoding="utf8") 

124 js = json.loads(js) 

125 now = datetime.datetime.now() 

126 for o in js: 

127 o["number"] = int(o["number"]) 

128 o["banking"] = 1 if o["banking"] == "True" else 0 

129 o["bonus"] = 1 if o["bonus"] == "True" else 0 

130 

131 o["bike_stands"] = int(o["bike_stands"]) 

132 o["available_bike_stands"] = int(o["available_bike_stands"]) 

133 o["available_bikes"] = int(o["available_bikes"]) 

134 o["collect_date"] = now 

135 

136 try: 

137 ds = float(o["last_update"]) 

138 dt = datetime.datetime.fromtimestamp(ds / 1000) 

139 except ValueError: # pragma: no cover 

140 dt = datetime.datetime.now() 

141 except TypeError: # pragma: no cover 

142 dt = datetime.datetime.now() 

143 o["last_update"] = dt 

144 

145 try: 

146 o["lat"] = float( 

147 o["position"]["lat"]) if o["position"]["lat"] is not None else None 

148 o["lng"] = float( 

149 o["position"]["lng"]) if o["position"]["lng"] is not None else None 

150 except TypeError as e: # pragma: no cover 

151 raise TypeError( # pylint: disable=W0707 

152 "Unable to convert geocode for the following row: %s\n%s" % 

153 (str(o), str(e))) 

154 

155 key = contract, o["number"] 

156 if key in self.memoGeoStation: 

157 if o["lat"] == 0 or o["lng"] == 0: 

158 o["lat"], o["lng"] = self.memoGeoStation[key] 

159 elif o["lat"] != 0 and o["lng"] != 0: 

160 self.memoGeoStation[key] = o["lat"], o["lng"] 

161 

162 del o["position"] 

163 

164 return js 

165 

166 def collecting_data(self, contract, delayms=1000, outfile="velib_data.txt", 

167 single_file=True, stop_datetime=None, log_every=10, 

168 fLOG=print): 

169 """ 

170 Collects data for a period of time. 

171 

172 @param contract contract name, see attribute *contracts*

173 @param delayms delay between two collections (in ms) 

174 @param outfile write data to this file (json), if *single_file* is False, *outfile* is used as a prefix

175 @param single_file if True, one file, else, many files with timestamp as a suffix 

176 @param stop_datetime if None, never stops, else stops when the date is reached 

177 @param log_every print a message every *log_every* collections

178 @param fLOG logging function (None to disable) 

179 @return None, the data is written into one or several files
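
A minimal sketch which collects two minutes of data into a single file,

assuming ``'your_key'`` is a valid API key::

import datetime

from manydataapi.velib.data_jcdecaux import DataCollectJCDecaux

collector = DataCollectJCDecaux('your_key', fetch_contracts=True)

stop = datetime.datetime.now() + datetime.timedelta(minutes=2)

collector.collecting_data('besancon', delayms=30000, outfile='velib_data.txt',

single_file=True, stop_datetime=stop, log_every=1)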

180 """ 

181 delay = datetime.timedelta(seconds=delayms / 1000) 

182 now = datetime.datetime.now() 

183 cloc = now 

184 delayms /= 50 

185 delays = delayms / 1000.0 
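
# the loop at the end of this function sleeps in small steps

# (1/50th of the requested delay) until the next collection time *cloc* is reached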

186 

187 nb = 0 

188 while stop_datetime is None or now < stop_datetime: 

189 now = datetime.datetime.now() 

190 cloc += delay 

191 js = self.get_json(contract) 

192 

193 if single_file: 

194 with open(outfile, "a", encoding="utf8") as f: 

195 f.write("%s\t%s\n" % (str(now), str(js))) 

196 else: 

197 name = outfile + "." + \ 

198 str(now).replace(":", 

199 "-").replace("/", 

200 "-").replace(" ", 

201 "_") + ".txt" 

202 with open(name, "w", encoding="utf8") as f: 

203 f.write(str(js)) 

204 

205 nb += 1 

206 if fLOG and nb % log_every == 0: 

207 fLOG("DataCollectJCDecaux.collecting_data: nb={0} {1} delay={2}".format( 

208 nb, now, delay)) 

209 

210 while now < cloc: 

211 now = datetime.datetime.now() 

212 time.sleep(delays) 

213 

214 @staticmethod 

215 def run_collection(key=None, contract="Paris", delayms=60000, folder_file="velib_data", 

216 stop_datetime=None, single_file=False, log_every=1, fLOG=print): 

217 """ 

218 Runs the collection of the data for velib, data are stored using :epkg:`json` format. 

219 The function creates a file every time a new status is downloaded. 

220 

221 @param key API key (str), an exception is raised if None

222 @param contract a city 

223 @param delayms gets a status every delayms milliseconds 

224 @param folder_file file prefix, one or several files are created depending on *single_file*

225 @param stop_datetime (datetime) stop when this datetime is reached or None for never stops 

226 @param single_file if True, every json status is appended to a single file and *folder_file* is a file name,

227 if False, a new file is created for every downloaded status

228 @param log_every log some information every *log_every* downloaded statuses

229 @param fLOG logging function (None to disable) 

230 

231 .. exref:: 

232 :title: collect Velib data 

233 

234 The following example produces a file every minute in json format with the status of all

235 Velib stations in Paris. The files are created with the prefix ``velib_data``.

236 

237 :: 

238 

239 from manydataapi.velib.data_jcdecaux import DataCollectJCDecaux

private_key = 'your_key'  # replace with a real API key

240 DataCollectJCDecaux.run_collection(private_key, contract="Paris",

241 delayms=60000, single_file=False, stop_datetime=None, 

242 log_every=1) 

243 """ 

244 if key is None: 

245 raise NotImplementedError( # pragma: no cover 

246 "key cannot be None") 

247 velib = DataCollectJCDecaux(key, True) 

248 velib.collecting_data(contract, delayms, folder_file, stop_datetime=stop_datetime, 

249 single_file=single_file, log_every=log_every, fLOG=fLOG) 

250 

251 @staticmethod 

252 def to_df(folder, regex="velib_data.*[.]txt"): 

253 """ 

254 Reads all files in a folder (assuming there were produced by this class) and 

255 returns a dataframe with it. 

256 

257 @param folder folder where to find the files 

258 @param regex regular expression which filters the files

259 @return pandas DataFrame 

260 

261 Each file contains a status of all stations, a row per

262 station is added to the dataframe.

263 It produces a table with the following columns: 

264 

265 - address 

266 - available_bike_stands 

267 - available_bikes 

268 - banking 

269 - bike_stands 

270 - bonus 

271 - collect_date 

272 - contract_name 

273 - last_update 

274 - lat 

275 - lng 

276 - name 

277 - number 

278 - status 

279 - file 
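
A minimal usage sketch, assuming files produced by *collecting_data*

were written in the current folder with the prefix ``velib_data``::

df = DataCollectJCDecaux.to_df('.', regex='velib_data.*[.]txt')

print(df[['name', 'available_bikes', 'available_bike_stands']].head())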

280 """ 

281 if regex is None: 

282 regex = ".*" 

283 reg = re.compile(regex) 

284 

285 files_ = os.listdir(folder) 

286 files = [_ for _ in files_ if reg.search(_)] 

287 

288 if len(files) == 0: 

289 raise FileNotFoundError( # pragma: no cover 

290 "No found files in directory: '{}'\nregex: '{}'.".format( 

291 folder, regex)) 

292 

293 rows = [] 

294 for file_ in files: 

295 file = os.path.join(folder, file_) 

296 with open(file, "r", encoding="utf8") as f: 

297 lines = f.readlines() 

298 for i, line in enumerate(lines): 

299 dl = eval(line.strip("\n\r\t ")) # pylint: disable=W0123 

300 if not isinstance(dl, list): 

301 raise TypeError( # pragma: no cover 

302 "Expects a list for line {0} in file {1}".format( 

303 i, 

304 file)) 

305 for d in dl: 

306 d["file"] = file_ 

307 rows.extend(dl) 

308 

309 return pandas.DataFrame(rows) 

310 

311 @staticmethod 

312 def draw(df, use_folium=False, **args): 

313 """ 

314 Draws a graph using four columns: *lng*, *lat*, *available_bike_stands*, *available_bikes*. 

315 

316 @param df dataframe

317 @param use_folium use :epkg:`folium` to create the map

318 @param args other parameters passed to method ``plt.subplots`` or :epkg:`folium`

319 @return fig, ax, plt where (fig, ax) comes from ``plt.subplots`` and plt is matplotlib.pyplot, or a :epkg:`folium` map if *use_folium* is True

320 

321 Additional parameters: 

322 

323 * size: change the size of points 
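
A minimal sketch of both rendering modes, assuming *df* was built with *to_df*

and contains a single status (one file)::

fig, ax, plt = DataCollectJCDecaux.draw(df, figsize=(8, 8), size=2)

plt.show()

# or an interactive map with folium

map_osm = DataCollectJCDecaux.draw(df, use_folium=True, size=0.2)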

324 """ 

325 size = args.get('size', 1) 

326 if 'size' in args: 

327 del args['size'] 

328 

329 if not use_folium: 

330 import matplotlib.pyplot as plt 

331 fig, ax = plt.subplots(**args) 

332 

333 x = df["lng"] 

334 y = df["lat"] 

335 areaf = df.apply( 

336 lambda r: r["available_bike_stands"] ** 0.5 * size, axis=1) 

337 areab = df.apply( 

338 lambda r: r["available_bikes"] ** 0.5 * size, axis=1) 

339 ax.scatter(x, y, areaf, alpha=0.5, label="place", color="r") 

340 ax.scatter(x, y, areab, alpha=0.5, label="bike", color="g") 

341 ax.grid(True) 

342 ax.legend() 

343 ax.set_xlabel("longitude") 

344 ax.set_ylabel("latitude") 

345 

346 return fig, ax, plt 

347 else: 

348 import folium 

349 x = df["lat"].mean() 

350 y = df["lng"].mean() 

351 map_osm = folium.Map(location=[x, y], zoom_start=13) 

352 

353 def add_marker(row): 

354 "add marker" 

355 t = "+ {0} o {1}".format(row["available_bikes"], 

356 row["available_bike_stands"]) 

357 folium.CircleMarker([row["lat"], row["lng"]], color='#3186cc', fill_color='#3186cc', 

358 popup=t, radius=(row["available_bikes"] / numpy.pi) ** 0.5 * 30 * size).add_to(map_osm) 

359 folium.CircleMarker([row["lat"], row["lng"]], color='#cc8631', fill_color='#cc8631', 

360 popup=t, radius=(row["available_bike_stands"] / numpy.pi) ** 0.5 * 30 * size).add_to(map_osm) 

361 

362 df.apply(lambda row: add_marker(row), axis=1) 

363 return map_osm 

364 

365 @staticmethod 

366 def animation(df, interval=20, module="matplotlib", **args): 

367 """ 

368 Displays a javascript animation, 

369 see `animation.FuncAnimation 

370 <http://matplotlib.org/api/animation_api.html#matplotlib.animation.FuncAnimation>`_. 

371 

372 @param df dataframe 

373 @param interval see `animation.FuncAnimation 

374 <http://matplotlib.org/api/animation_api.html#matplotlib.animation.FuncAnimation>`_ 

375 @param module module to build the animation 

376 @param args other parameters passed to method ``plt.subplots``

377 @return animation 

378 

379 Available modules for animation: 

380 

381 * :epkg:`matplotlib` 

382 * :epkg:`moviepy` 

383 

384 Additional arguments: 

385 

386 * size: size of scatter plots 

387 * duration: if module is 'moviepy', duration of the animation 
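
A minimal sketch for a notebook, assuming *df* was built with *to_df*

and a recent :epkg:`matplotlib` is installed::

from IPython.display import HTML

anim = DataCollectJCDecaux.animation(df, interval=100, module='matplotlib', size=2)

HTML(anim.to_jshtml())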

388 """ 

389 size = args.get('size', 1) 

390 if 'size' in args: 

391 del args['size'] 

392 duration = args.get('duration', 2) 

393 if 'duration' in args: 

394 del args['duration'] 

395 

396 dates = list(sorted(set(df["file"]))) 

397 datas = [] 

398 for d in dates: 

399 sub = df[df["file"] == d] 

400 x = sub["lng"] 

401 y = sub["lat"] 

402 colp = sub.apply( 

403 lambda r: r["available_bike_stands"] ** 0.5 * size, axis=1) 

404 colb = sub.apply( 

405 lambda r: r["available_bikes"] ** 0.5 * size, axis=1) 

406 x = tuple(x) 

407 y = tuple(y) 

408 colp = tuple(colp) 

409 colb = tuple(colb) 

410 data = (x, y, colp, colb) 

411 datas.append(data) 

412 

413 import matplotlib.pyplot as plt 

414 

415 def scatter_fig(i=0): 

416 "scatter plot" 

417 fig, ax = plt.subplots(**args) 

418 x, y, c, d = datas[i] 

419 

420 scat1 = ax.scatter(x, y, c, alpha=0.5, color="r", label="place") 

421 scat2 = ax.scatter(x, y, d, alpha=0.5, color="g", label="bike") 

422 ax.grid(True) 

423 ax.legend() 

424 ax.set_xlabel("longitude") 

425 ax.set_ylabel("latitude") 

426 return fig, ax, scat1, scat2 

427 

428 if module == "matplotlib": 

429 from matplotlib import animation 

430 

431 def animate(i, datas, scat1, scat2): 

432 "animation" 

433 _, __, c, d = datas[i] 

434 # scat1.set_array(numpy.array(c)) 

435 # scat2.set_array(numpy.array(d)) 

436 #scat1.set_array(numpy.array(x + y)) 

437 #scat2.set_array(numpy.array(x + y)) 
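
# only the marker sizes change between frames, positions stay constant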

438 scat1._sizes = c 

439 scat2._sizes = d 

440 return scat1, scat2 

441 

442 fig, _, scat1, scat2 = scatter_fig() 

443 anim = animation.FuncAnimation(fig, animate, frames=len(datas), 

444 interval=interval, fargs=(datas, scat1, scat2), blit=True) 

445 plt.close('all') 

446 return anim 

447 

448 elif module == "moviepy": 

449 from moviepy.video.io.bindings import mplfig_to_npimage 

450 import moviepy.editor as mpy 

451 

452 def make_frame_mpl(t): 

453 "mpl=matplotlib" 

454 i = min(int(t * len(datas)), len(datas) - 1) 

455 __, _, c, d = datas[i] 

456 # scat1.set_xdata(x) # <= Update the curve 

457 # scat1.set_ydata(y) # <= Update the curve 

458 scat1._sizes = c 

459 scat2._sizes = d 

460 res = mplfig_to_npimage(fig) 

461 return res 

462 

463 fig, _, scat1, scat2 = scatter_fig(0) 

464 animation = mpy.VideoClip(make_frame_mpl, duration=duration) 

465 return animation 

466 else: 

467 raise ValueError( # pragma: no cover 

468 "Unsupported module '{0}'".format(module)) 

469 

470 @staticmethod 

471 def distance_haversine(lat1, lon1, lat2, lon2): 

472 """ 

473 Computes the `haversine <https://en.wikipedia.org/wiki/Haversine_formula>`_ distance. 

474 

475 @return distance in kilometers (float)
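
A small numeric sketch, the distance between two points in Paris

(roughly 4.6 km)::

d = DataCollectJCDecaux.distance_haversine(48.8566, 2.3522, 48.8738, 2.2950)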

476 """ 

477 radius = 6371 

478 dlat = math.radians(lat2 - lat1) 

479 dlon = math.radians(lon2 - lon1) 

480 a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) \ 

481 * math.cos(math.radians(lat2)) * math.sin(dlon / 2) * math.sin(dlon / 2) 

482 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) 

483 d = radius * c 

484 return d 

485 

486 @staticmethod 

487 def simulate(df, nbbike, speed, 

488 period=datetime.timedelta(minutes=1), 

489 iteration=500, min_min=10, delta_speed=2.5, 

490 fLOG=print): 

491 """ 

492 Simulates velibs on a set of stations given by *df*. 

493 

494 @param df dataframe with station information 

495 @param nbbike number of bicycles

496 @param speed average speed (in km/h)

497 @param period duration of one simulation step (a ``datetime.timedelta``)

498 @param iteration number of iterations

499 @param min_min minimum duration of a trip (in minutes)

500 @param delta_speed allowed difference (in km/h) between the trip speed and *speed*

501 @param fLOG logging function 

502 @return two DataFrames: the simulated trips and the simulated station statuses
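
A minimal sketch, assuming *df* was built with *to_df* and describes a few stations::

paths, status = DataCollectJCDecaux.simulate(df, nbbike=5, speed=15, iteration=20)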

503 """ 

504 cities = df[["lat", "lng", "name", "number"]] 

505 start = cities.drop_duplicates() 

506 idvelo = 0 

507 

508 current = {} 

509 for row in start.values: 

510 r = [] 

511 for i in range(0, 5): 

512 r.append(idvelo) 

513 idvelo += 1 

514 r.extend([-1, -1, -1, -1, -1]) 

515 ids = tuple(row) 

516 current[ids] = r 

517 

518 running = [] 

519 

520 def free(v): 

521 "free bycicles" 

522 nb = [_ for _ in v if _ == -1] 

523 return len(nb) > 0 

524 

525 def bike(v): 

526 "bicycles" 

527 nb = [_ for _ in v if _ == -1] 

528 return len(nb) < len(v) 

529 

530 def pop(v): 

531 "pop" 

532 for i, _ in enumerate(v): 

533 if _ != -1: 

534 r = v[i] 

535 v[i] = -1 

536 fLOG(" pop", v) 

537 return r 

538 raise RuntimeError("no free bike") # pragma: no cover 

539 

540 def push(v, idv): 

541 "push" 

542 for i, _ in enumerate(v): 

543 if _ == -1: 

544 v[i] = idv 

545 fLOG(" push", v) 

546 return None 

547 raise RuntimeError("no free spot: " + str(v)) # pragma: no cover 

548 

549 def give_status(conf, ti): 

550 "give status" 

551 rows = [] 

552 for k, v in conf.items(): 

553 lat, lng, name, number = k 

554 obs = {"lat": lat, "lng": lng, "name": name, "number": number} 

555 nb = [_ for _ in v if _ == -1] 

556 obs["available_bike_stands"] = len(nb) 

557 obs["available_bikes"] = len(v) - len(nb) 

558 obs["collect_date"] = ti 

559 obs["file"] = str(ti) 

560 rows.append(obs) 

561 return rows 

562 

563 simulation = [] 

564 paths = [] 

565 keys = list(current.keys()) 

566 iter = 0 

567 tim = datetime.datetime.now() 

568 while iter < iteration: 

569 

570 status = give_status(current, tim) 

571 simulation.extend(status) 

572 

573 # take a new bike out of a random station while some bikes remain idle

574 if len(running) < nbbike: 

575 rnd = random.randint(0, len(keys) - 1) 

576 v = current[keys[rnd]] 

577 if bike(v): 

578 v = (tim, pop(v), keys[rnd], "begin") 

579 running.append(v) 

580 lat, lng, name, number = keys[rnd] 

581 dv = { 

582 "lat0": lat, 

583 "lng0": lng, 

584 "name0": name, 

585 "number0": number} 

586 dv.update({"time": v[0], "idvelo": v[1], "beginend": v[-1], 

587 "hours": 0.0, "dist": 0.0}) 

588 paths.append(dv) 

589 

590 # do we put the bike back 

591 rem = [] 

592 for i, r in enumerate(running): 

593 delta = tim - r[0] 

594 h = delta.total_seconds() / 3600 

595 if h * 60 > min_min: 

596 for _ in cities.values: 

597 row = cities.values[random.randint(0, len(cities) - 1)] 

598 keycity = tuple(row) 

599 station = current[keycity] 

600 if free(station): 

601 vlat, vlng = r[2][0], r[2][1] 

602 clat, clng = row[0], row[1] 

603 dist = DataCollectJCDecaux.distance_haversine( 

604 vlat, 

605 vlng, 

606 clat, 

607 clng) 

608 sp = dist / h 

609 dsp = abs(sp - speed) 

610 if (dsp < delta_speed or (sp < speed and h >= 1)) \ 

611 and random.randint(0, 10) == 0: 

612 # we put it back 

613 push(station, r[1]) 

614 rem.append(i) 

615 

616 lat, lng, name, number = r[2] 

617 dv = { 

618 "lat0": lat, 

619 "lng0": lng, 

620 "name0": name, 

621 "number0": number} 

622 lat, lng, name, number = keycity 

623 dv.update({"lat1": lat, 

624 "lng1": lng, 

625 "name1": name, 

626 "number1": number}) 

627 dv.update({"time": tim, 

628 "idvelo": r[1], 

629 "beginend": "end", 

630 "hours": h, 

631 "dist": dist}) 

632 paths.append(dv) 

633 break 

634 

635 running = [r for i, r in enumerate(running) if i not in rem] 

636 

637 if fLOG: 

638 fLOG("[DataCollectJCDecaux.simulate] iter", "time ", tim, " - ", len(running), 

639 "/", nbbike, " paths ", len(paths)) 

640 

641 # end of loop 

642 tim += period 

643 iter += 1 

644 

645 return pandas.DataFrame(paths), pandas.DataFrame(simulation)