Statistiques Wikipedia - énoncé#
Links: notebook
, html, PDF
, python
, slides, GitHub
Parallélisation de la récupération de fichiers de données depuis wikipédia.
# Insert an automatic table of contents at the top of the notebook.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
Exercice 1 : parallélisation du téléchargement#
On peut paralléliser le téléchargement de différentes façons :
avec des threads (librairie threading) : synchronisation rapide mais parfois délicate et mémoire partagée entre threads
avec des processus (librairies multiprocessing, joblib, jupyter) : synchronisation lente, pas de mémoire partagée
avec un cluster, jupyter : synchronisation lente, pas de mémoire partagée, parallélisme en grande dimension
La page ParallelProcessing recense des modules qui implémentent cela mais elle n’est pas très à jour. Il faut vérifier si les modules proposés sont encore maintenus.
Approche avec des threads#
import threading, time, os
from datetime import datetime, timedelta
from mlstatpy.data.wikipedia import download_pageviews

# Destination folder for the downloaded pageview dumps.
folder = "d:\\wikipv"
# makedirs(exist_ok=True) is race-free and creates intermediate
# directories, unlike the exists() + mkdir() pair.
os.makedirs(folder, exist_ok=True)
class DownloadThread(threading.Thread):
    """Worker thread: downloads one pageview dump after another
    until its queue is empty.

    :param qu: queue of :class:`datetime.datetime` values to download
    :param name: thread name used in log messages
    :param folder: destination folder passed to ``download_pageviews``
    """

    def __init__(self, qu, name, folder):
        threading.Thread.__init__(self)
        self.qu = qu
        self.name = name
        self.folder = folder

    def run(self):
        """Consume dates from the queue and download each one.

        A failed download is logged and skipped, never retried here;
        rerunning the cell will pick up the missing files.
        """
        while not self.qu.empty():
            date = self.qu.get(False)
            if date is None:
                break
            print(self.name, "download", date, "len(qu)", self.qu.qsize())
            try:
                download_pageviews(date, folder=self.folder)
            except Exception as e:
                # BUGFIX: log the local 'date', not the global 'dt'
                # (the old code printed the same timestamp for every failure).
                print("skipping dt", date, "rerun to get it", e)
            # task_done() must be called once per get() so that
            # qu.join() can eventually return.
            self.qu.task_done()
# One queue per worker plus the matching download threads.
import queue

queues = [queue.Queue() for _ in range(3)]
m = [DownloadThread(qu, "thread %d" % num, folder)
     for num, qu in enumerate(queues)]

# Fill the queues: one timestamp per hour over the previous week,
# distributed round-robin over the three queues.
dt = datetime.now() - timedelta(15)
hour = timedelta(hours=1)
for h in range(24 * 7):
    queues[h % 3].put(dt)
    dt += hour

# Start the workers.
for t in m:
    t.start()

# Block until every queue has been fully processed.
for i, q in enumerate(queues):
    print("attendre file", i, [q.qsize() for q in queues])
    q.join()

# One cannot use something like this instead:
while not q.empty():
    time.sleep(1)
