{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Reservoir Sampling distribu\u00e9 - \u00e9nonc\u00e9\n", "\n", "[Reservoir Sampling](https://en.wikipedia.org/wiki/Reservoir_sampling) sur Map/Reduce. Cet algorithme est un algorithme d'\u00e9chantillonnage en streaming."]}, {"cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [{"data": {"text/html": ["
run previous cell, wait for 2 seconds
\n", ""], "text/plain": [""]}, "execution_count": 2, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## sampling sur Hadoop / PIG\n", "\n", "Voir [SAMPLE](http://pig.apache.org/docs/r0.17.0/basic.html#sample) pour *PIG*, [takeSample](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions) sur *Spark*, ou [sample](https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrame.html#sample(boolean,%20double). En pratique, il difficile d'\u00e9crire un algorithme plus efficace \u00e0 partir d'un langage haut niveau si ce langage haut niveau impl\u00e9mente lui-m\u00eame la fonctionnalit\u00e9. C'est une fa\u00e7on de reconna\u00eetre que c'est un besoin fr\u00e9quent et qu'il est recommand\u00e9 d'utiliser la fonction native."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Reservoir Sampling\n", "\n", "Le [reservoir sampling](https://en.wikipedia.org/wiki/Reservoir_sampling) est une fa\u00e7on de tirer un \u00e9chantillon al\u00e9atoire de $k$ nombres parmi $N$ nombres qui ne n\u00e9cessite pas de conserver ces $N$ nombres en m\u00e9moire."]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [{"data": {"text/plain": ["['0a', '1b', '2c', '3d', '4e']"]}, "execution_count": 3, "metadata": {}, "output_type": "execute_result"}], "source": ["ensemble = [ \"%d%s\" % (i, chr(i%26 + 97)) for i in range(0,10000)]\n", "ensemble[:5]"]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [{"data": {"text/plain": ["['7629l',\n", " '6747n',\n", " '1035v',\n", " '7538y',\n", " '1113v',\n", " '7565z',\n", " '2258w',\n", " '5101f',\n", " '1235n',\n", " '9965h']"]}, "execution_count": 4, "metadata": {}, "output_type": "execute_result"}], "source": ["import random\n", "\n", "def reservoir_sampling(ensemble, k):\n", " N = len(ensemble)\n", " echantillon = []\n", " for i, e in enumerate(ensemble):\n", " if len(echantillon) < k:\n", " echantillon.append(e)\n", " else:\n", " j = random.randint(0, i-1)\n", " if j < k:\n", " echantillon[j] = e\n", " return echantillon\n", "\n", "reservoir_sampling(ensemble, 10)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Le *reservoir sampling* reproduit un tirage sans remise. L'algorithme habituel - on tire $k$ \u00e9l\u00e9ments parmi $N$, la probabilit\u00e9 qu'un \u00e9l\u00e9ment fasse partie de l'\u00e9chantillon est de :\n", "\n", "$$p = \\frac{C_{N-1}^{k-1}}{C_N^k} = \\frac{ \\frac{N-1!}{k-1! N-k!} } { \\frac{N!}{k! N-k!} } = \\frac{ N-1! k! } { N! k-1!} = \\frac{k}{N}$$\n", "\n", "Dans le cas du *reservoir sampling*, on doit s'assurer que chaque \u00e9l\u00e9ment \u00e0 la m\u00eame probabilit\u00e9 de faire partie de l'\u00e9chantillon. On proc\u00e8de par r\u00e9currence. Le r\u00e9sultat est vrai pour $k=N$. On suppose que est vrai \u00e0 l'ordre $N-1$. La probabilit\u00e9 qu'un \u00e9l\u00e9ment de l'ensemble fasse partie de l'\u00e9chantillon est $\\frac{k}{N-1}$. A l'ordre $N$, il y a une probabilit\u00e9 $\\frac{k}{N}$ de remplacer un des \u00e9l\u00e9ments de l'\u00e9chantillon. L'\u00e9l\u00e9ment $N$ a donc une probabilit\u00e9 $\\frac{k}{N}$ de faire partie de l'\u00e9chantillon. Pour les \u00e9l\u00e9ments faisant d\u00e9j\u00e0 partie de l'\u00e9chantillon, leur probabilit\u00e9 de rester est $\\frac{N-k}{N} + \\frac{k}{N}\\frac{k-1}{k}=\\frac{N-1}{N}$. La probabilit\u00e9 qu'un \u00e9l\u00e9ment pr\u00e9sent dans l'\u00e9chantillon soit dans l'\u00e9chantillon \u00e0 l'it\u00e9ration $N$ est de $\\frac{N-1}{N} \\frac{k}{N-1}=\\frac{k}{N}$. L'\u00e9chantillon produit par le *reservoir sampling* a les m\u00eames propri\u00e9t\u00e9s qu'un \u00e9chantillon produit avec un tirage sans remise.\n", "\n", "Cet algorithme est int\u00e9ressent lorsqu'on veut \u00e9chantillonner sur une grande base de donn\u00e9es car il n\u00e9cessite de ne garder en m\u00e9moire que l'\u00e9chantillon."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Reservoir Sampling Distribu\u00e9\n", "\n", "Distribuer l'algorithme para\u00eet simple : une partie des donn\u00e9es ira sur une machine qui en tirera un \u00e9chantillon, l'autre machine tirera un \u00e9chantillon al\u00e9atoire sur l'autre partie des donn\u00e9es. Par extension, sur $m$ machines, on obtient $m$ \u00e9chantillons de tailles $k_1, ..., k_m$ tir\u00e9s sur des ensembles de tailles $N_1, ..., N_m$. Il faut s'assurer que la version distribu\u00e9e produit un \u00e9chantillon avec les m\u00eames propri\u00e9t\u00e9s."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### exercice 1 : combinaison\n", "\n", "Comment recombiner ces \u00e9chantillons al\u00e9atoires ? Comment choisir les $k_i$ sachant qu'on doit tirer un \u00e9chantillon de taille $k \\leqslant \\sum_{i=1}^m k_i$ parmi $N=\\sum_{i=1}^m N_i$ ?"]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": []}, {"cell_type": "markdown", "metadata": {}, "source": ["### exercice 2 : script PIG, Spark\n", "\n", "Ecrire le script PIG correspondant \u00e0 l'algorithme."]}, {"cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": []}, {"cell_type": "markdown", "metadata": {}, "source": ["## Petit probl\u00e8me th\u00e9orique\n", "\n", "Un [G\u00e9n\u00e9rateur congruentiel lin\u00e9aire](https://fr.wikipedia.org/wiki/G%C3%A9n%C3%A9rateur_congruentiel_lin%C3%A9aire) imite seulement le hasard, ce n'est pas vraiment le hasard. La suite d\u00e9pend d'une graine ou *seed*. Si deux machines partent de la m\u00eame graine, elle produiront la m\u00eame suite. Qu'en est-il du hasard ? La solution de l'exercice pr\u00e9c\u00e9dent est-elle correcte d'un point de statistique ?"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Reservoir Sampling pond\u00e9r\u00e9\n", "\n", "On serait tenter d'\u00e9crire... On suppose qu'on dispose un ensemble de points $(X_i)$ pond\u00e9r\u00e9 par $\\omega_i \\in \\mathbb{R}$. On veut tirer un \u00e9chantillon de $k$ \u00e9l\u00e9ments. On proc\u00e8de de la m\u00eame mani\u00e8re que pour le *reservoir sampling* non pond\u00e9r\u00e9. A l'\u00e9tape $n-1$, on dispose d'un \u00e9chantillon $E_{n-1}=(X_{i_1}, ..., X_{i_k})$. On tire un nombre $u$ selon une loi uniforme $[0,1]$. On ajoute l'\u00e9l\u00e9ment $X_n$ si :\n", "\n", "$$u \\leqslant \\frac{k\\omega_n}{\\sum_{l=1}^k w_{i_l}}$$\n", "\n", "On veut montrer que pour tout $j \\leqslant n$ :\n", "\n", "$$\\mathbb{P}(X_j \\in E_n) = \\frac{k\\omega_j}{\\sum_{l=1}^n w_l}$$\n", "\n", "On proc\u00e8de par r\u00e9currence en supposant cela vrai \u00e0 l'ordre $n-1$, donc $\\mathbb{P}(X_j \\in E_{n-1}) = \\frac{k\\omega_j}{\\sum_{l=1}^k w_j}$. On v\u00e9rifie que cela est vrai pour $n = k$. Mais a priori, il n'y a aucune raison que cela soit vrai.\n", "\n", "**Il faut faire autrement.**"]}, {"cell_type": "markdown", "metadata": {}, "source": ["On utilise l'algorithme [A-Res](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res). Soit $E_{n-1}=(X_{i_1}, ..., X_{i_k})$ pond\u00e9r\u00e9 par $(r_1, ..., r_k)$ qui sont diff\u00e9rents des $(\\omega_{i_1}, ..., \\omega_{i_k})$. On consid\u00e8re l'\u00e9l\u00e9ment $X_n$ pond\u00e9r\u00e9 par $\\omega_n$. On l'ajoute \u00e0 $E_{n-1}$ si :\n", "\n", "$$\\min_{1 \\leqslant l \\leqslant k}r_l \\leqslant u^{\\frac{1}{\\omega_n}}$$\n", "\n", "Si la condition est v\u00e9rifi\u00e9, on enl\u00e8ve l'observation $X_{l^*}$ associ\u00e9 au minimum $r_{l^*} =\\min_{1 \\leqslant l \\leqslant k}r_l$ et on le remplace par le point $X_n$ pond\u00e9r\u00e9 par $u^{\\frac{1}{\\omega_n}}$.\n", "\n", "Dans le cas uniforme, $\\omega_n=1$. Si on consid\u00e8re tous les nombres al\u00e9atoires $u_i$, et qu'on les trie par ordre croissant $(u_{\\sigma(1)}, ...,u_{\\sigma(n)})$. Dans ce cas, $\\min_{1 \\leqslant l \\leqslant k}r_l = u_{\\sigma(n-k+1)}$. Comme les nombres $u_i$ sont tir\u00e9s selon une loi uniforme, $\\mathbb{P}(\\min_{1 \\leqslant l \\leqslant k}r_l \\leqslant u) = \\frac{k}{n}$. On retrouve l'algorithme pr\u00e9c\u00e9dent. Cette probabilit\u00e9 ne change pas lorsque $\\omega_n=C \\; \\forall n$ car les nombres $u_i$ sont identiquement distribu\u00e9s.\n", "\n", "On s'int\u00e9resse maintenant au cas o\u00f9 les poids ne sont pas identiques. On associe \u00e0 chaque \u00e9l\u00e9ment $X_i$ de poids $\\omega_i$ un nombre $r_i=u^{\\frac{1}{\\omega_i}}$ o\u00f9 $u$ un nombre al\u00e9atoire appartient tir\u00e9 selon une loi uniforme. On trie ces $r_i$ par ordre croissant : $(r_{\\sigma(1)}, ...,r_{\\sigma(n)})$. A l'it\u00e9ration $n$, l'\u00e9chantillon est $X_{\\sigma(n-k+1)}, ..., X_{\\sigma(n)}$.\n", "\n", "\n", "La fonction de r\u00e9partition d'une variable al\u00e9atoire est d\u00e9finie par $F(x) = \\mathbb{P}(X \\leqslant x)$. On suppose que $X\\geqslant 0$, la loi de $X^\\alpha$ est $F(X^\\alpha) = \\mathbb{P}(X^\\alpha \\leqslant x) = \\mathbb{P}(X \\leqslant x^\\frac{1}{\\alpha}) = F(x^\\frac{1}{\\alpha})$.\n", "\n"]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.0"}}, "nbformat": 4, "nbformat_minor": 2}