{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Recursive reducers\n", "\n", "I like to borrow a term I came across at Microsoft to illustrate a way of writing the same computation that affects how easily it can be distributed: working with counts rather than averages."]}, {"cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", ""], "text/plain": [""]}, "execution_count": 2, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["This notebook relies on functions written to illustrate the concepts; they favor clarity over efficiency."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Stream\n", "\n", "Map-reduce applies to very large datasets. From a mathematical point of view, the algorithms are written for datasets that are infinite, or rather whose size is not known in advance. To distinguish them from ordinary datasets, they are called *streams*.\n", "\n", "As an aside, because these processes are written to be parallelized, they do not preserve the order in which the data is processed. This is especially true when the dataset is split across several hard drives: there is no way to decide which piece comes first."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Mapper\n", "\n", "A *mapper* applies the same processing to every observation of the *stream*, independently of the others."]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": ["ens = [('a', 1), ('b', 4), ('a', 6), ('a', 3)]"]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [{"data": {"text/plain": [""]}, "execution_count": 4, "metadata": {}, "output_type": "execute_result"}], "source": ["from sparkouille.fctmr import mapper\n", "stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "stream1"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The result does not exist until the computation is explicitly requested. 
The result has to be iterated over."]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 2), ('b', 5), ('a', 7), ('a', 4)]"]}, "execution_count": 5, "metadata": {}, "output_type": "execute_result"}], "source": ["list(stream1)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["And it can only be iterated over once:"]}, {"cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [{"data": {"text/plain": ["[]"]}, "execution_count": 6, "metadata": {}, "output_type": "execute_result"}], "source": ["list(stream1)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Cost of the first element\n", "\n", "With an infinite number of elements to process, it is important to be able to look at what a processing step produces on the first few elements. With a mapper, this costs a single map."]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": ["from sparkouille.fctmr import take\n", "first = lambda it: take(it, count=1)\n", "big_ens = ens * 100"]}, {"cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["124 \u00b5s \u00b1 15.6 \u00b5s per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 list(mapper(lambda el: (el[0], el[1]+1), big_ens))"]}, {"cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["2.46 \u00b5s \u00b1 451 ns per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 first(mapper(lambda el: (el[0], el[1]+1), big_ens))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Reducer\n", "\n", "A true *reducer* reduces the elements of a set; it does not redistribute the data. 
In practice, one rarely reduces a set that has not been distributed beforehand, as with a *groupby*. Nor is a set always reduced to a single row. Streaming operations are stacked, which also postpones the moment of evaluation. The distribution is driven by a hashed key (see [Hash et distribution](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/hash_distribution.html)). The first lambda function defines that key, here the first element of the pair."]}, {"cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [{"data": {"text/plain": [""]}, "execution_count": 10, "metadata": {}, "output_type": "execute_result"}], "source": ["from sparkouille.fctmr import reducer\n", "stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "stream2 = reducer(lambda el: el[0], stream1, asiter=False)\n", "stream2"]}, {"cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', [('a', 2), ('a', 4), ('a', 7)]), ('b', [('b', 5)])]"]}, "execution_count": 11, "metadata": {}, "output_type": "execute_result"}], "source": ["list(stream2)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["In this example, the *reducer* reduces each group to a single result, namely the list of its elements. What is the cost of the first element?"]}, {"cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["1.75 \u00b5s \u00b1 409 ns per loop (mean \u00b1 std. dev. 
of 7 runs, 1000 loops each)\n"]}], "source": ["def test2(ens, one=False):\n", "    stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "    stream2 = reducer(lambda el: el[0], stream1, asiter=False)\n", "    return list(stream2) if one else first(stream2)\n", "\n", "%timeit -n 1000 test2(big_ens)"]}, {"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["720 \u00b5s \u00b1 31.4 \u00b5s per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 test2(big_ens, one=True)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["It is shorter, but in principle not by a wide margin: the cost of the first element covers sorting all the observations plus building the first group."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Reducer and sorting\n", "\n", "A stream is infinite in theory. In practice it is finite, but there is no way to know whether one or several whole groups would fit in memory. One approach is to keep at most one group in memory at a time, which requires sorting the data by key first. Sorting is not mandatory, but in the worst case it is a good option. 
A stream could look like this:"]}, {"cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 0),\n", " ('b', 1),\n", " ('c', 2),\n", " ('d', 3),\n", " ('e', 4),\n", " ('f', 5),\n", " ('g', 6),\n", " ('h', 7),\n", " ('g', 8),\n", " ('f', 9),\n", " ('e', 10),\n", " ('d', 11),\n", " ('c', 12),\n", " ('b', 13),\n", " ('a', 14)]"]}, "execution_count": 14, "metadata": {}, "output_type": "execute_result"}], "source": ["pas_cool = [(chr(int(c) + 96), i) for i, c in enumerate(str(11111111 ** 2))]\n", "pas_cool"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Group *a* appears at both the beginning and the end. If the grouping is done in memory, the group associated with *a* must stay in memory from start to finish. There is never any guarantee that a group will not reappear later. Sorting removes that uncertainty."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Another map\n", "\n", "A final map is added that sums the elements of each group."]}, {"cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [{"data": {"text/plain": [""]}, "execution_count": 15, "metadata": {}, "output_type": "execute_result"}], "source": ["def sum_gr(key_gr):\n", "    key, gr = key_gr\n", "    return key, sum(e[1] for e in gr)\n", "\n", "stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "stream2 = reducer(lambda el: el[0], stream1)\n", "stream3 = map(sum_gr, stream2)\n", "stream3"]}, {"cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 13), ('b', 5)]"]}, "execution_count": 16, "metadata": {}, "output_type": "execute_result"}], "source": ["list(stream3)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Combiner or join\n", "\n", "A *combiner* or *join* merges two datasets that share a common key."]}, {"cell_type": "code", "execution_count": 16, "metadata": {}, 
"outputs": [{"data": {"text/plain": [""]}, "execution_count": 17, "metadata": {}, "output_type": "execute_result"}], "source": ["from sparkouille.fctmr import combiner\n", "stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "stream2 = reducer(lambda el: el[0], stream1)\n", "stream3 = map(sum_gr, stream2)\n", "stream4 = mapper(lambda el: (el[0], el[1]+10), pas_cool)\n", "comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4)\n", "comb"]}, {"cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [{"data": {"text/plain": ["[(('a', 13), ('a', 10)),\n", " (('a', 13), ('a', 24)),\n", " (('b', 5), ('b', 11)),\n", " (('b', 5), ('b', 23))]"]}, "execution_count": 18, "metadata": {}, "output_type": "execute_result"}], "source": ["list(comb)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["The cost of the first element is somewhat harder to infer; it depends heavily on the data."]}, {"cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["2.97 \u00b5s \u00b1 793 ns per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["def job(ens, ens2, one=False, sens=True):\n", "    stream1 = mapper(lambda el: (el[0], el[1]+1), ens)\n", "    stream2 = reducer(lambda el: el[0], stream1)\n", "    stream3 = map(sum_gr, stream2)\n", "    stream4 = mapper(lambda el: (el[0], el[1]+10), ens2)\n", "    if sens:\n", "        comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4)\n", "    else:\n", "        comb = combiner(lambda el: el[0], stream4, lambda el: el[0], stream3)\n", "    return list(comb) if one else first(comb)\n", "\n", "%timeit -n 1000 job(big_ens, pas_cool)"]}, {"cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["3.15 \u00b5s \u00b1 1.16 \u00b5s per loop (mean \u00b1 std. dev. 
of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 job(big_ens, pas_cool, sens=False)"]}, {"cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["401 \u00b5s \u00b1 6.73 \u00b5s per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 job(big_ens, pas_cool, one=True)"]}, {"cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["389 \u00b5s \u00b1 10.7 \u00b5s per loop (mean \u00b1 std. dev. of 7 runs, 1000 loops each)\n"]}], "source": ["%timeit -n 1000 job(big_ens, pas_cool, one=True, sens=False)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["There are different ways to code a *combiner*; one of them consists in reducing each of the two streams and then taking the cross product of each pair of matching groups."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Recursive reducers\n", "\n", "The name is close to being a misnomer; let us say that it reduces the dependence on sorting. 
Here is an example."]}, {"cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 10), ('b', 4)]"]}, "execution_count": 23, "metadata": {}, "output_type": "execute_result"}], "source": ["def sum_gr(key_gr):\n", "    key, gr = key_gr\n", "    return key, sum(e[1] for e in gr)\n", "\n", "def job_recursif(ens):\n", "    stream2 = reducer(lambda el: el[0], ens)\n", "    stream3 = map(sum_gr, stream2)\n", "    return list(stream3)\n", "\n", "job_recursif(ens)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["And now the data is split in two:"]}, {"cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 1), ('b', 4)]"]}, "execution_count": 24, "metadata": {}, "output_type": "execute_result"}], "source": ["n = len(ens) // 2\n", "job_recursif(ens[:n])"]}, {"cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 9)]"]}, "execution_count": 25, "metadata": {}, "output_type": "execute_result"}], "source": ["job_recursif(ens[n:])"]}, {"cell_type": "markdown", "metadata": {}, "source": ["And now the job is applied again to the two partial results:"]}, {"cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 10), ('b', 4)]"]}, "execution_count": 26, "metadata": {}, "output_type": "execute_result"}], "source": ["job_recursif( job_recursif(ens[:n]) + job_recursif(ens[n:]))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Written this way, the job is associative in a sense. This leaves more freedom for distribution, because identical keys can now be sent to different machines and the *reducer* applied again to the results of the first round. The more the *reducer* shrinks the data, the more efficient this is. It remains to look at the case of a **non-recursive** *reducer*. 
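Before turning to that case, the associativity described above can be sketched in plain Python, without sparkouille. The helper `reduce_by_key` and the functions `add` and `add_pairs` are hypothetical names introduced only for this illustration; the helper sorts by key and folds each group with `itertools.groupby`, which is an assumption about one possible implementation and not the library's own code. The second half applies the same pattern to an average by carrying a (sum, count) pair per key and dividing only at the very end, which is the counts rather than averages idea from the introduction.

```python
from itertools import groupby
from operator import itemgetter


def reduce_by_key(pairs, merge):
    # Hypothetical helper (not part of sparkouille): sort the pairs by key,
    # then fold the values of each group with the binary function merge.
    out = []
    ordered = sorted(pairs, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        values = [value for _, value in group]
        acc = values[0]
        for value in values[1:]:
            acc = merge(acc, value)
        out.append((key, acc))
    return out


def add(x, y):
    return x + y


def add_pairs(p, q):
    # Merge two (sum, count) pairs.
    return (p[0] + q[0], p[1] + q[1])


ens = [('a', 1), ('b', 4), ('a', 6), ('a', 3)]
n = len(ens) // 2

# Sum: reducing the two partial results again matches a single pass.
full_sum = reduce_by_key(ens, add)
partial_sums = reduce_by_key(ens[:n], add) + reduce_by_key(ens[n:], add)
merged_sum = reduce_by_key(partial_sums, add)

# Mean: carry (sum, count) pairs and divide only once, at the end.
as_pairs = [(key, (value, 1)) for key, value in ens]
partial = reduce_by_key(as_pairs[:n], add_pairs) + reduce_by_key(as_pairs[n:], add_pairs)
means = [(key, s / c) for key, (s, c) in reduce_by_key(partial, add_pairs)]

print(full_sum, merged_sum, means)
```

The split point `n` is arbitrary here: any partition of `ens` should give the same sums and the same means, because both `add` and `add_pairs` are associative and commutative.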
"]}, {"cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 3.3333333333333335), ('b', 4.0)]"]}, "execution_count": 27, "metadata": {}, "output_type": "execute_result"}], "source": ["def mean(ens):\n", " s = 0.\n", " for i, e in enumerate(ens):\n", " s += e\n", " return s / (i + 1)\n", "\n", "def mean_gr(key_gr):\n", " key, gr = key_gr\n", " return key, mean(e[1] for e in gr)\n", "\n", "def job_non_recursif(ens):\n", " stream2 = reducer(lambda el: el[0], ens)\n", " stream3 = map(mean_gr, stream2)\n", " return list(stream3)\n", "\n", "job_non_recursif(ens)"]}, {"cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 1.0), ('b', 4.0)]"]}, "execution_count": 28, "metadata": {}, "output_type": "execute_result"}], "source": ["n = len(ens) // 2\n", "job_non_recursif(ens[:n])"]}, {"cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 4.5)]"]}, "execution_count": 29, "metadata": {}, "output_type": "execute_result"}], "source": ["job_non_recursif(ens[n:])"]}, {"cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('a', 2.75), ('b', 4.0)]"]}, "execution_count": 30, "metadata": {}, "output_type": "execute_result"}], "source": ["job_non_recursif( job_non_recursif(ens[:n]) + job_non_recursif(ens[n:]))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Ce *job* ne doit pas \u00eatre distribu\u00e9 n'importe comment."]}, {"cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4"}}, "nbformat": 4, "nbformat_minor": 2}