# The queues become empty as soon as q.get() is called, i.e. before
# the corresponding download is finished, so the program would stop
# and interrupt the threads still running.
thread 0 download 2016-08-28 05:27:45.899868 len(qu) 55
thread 0 download 2016-08-28 08:27:45.899868 len(qu) 54
thread 0 download 2016-08-28 11:27:45.899868 len(qu) 53
thread 0 download 2016-08-28 14:27:45.899868 len(qu) 52
thread 0 download 2016-08-28 17:27:45.899868 len(qu) 51
thread 0 download 2016-08-28 20:27:45.899868 len(qu) 50
thread 0 download 2016-08-28 23:27:45.899868 len(qu) 49
thread 0 download 2016-08-29 02:27:45.899868 len(qu) 48
thread 0 download 2016-08-29 05:27:45.899868 len(qu) 47
thread 1 download 2016-08-28 06:27:45.899868 len(qu) 55
thread 0 download 2016-08-29 08:27:45.899868 len(qu) 46
thread 1 download 2016-08-28 09:27:45.899868 len(qu) 54
thread 1 download 2016-08-28 12:27:45.899868 len(qu) 53
thread 0 download 2016-08-29 11:27:45.899868 len(qu) 45
thread 1 download 2016-08-28 15:27:45.899868 len(qu) 52
thread 1 download 2016-08-28 18:27:45.899868 len(qu) 51
thread 1 download 2016-08-28 21:27:45.899868 len(qu) 50
thread 1 download 2016-08-29 00:27:45.899868 len(qu) 49
thread 1 download 2016-08-29 03:27:45.899868 len(qu) 48
thread 1 download 2016-08-29 06:27:45.899868 len(qu) 47
thread 1 download 2016-08-29 09:27:45.899868 len(qu) 46
thread 1 download 2016-08-29 12:27:45.899868 len(qu) 45
thread 0 download 2016-08-29 14:27:45.899868 len(qu) 44
thread 2 download 2016-08-28 07:27:45.899868 len(qu) 55
thread 2 download 2016-08-28 10:27:45.899868 len(qu) 54
thread 1 download 2016-08-29 15:27:45.899868 len(qu) 44
thread 1 download 2016-08-29 18:27:45.899868 len(qu) 43
thread 2 download 2016-08-28 13:27:45.899868 len(qu) 53
thread 1 download 2016-08-29 21:27:45.899868 len(qu) 42
thread 2 download 2016-08-28 16:27:45.899868 len(qu) 52
thread 1 download 2016-08-30 00:27:45.899868 len(qu) 41
thread 1 download 2016-08-30 03:27:45.899868 len(qu) 40
thread 2 download 2016-08-28 19:27:45.899868 len(qu) 51
attendre file 0 [44, 40, 51]
thread 0 download 2016-08-29 17:27:45.899868 len(qu) 43
thread 1 download 2016-08-30 06:27:45.899868 len(qu) 39
thread 0 download 2016-08-29 20:27:45.899868 len(qu) 42
thread 0 download 2016-08-29 23:27:45.899868 len(qu) 41
thread 0 download 2016-08-30 02:27:45.899868 len(qu) 40
thread 0 download 2016-08-30 05:27:45.899868 len(qu) 39
thread 0 download 2016-08-30 08:27:45.899868 len(qu) 38
thread 1 download 2016-08-30 09:27:45.899868 len(qu) 38
thread 0 download 2016-08-30 11:27:45.899868 len(qu) 37
thread 0 download 2016-08-30 14:27:45.899868 len(qu) 36
thread 1 download 2016-08-30 12:27:45.899868 len(qu) 37
thread 0 download 2016-08-30 17:27:45.899868 len(qu) 35
thread 1 download 2016-08-30 15:27:45.899868 len(qu) 36
thread 0 download 2016-08-30 20:27:45.899868 len(qu) 34
thread 1 download 2016-08-30 18:27:45.899868 len(qu) 35
thread 0 download 2016-08-30 23:27:45.899868 len(qu) 33
thread 1 download 2016-08-30 21:27:45.899868 len(qu) 34
thread 0 download 2016-08-31 02:27:45.899868 len(qu) 32
thread 1 download 2016-08-31 00:27:45.899868 len(qu) 33
thread 0 download 2016-08-31 05:27:45.899868 len(qu) 31
thread 1 download 2016-08-31 03:27:45.899868 len(qu) 32
thread 0 download 2016-08-31 08:27:45.899868 len(qu) 30
thread 1 download 2016-08-31 06:27:45.899868 len(qu) 31
thread 0 download 2016-08-31 11:27:45.899868 len(qu) 29
thread 1 download 2016-08-31 09:27:45.899868 len(qu) 30
thread 0 download 2016-08-31 14:27:45.899868 len(qu) 28
thread 1 download 2016-08-31 12:27:45.899868 len(qu) 29
thread 0 download 2016-08-31 17:27:45.899868 len(qu) 27
thread 1 download 2016-08-31 15:27:45.899868 len(qu) 28
thread 1 download 2016-08-31 18:27:45.899868 len(qu) 27
thread 0 download 2016-08-31 20:27:45.899868 len(qu) 26
thread 2 download 2016-08-28 22:27:45.899868 len(qu) 50
thread 1 download 2016-08-31 21:27:45.899868 len(qu) 26
thread 2 download 2016-08-29 01:27:45.899868 len(qu) 49
thread 0 download 2016-08-31 23:27:45.899868 len(qu) 25
thread 1 download 2016-09-01 00:27:45.899868 len(qu) 25
thread 0 download 2016-09-01 02:27:45.899868 len(qu) 24
thread 2 download 2016-08-29 04:27:45.899868 len(qu) 48
thread 0 download 2016-09-01 05:27:45.899868 len(qu) 23
thread 2 download 2016-08-29 07:27:45.899868 len(qu) 47
thread 0 download 2016-09-01 08:27:45.899868 len(qu) 22
thread 2 download 2016-08-29 10:27:45.899868 len(qu) 46
thread 0 download 2016-09-01 11:27:45.899868 len(qu) 21
thread 0 download 2016-09-01 14:27:45.899868 len(qu) 20
thread 1 download 2016-09-01 03:27:45.899868 len(qu) 24
thread 1 download 2016-09-01 06:27:45.899868 len(qu) 23
thread 1 download 2016-09-01 09:27:45.899868 len(qu) 22
thread 2 download 2016-08-29 13:27:45.899868 len(qu) 45
thread 2 download 2016-08-29 16:27:45.899868 len(qu) 44
thread 2 download 2016-08-29 19:27:45.899868 len(qu) 43
thread 2 download 2016-08-29 22:27:45.899868 len(qu) 42
thread 2 download 2016-08-30 01:27:45.899868 len(qu) 41
thread 2 download 2016-08-30 04:27:45.899868 len(qu) 40
thread 2 download 2016-08-30 07:27:45.899868 len(qu) 39
thread 2 download 2016-08-30 10:27:45.899868 len(qu) 38
thread 2 download 2016-08-30 13:27:45.899868 len(qu) 37
thread 2 download 2016-08-30 16:27:45.899868 len(qu) 36
thread 2 download 2016-08-30 19:27:45.899868 len(qu) 35
thread 2 download 2016-08-30 22:27:45.899868 len(qu) 34
thread 2 download 2016-08-31 01:27:45.899868 len(qu) 33
thread 2 download 2016-08-31 04:27:45.899868 len(qu) 32
thread 2 download 2016-08-31 07:27:45.899868 len(qu) 31
thread 2 download 2016-08-31 10:27:45.899868 len(qu) 30
thread 1 download 2016-09-01 12:27:45.899868 len(qu) 21
thread 2 download 2016-08-31 13:27:45.899868 len(qu) 29
thread 0 download 2016-09-01 17:27:45.899868 len(qu) 19
thread 1 download 2016-09-01 15:27:45.899868 len(qu) 20
thread 0 download 2016-09-01 20:27:45.899868 len(qu) 18
thread 2 download 2016-08-31 16:27:45.899868 len(qu) 28
thread 0 download 2016-09-01 23:27:45.899868 len(qu) 17
thread 1 download 2016-09-01 18:27:45.899868 len(qu) 19
thread 0 download 2016-09-02 02:27:45.899868 len(qu) 16
thread 1 download 2016-09-01 21:27:45.899868 len(qu) 18
thread 1 download 2016-09-02 00:27:45.899868 len(qu) 17
thread 1 download 2016-09-02 03:27:45.899868 len(qu) 16
thread 2 download 2016-08-31 19:27:45.899868 len(qu) 27
thread 0 download 2016-09-02 05:27:45.899868 len(qu) 15
thread 0 download 2016-09-02 08:27:45.899868 len(qu) 14
thread 0 download 2016-09-02 11:27:45.899868 len(qu) 13
thread 0 download 2016-09-02 14:27:45.899868 len(qu) 12
thread 2 download 2016-08-31 22:27:45.899868 len(qu) 26
thread 0 download 2016-09-02 17:27:45.899868 len(qu) 11
thread 2 download 2016-09-01 01:27:45.899868 len(qu) 25
thread 0 download 2016-09-02 20:27:45.899868 len(qu) 10
thread 2 download 2016-09-01 04:27:45.899868 len(qu) 24
thread 2 download 2016-09-01 07:27:45.899868 len(qu) 23
thread 1 download 2016-09-02 06:27:45.899868 len(qu) 15
thread 1 download 2016-09-02 09:27:45.899868 len(qu) 14
thread 1 download 2016-09-02 12:27:45.899868 len(qu) 13
thread 0 download 2016-09-02 23:27:45.899868 len(qu) 9
thread 0 download 2016-09-03 02:27:45.899868 len(qu) 8
thread 2 download 2016-09-01 10:27:45.899868 len(qu) 22
thread 1 download 2016-09-02 15:27:45.899868 len(qu) 12
thread 2 download 2016-09-01 13:27:45.899868 len(qu) 21
thread 0 download 2016-09-03 05:27:45.899868 len(qu) 7
thread 2 download 2016-09-01 16:27:45.899868 len(qu) 20
thread 1 download 2016-09-02 18:27:45.899868 len(qu) 11
thread 2 download 2016-09-01 19:27:45.899868 len(qu) 19
thread 0 download 2016-09-03 08:27:45.899868 len(qu) 6
thread 1 download 2016-09-02 21:27:45.899868 len(qu) 10
thread 2 download 2016-09-01 22:27:45.899868 len(qu) 18
thread 0 download 2016-09-03 11:27:45.899868 len(qu) 5
thread 1 download 2016-09-03 00:27:45.899868 len(qu) 9
thread 2 download 2016-09-02 01:27:45.899868 len(qu) 17
thread 1 download 2016-09-03 03:27:45.899868 len(qu) 8
thread 2 download 2016-09-02 04:27:45.899868 len(qu) 16
thread 1 download 2016-09-03 06:27:45.899868 len(qu) 7
thread 1 download 2016-09-03 09:27:45.899868 len(qu) 6
thread 2 download 2016-09-02 07:27:45.899868 len(qu) 15
thread 2 download 2016-09-02 10:27:45.899868 len(qu) 14
thread 1 download 2016-09-03 12:27:45.899868 len(qu) 5
thread 2 download 2016-09-02 13:27:45.899868 len(qu) 13
thread 2 download 2016-09-02 16:27:45.899868 len(qu) 12
thread 2 download 2016-09-02 19:27:45.899868 len(qu) 11
thread 2 download 2016-09-02 22:27:45.899868 len(qu) 10
thread 2 download 2016-09-03 01:27:45.899868 len(qu) 9
thread 2 download 2016-09-03 04:27:45.899868 len(qu) 8
thread 2 download 2016-09-03 07:27:45.899868 len(qu) 7
thread 2 download 2016-09-03 10:27:45.899868 len(qu) 6
skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device
thread 2 download 2016-09-03 13:27:45.899868 len(qu) 5
skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device
thread 0 download 2016-09-03 14:27:45.899868 len(qu) 4
skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device
thread 1 download 2016-09-03 15:27:45.899868 len(qu) 4
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-130000.gz, exc=[Errno 28] No space left on device
thread 2 download 2016-09-03 16:27:45.899868 len(qu) 4
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-140000.gz, exc=[Errno 28] No space left on device
thread 0 download 2016-09-03 17:27:45.899868 len(qu) 3
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-150000.gz, exc=[Errno 28] No space left on device
thread 1 download 2016-09-03 18:27:45.899868 len(qu) 3
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-160000.gz, exc=[Errno 28] No space left on device
thread 2 download 2016-09-03 19:27:45.899868 len(qu) 3
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-170000.gz, exc=[Errno 28] No space left on device
thread 0 download 2016-09-03 20:27:45.899868 len(qu) 2
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-180000.gz, exc=[Errno 28] No space left on device
thread 1 download 2016-09-03 21:27:45.899868 len(qu) 2
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-190000.gz, exc=[Errno 28] No space left on device
thread 2 download 2016-09-03 22:27:45.899868 len(qu) 2
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-200000.gz, exc=[Errno 28] No space left on device
thread 0 download 2016-09-03 23:27:45.899868 len(qu) 1
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-210000.gz, exc=[Errno 28] No space left on device
thread 1 download 2016-09-04 00:27:45.899868 len(qu) 1
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-220000.gz, exc=[Errno 28] No space left on device
thread 2 download 2016-09-04 01:27:45.899868 len(qu) 1
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-230000.gz, exc=[Errno 28] No space left on device
thread 0 download 2016-09-04 02:27:45.899868 len(qu) 0
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-000000.gz, exc=[Errno 28] No space left on device
thread 1 download 2016-09-04 03:27:45.899868 len(qu) 0
attendre file 1 [0, 0, 1]
attendre file 2 [0, 0, 1]
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-010000.gz, exc=[Errno 28] No space left on device
thread 2 download 2016-09-04 04:27:45.899868 len(qu) 0
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-020000.gz, exc=[Errno 28] No space left on device
done thread 0
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-030000.gz, exc=[Errno 28] No space left on device
done thread 1
skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-040000.gz, exc=[Errno 28] No space left on device
done thread 2
Parallélisation avec des processus#
Il n’est pas toujours évident de comprendre ce qu’il se passe quand
l’erreur se produit dans un processus différent. Si on change le
backend pour "threading"
, l’erreur devient visible. Voir
Parallel.
Le code ne fonctionne pas toujours lorsque n_jobs > 1
sous Windows
et que le backend est celui par défaut (processus). Lire Embarrassingly
Parallel For
Loops.
from joblib import Parallel, delayed
from datetime import datetime, timedelta
import os

# Destination folder for the downloaded pageview dumps.
folder = "d:\\wikipv"
# makedirs(exist_ok=True) is race-free and creates intermediate
# directories, unlike the exists() + mkdir() pair.
os.makedirs(folder, exist_ok=True)

# One timestamp per hour over a single day, two weeks back.
dt = datetime.now() - timedelta(14)
hour = timedelta(hours=1)
dates = [dt + hour*i for i in range(0,24)]
def downloadp2(dt, folder):
    """Download the pageview dump for *dt* into *folder*.

    The import is local so the function remains picklable and works
    when executed in a separate process by joblib.
    """
    from mlstatpy.data.wikipedia import download_pageviews
    download_pageviews(dt, folder=folder)
# This instruction does not work from a notebook when the backend is
# "multiprocessing"; in that case it must be run as a standalone program.
if __name__ == "__main__":
    Parallel(n_jobs=3, verbose=5)(delayed(downloadp2)(dt, folder) for dt in dates)
Filtrage pour ne garder que les lignes avec fr#
def filtre(input, country):
    """Write to ``input + "." + country`` the lines of *input* that
    start with *country*; does nothing if that file already exists."""
    import os
    print(input)
    output = input + "." + country
    if os.path.exists(output):
        return
    with open(input, "r", encoding="utf-8") as src, \
         open(output, "w", encoding="utf-8") as dest:
        dest.writelines(line for line in src if line.startswith(country))
import os
from joblib import Parallel, delayed

# Keep only the hourly dump files (names like "pageviews-...-HH0000")
# and filter them in parallel with threads.
folder = "wikipv"
files = [os.path.join(folder, fname)
         for fname in os.listdir(folder)
         if fname.startswith("pageviews") and fname.endswith("0000")]
Parallel(n_jobs=3, verbose=5, backend="threading")(
    delayed(filtre)(name, "fr") for name in files)
wikipvpageviews-20160827-210000wikipvpageviews-20160827-220000 wikipvpageviews-20160827-230000 wikipvpageviews-20160828-000000 wikipvpageviews-20160828-010000 wikipvpageviews-20160828-020000 wikipvpageviews-20160828-030000 wikipvpageviews-20160828-040000 wikipvpageviews-20160828-050000 wikipvpageviews-20160828-060000 wikipvpageviews-20160828-070000 wikipvpageviews-20160828-080000 wikipvpageviews-20160828-090000 wikipvpageviews-20160828-100000 wikipvpageviews-20160828-110000
[Parallel(n_jobs=3)]: Done 12 tasks | elapsed: 53.4s
wikipvpageviews-20160828-120000 wikipvpageviews-20160828-130000 wikipvpageviews-20160828-140000 wikipvpageviews-20160828-150000 wikipvpageviews-20160828-160000 wikipvpageviews-20160828-170000 wikipvpageviews-20160828-180000 wikipvpageviews-20160828-190000 wikipvpageviews-20160828-200000 wikipvpageviews-20160828-210000 wikipvpageviews-20160828-220000 wikipvpageviews-20160828-230000 wikipvpageviews-20160829-000000 wikipvpageviews-20160829-010000 wikipvpageviews-20160829-020000 wikipvpageviews-20160829-030000 wikipvpageviews-20160829-040000 wikipvpageviews-20160829-050000 wikipvpageviews-20160829-060000 wikipvpageviews-20160829-070000 wikipvpageviews-20160829-080000
Insérer le fichier dans une base de données SQL#
import pandas
df = pandas.read