{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Premiers pas avec Spark\n", "\n", "Introduction \u00e0 [Spark](https://spark.apache.org/) et aux [RDD](https://www.cs.cmu.edu/~pavlo/courses/fall2013/static/slides/spark.pdf)."]}, {"cell_type": "code", "execution_count": 1, "metadata": {"collapsed": true}, "outputs": [], "source": ["%matplotlib inline"]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [{"data": {"text/html": ["
run previous cell, wait for 2 seconds
\n", ""], "text/plain": [""]}, "execution_count": 3, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {"collapsed": true}, "source": ["## Deux ou trois petites choses \u00e0 ne pas oublier"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Local et cluster\n", "\n", "[Spark](http://spark.apache.org/) n'est pas un langage de programmation mais un environnement de calcul distribu\u00e9. L'installation en locale reproduit ce que [Spark](http://spark.apache.org/) donnerait \u00e0 grande \u00e9chelle sur un cluster mais ce n'est pas rigoureusement identique. En particulier cela veut dire que si votre script tourne en local sur un petit jeu de donn\u00e9es, il est possible qu'il \u00e9choue sur le cluster :\n", "\n", "* Les d\u00e9pendances du script sont install\u00e9es en local mais pas sur chaque machine du cluster [Spark](http://spark.apache.org/). Cela peut se faire \u00e0 l'installation du cluster pour des d\u00e9pendances cons\u00e9quentes ou juste avant l'ex\u00e9cution d'un *job* pour des d\u00e9pendances ponctuelles.\n", "* Les donn\u00e9es sur le cluster sont en plus grand nombre, il est fort probable que l'\u00e9chantillon al\u00e9atoire local ne soit pas repr\u00e9sentatif.\n", "* Les chemins locaux ne fonctionnent pas sur le cluster. Il faudra d'abord uploader les donn\u00e9es sur le cluster pour faire tourner le script.\n", "* D\u00e9bugger est compliqu\u00e9 : les print ne marchent pas souvent, surtout si c'est en distribu\u00e9. Le print va s'ex\u00e9cuter sur une machine distance qui est \u00e0 mille lieues de votre \u00e9cran.\n", "\n", "Quand \u00e7a plante sur une machine distante, il faut s'accrocher. Le pire, c'est quand l'erreur arrive pour une observation toute bizarre apr\u00e8s cinq heures de calcul. Si le message d'erreur n'est pas trop incompr\u00e9hensible, on sen tire. 
En fait, le plus aga\u00e7ant, c'est quand le calcul est carr\u00e9ment interrompu par le cluster au bout de cinq heures car il d\u00e9cr\u00e8te que les probabilit\u00e9s d'aboutir sont quasi nulles. L\u00e0, on conna\u00eet l'erreur (*skewed dataset*) et on sait qu'on va souffrir pour construire le contournement."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Spark et RDD\n", "\n", "[Spark](http://spark.apache.org/) ne manipule pas des fichiers mais des [Resilient Distributed Dataset](http://www-bcf.usc.edu/~minlanyu/teach/csci599-fall12/papers/nsdi_spark.pdf) ou *RDD*. En particulier :\n", "\n", "1. Les *RDD* sont organis\u00e9s en lignes : ce sont des blocs qui ne seront jamais *cass\u00e9s* ni *modifi\u00e9s*. Ces lignes ne peuvent pas exc\u00e9der 2 Go (voir [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235)) mais il est conseill\u00e9 de ne pas aller au-del\u00e0 de quelques Mo.\n", "2. Sauf exception, il est impossible d'acc\u00e9der \u00e0 une partie du fichier. Il faut le parcourir en entier (il n'y a pas d'index).\n", "3. Les *RDD* fonctionnent comme des *flux* ou *streams*. On peut soit les lire, soit les \u00e9crire mais jamais les deux en m\u00eame temps. Par cons\u00e9quent, on ne peut pas modifier un *RDD*, il faut toujours en cr\u00e9er un autre.\n", "4. Les *RDD* sont distribu\u00e9s. L'ordre des lignes qui les composent n'est pas pr\u00e9visible.\n", "5. Comme l'ordre est impr\u00e9visible, on ne stocke **jamais** les noms des colonnes dans les *RDD*."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Les partitions\n", "\n", "Il existe une exception au point 2 : les [partitions](http://dev.sortable.com/spark-repartition/). Une partition est un ensemble de lignes trait\u00e9es par le m\u00eame processus. La parall\u00e9lisation ne peut exc\u00e9der le nombre de partitions. Par d\u00e9faut, la r\u00e9partition est al\u00e9atoire (par hachage). Mais on peut tout \u00e0 fait partitionner selon une ou deux colonnes. 
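"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Petite esquisse (hypoth\u00e9tique, en supposant un contexte ``sc`` d\u00e9j\u00e0 cr\u00e9\u00e9 comme dans le reste du notebook) pour inspecter et contr\u00f4ler le nombre de partitions :\n", "\n", "```python\n", "rdd = sc.parallelize(range(100), 4)    # 4 partitions\n", "print(rdd.getNumPartitions())          # -> 4\n", "pairs = rdd.map(lambda x: (x % 3, x))  # paires (cl\u00e9, valeur)\n", "par_cle = pairs.partitionBy(3)         # partitionne selon la cl\u00e9\n", "print(par_cle.getNumPartitions())      # -> 3\n", "```"]}, {"cell_type": "markdown", "metadata": {}, "source": ["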
D'ailleurs, c'est l\u00e0-dessus qu'on joue pour optimiser la distribution. Si on r\u00e9duit (ou groupe) selon une colonne, c'est d'autant plus rapide que le stream est d\u00e9j\u00e0 partitionn\u00e9 sur cette colonne."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Spark et Python\n", "\n", "Spark est impl\u00e9ment\u00e9 en Java. L'API [Python](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) permet de faire beaucoup de choses mais :\n", "\n", "* Elle ne sera jamais aussi compl\u00e8te que l'API [Java](http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html).\n", "* Elle sera plus lente que l'API [Java](http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html) ou [Scala](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) (car [Scala](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) est une surcouche fonctionnelle de [Java](http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html)).\n", "\n"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Librairies sur Spark\n", "\n", "Un des succ\u00e8s de Spark est de proposer des extensions d\u00e9di\u00e9es \u00e0 certains usages comme [MLlib](http://spark.apache.org/mllib/) qui impl\u00e9mente des algorithmes de machine learning distribu\u00e9s, [GraphX](http://spark.apache.org/graphx/) pour des algorithmes sur des graphes. 
[MLlib](http://spark.apache.org/mllib/) sera bient\u00f4t remplac\u00e9 par ML qui s'appuie sur les [DataFrame](http://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Erreur : Cannot run program \"python\"\n", "\n", "Il vous manque probablement ``PYSPARK_PYTHON``."]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["LOCAL_PYSPARK = c:\\rdupre\\spark-2.2.0-bin-hadoop2.7\n", "PYSPARK_DRIVER_PYTHON = jupyter-notebook\n", "PYSPARK_PYTHON = c:\\Python36_x64\\python\n", "PYSPARK_SUBMIT_ARGS = \"--name\" \"PySparkShell\" \"pyspark-shell\" \n", "SPARK_CMD = set PYSPARK_SUBMIT_ARGS=\"--name\" \"PySparkShell\" \"pyspark-shell\" && jupyter-notebook \n", "SPARK_ENV_LOADED = 1\n", "SPARK_HIVE = true\n", "SPARK_HOME = c:\\rdupre\\spark-2.2.0-bin-hadoop2.7\\bin\\..\n", "SPARK_JARS_DIR = \"c:\\rdupre\\spark-2.2.0-bin-hadoop2.7\\bin\\..\\jars\"\n", "SPARK_SCALA_VERSION = 2.10\n", "_SPARK_CMD_USAGE = Usage: bin\\pyspark.cmd [options]\n"]}], "source": ["import os\n", "for o, v in sorted(os.environ.items()):\n", " if \"SPARK\" in o.upper():\n", " print(\"{0:25}= {1}\".format(o, v.replace(os.environ[\"USERNAME\"], \"\")))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Erreur : Output directory file:/... already exists\n", "\n", "Spark n'aime pas \u00e9crire des donn\u00e9es dans un RDD qui existe d\u00e9j\u00e0. Il faut le supprimer. Tout d\u00e9pend de l'environnement o\u00f9 on se trouve, sur Hadoop ou en local. 
Comme c'est en local, nous ferons :"]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [{"data": {"text/plain": ["[]"]}, "execution_count": 5, "metadata": {}, "output_type": "execute_result"}], "source": ["from pyquickhelper.filehelper import remove_folder\n", "def clean(folder):\n", " if os.path.exists(folder):\n", " return remove_folder(folder)\n", " else:\n", " return []\n", "clean(\"fichier.out.txt\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### V\u00e9rifier que Spark en local fonctionne\n", "\n", "On essaye le *\"hello world\"* en *Spark* qui consiste \u00e0 compter les mots dans un fichier. On prend le fichier du notebook."]}, {"cell_type": "code", "execution_count": 5, "metadata": {"scrolled": false}, "outputs": [], "source": ["text_file = sc.textFile(\"spark_first_steps.ipynb\")\n", "counts = text_file.flatMap(lambda line: line.split(\" \")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)\n", "counts.saveAsTextFile(\"fichier.out.txt\")"]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [{"data": {"text/plain": ["['.part-00000.crc',\n", " '.part-00001.crc',\n", " '._SUCCESS.crc',\n", " 'part-00000',\n", " 'part-00001',\n", " '_SUCCESS']"]}, "execution_count": 7, "metadata": {}, "output_type": "execute_result"}], "source": ["os.listdir(\"fichier.out.txt/\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Sortie en plusieurs fichiers\n", "\n", "Un *job* *Spark* est distribu\u00e9. La sortie d'un *job* *Spark* s'effectue sous la forme de plusieurs stream dans un r\u00e9pertoire, un stream par processus. Cela explique la pr\u00e9sence de *part-00000*, *part-00001*. Le fichier ``_SUCCESS`` indique le statut du job."]}, {"cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "('', 11686)\n", "('[collect](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect)', 1)\n", "('SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html)\\\\n\",', 1)\n", "\n", "
"], "text/plain": [""]}, "execution_count": 8, "metadata": {}, "output_type": "execute_result"}], "source": ["%load_ext pyensae\n", "%head fichier.out.txt/part-00000 -n 3"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Le format d\u00e9pend du dernier r\u00e9sultat."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Les op\u00e9rations de bases\n", "\n", "Documentation : [programming-guide.html - transformations](http://spark.apache.org/docs/latest/programming-guide.html#transformations).\n", "\n", "Dans cette section, on consid\u00e8re les donn\u00e9es comme un ensemble de lignes de texte. Rien de plus. Donc, pas d'information de type, des conversions quasiment tout le temps. Bref, c'est utile pour comprendre. On y revient quand le reste ne marche pas. En g\u00e9n\u00e9ral, on commence par [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html). Ah oui j'oubliais, on s'en sert beaucoup quand les donn\u00e9es ne sont pas structur\u00e9es et sont d\u00e9crites par du JSON, genre des logs d'un site internet. Chaque ligne est en fait un gros JSON.\n", "\n", "On utilise un jeu de donn\u00e9es de machine learning [Adult](https://archive.ics.uci.edu/ml/datasets/Adult) l\u00e9g\u00e8rement pr\u00e9-trait\u00e9s et que vous pourrez trouver sur GitHub : [td3a_spark](https://github.com/sdpython/ensae_teaching_cs/tree/master/_doc/notebooks/td3a_spark)."]}, {"cell_type": "code", "execution_count": 8, "metadata": {"collapsed": true}, "outputs": [], "source": ["import os\n", "if not os.path.exists(\"data_adult.txt\"):\n", " from pyquickhelper.filehelper import unzip_files\n", " unzip_files(\"data_adult.zip\", where_to=\".\")\n", "assert os.path.exists(\"data_adult.txt\")"]}, {"cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation_nummarital_statusoccupationrelationshipracesexcapital_gaincapital_losshours_per_weeknative_countrytarget
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
238Private215646HS-grad9DivorcedHandlers-cleanersNot-in-familyWhiteMale0040United-States<=50K
353Private23472111th7Married-civ-spouseHandlers-cleanersHusbandBlackMale0040United-States<=50K
428Private338409Bachelors13Married-civ-spouseProf-specialtyWifeBlackFemale0040Cuba<=50K
\n", "
"], "text/plain": [" age workclass fnlwgt education education_num \\\n", "0 39 State-gov 77516 Bachelors 13 \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 \n", "2 38 Private 215646 HS-grad 9 \n", "3 53 Private 234721 11th 7 \n", "4 28 Private 338409 Bachelors 13 \n", "\n", " marital_status occupation relationship race sex \\\n", "0 Never-married Adm-clerical Not-in-family White Male \n", "1 Married-civ-spouse Exec-managerial Husband White Male \n", "2 Divorced Handlers-cleaners Not-in-family White Male \n", "3 Married-civ-spouse Handlers-cleaners Husband Black Male \n", "4 Married-civ-spouse Prof-specialty Wife Black Female \n", "\n", " capital_gain capital_loss hours_per_week native_country target \n", "0 2174 0 40 United-States <=50K \n", "1 0 0 13 United-States <=50K \n", "2 0 0 40 United-States <=50K \n", "3 0 0 40 United-States <=50K \n", "4 0 0 40 Cuba <=50K "]}, "execution_count": 10, "metadata": {}, "output_type": "execute_result"}], "source": ["import pandas\n", "df = pandas.read_csv(\"data_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")\n", "df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["On enl\u00e8ve le nom des colonnes."]}, {"cell_type": "code", "execution_count": 10, "metadata": {"collapsed": true}, "outputs": [], "source": ["df.to_csv(\"adult.txt\", sep=\"\\t\", encoding=\"utf-8\", index=False, header=None)"]}, {"cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K\n", "50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K\n", "\n", "
"], "text/plain": [""]}, "execution_count": 12, "metadata": {}, "output_type": "execute_result"}], "source": ["%head adult.txt -n 2"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### d\u00e9claration d'un RDD\n", "\n", "La d\u00e9claration d\u00e9clare l'existence d'un *RDD* comme on d\u00e9clare un fichier. Pour l'instant aucune manipulation."]}, {"cell_type": "code", "execution_count": 12, "metadata": {"collapsed": true}, "outputs": [], "source": ["rdd = sc.textFile(\"adult.txt\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### enregistrement d'un RDD"]}, {"cell_type": "code", "execution_count": 13, "metadata": {"collapsed": true}, "outputs": [], "source": ["import os\n", "if not os.path.exists(\"out\"):\n", " os.mkdir(\"out\")"]}, {"cell_type": "code", "execution_count": 14, "metadata": {"collapsed": true, "scrolled": false}, "outputs": [], "source": ["clean(\"out/copy_adult.txt\")\n", "rdd.saveAsTextFile(os.path.abspath(\"out/copy_adult.txt\"))"]}, {"cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K\n", "50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K\n", "\n", "
"], "text/plain": [""]}, "execution_count": 16, "metadata": {}, "output_type": "execute_result"}], "source": ["%head out/copy_adult.txt/part-00000 -n 2"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### lecture locale d'un RDD avec pandas\n", "\n", "On lit chaque morceaux avant de les concat\u00e9ner."]}, {"cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
01234567891011121314
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
\n", "
"], "text/plain": [" 0 1 2 3 4 5 \\\n", "0 39 State-gov 77516 Bachelors 13 Never-married \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse \n", "\n", " 6 7 8 9 10 11 12 \\\n", "0 Adm-clerical Not-in-family White Male 2174 0 40 \n", "1 Exec-managerial Husband White Male 0 0 13 \n", "\n", " 13 14 \n", "0 United-States <=50K \n", "1 United-States <=50K "]}, "execution_count": 17, "metadata": {}, "output_type": "execute_result"}], "source": ["import glob\n", "import pandas\n", "def read_rdd(path, **options):\n", " pat = os.path.join(path, \"part*\")\n", " all_files = glob.glob(pat)\n", " if len(all_files) == 0:\n", " raise Exception(\"No file to read in '{0}'\".format(path))\n", " merge = []\n", " for f in all_files:\n", " try:\n", " df = pandas.read_csv(f, header=None, **options)\n", " except Exception as e:\n", " raise Exception(\"Unable to read '{0}'\".format(f)) from e\n", " merge.append(df)\n", " if len(merge) == 0:\n", " raise Exception(\"No file to read in '{0}'\".format(path))\n", " concatenated_df = pandas.concat(merge, ignore_index=True)\n", " return concatenated_df\n", "\n", "data = read_rdd(\"out/copy_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")\n", "data.head(n=2)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### collect\n", "\n", "Cette op\u00e9ration regroupe les deux pr\u00e9c\u00e9dentes en une seule. 
Il faut tout de m\u00eame faire attention \u00e0 ne pas l'ex\u00e9cuter sur un grand fichier sous peine de faire exploser la m\u00e9moire."]}, {"cell_type": "code", "execution_count": 17, "metadata": {"collapsed": true}, "outputs": [], "source": ["res = rdd.collect()"]}, {"cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [{"data": {"text/plain": ["['39\\t State-gov\\t77516\\t Bachelors\\t13\\t Never-married\\t Adm-clerical\\t Not-in-family\\t White\\t Male\\t2174\\t0\\t40\\t United-States\\t <=50K',\n", " '50\\t Self-emp-not-inc\\t83311\\t Bachelors\\t13\\t Married-civ-spouse\\t Exec-managerial\\t Husband\\t White\\t Male\\t0\\t0\\t13\\t United-States\\t <=50K']"]}, "execution_count": 19, "metadata": {}, "output_type": "execute_result"}], "source": ["res[:2]"]}, {"cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
01234567891011121314
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
\n", "
"], "text/plain": [" 0 1 2 3 4 5 \\\n", "0 39 State-gov 77516 Bachelors 13 Never-married \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse \n", "\n", " 6 7 8 9 10 11 12 \\\n", "0 Adm-clerical Not-in-family White Male 2174 0 40 \n", "1 Exec-managerial Husband White Male 0 0 13 \n", "\n", " 13 14 \n", "0 United-States <=50K \n", "1 United-States <=50K "]}, "execution_count": 20, "metadata": {}, "output_type": "execute_result"}], "source": ["import pandas\n", "df = pandas.DataFrame([_.split(\"\\t\") for _ in res])\n", "df.head(2)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### map\n", "\n", "Transformer une ligne en une autre ligne. Chaque ligne est trait\u00e9e ind\u00e9pendemment des autres."]}, {"cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [{"data": {"text/plain": ["[['State-gov', 'Bachelors'], ['Self-emp-not-inc', 'Bachelors']]"]}, "execution_count": 21, "metadata": {}, "output_type": "execute_result"}], "source": ["def extract_column(cols, row):\n", " spl = row.split(\"\\t\")\n", " return [spl[i].strip() for i in cols]\n", "\n", "res = rdd.map(lambda row: extract_column([1,3], row))\n", "res.collect()[:2]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### filter\n", "\n", "Garder ou jeter une ligne. 
Chaque ligne est trait\u00e9e ind\u00e9pendamment des autres."]}, {"cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [{"data": {"text/plain": ["['52\\t Self-emp-not-inc\\t209642\\t HS-grad\\t9\\t Married-civ-spouse\\t Exec-managerial\\t Husband\\t White\\t Male\\t0\\t0\\t45\\t United-States\\t >50K',\n", " '31\\t Private\\t45781\\t Masters\\t14\\t Never-married\\t Prof-specialty\\t Not-in-family\\t White\\t Female\\t14084\\t0\\t50\\t United-States\\t >50K']"]}, "execution_count": 22, "metadata": {}, "output_type": "execute_result"}], "source": ["def filter_column(row):\n", " spl = row.split(\"\\t\")\n", " return spl[-1].strip() != \"<=50K\"\n", "\n", "res = rdd.filter(lambda row: filter_column(row))\n", "res.collect()[:2]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["On combine souvent les deux :"]}, {"cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [{"data": {"text/plain": ["[['Self-emp-not-inc', 'HS-grad', '>50K'], ['Private', 'Masters', '>50K']]"]}, "execution_count": 23, "metadata": {}, "output_type": "execute_result"}], "source": ["def filter_column_split(row):\n", " return row[-1].strip() != \"<=50K\"\n", "\n", "res = rdd.map(lambda row: extract_column([1,3,-1], row)) \\\n", " .filter(lambda row: filter_column_split(row))\n", "res.collect()[:2]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Il faut faire attention aux transformations successives des lignes."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### flatMap\n", "\n", "C'est la principale diff\u00e9rence avec SQL. 
Une ligne peut devenir un nombre variable de lignes."]}, {"cell_type": "code", "execution_count": 23, "metadata": {"scrolled": false}, "outputs": [{"data": {"text/plain": ["[('39',\n", " 'State-gov',\n", " '77516',\n", " 'Bachelors',\n", " '13',\n", " 'Never-married',\n", " 'Adm-clerical',\n", " 'Not-in-family',\n", " 'White',\n", " 'Male',\n", " '2174',\n", " '0',\n", " '40',\n", " 'United-States',\n", " '<=50K'),\n", " ('39',\n", " 'State-gov',\n", " '77516',\n", " 'Bachelors',\n", " '13',\n", " 'Never-married',\n", " 'Adm-clerical',\n", " 'Not-in-family',\n", " 'White',\n", " 'Male',\n", " '2174',\n", " '0',\n", " '40',\n", " 'United-States',\n", " '<=50K'),\n", " ('50',\n", " 'Self-emp-not-inc',\n", " '83311',\n", " 'Bachelors',\n", " '13',\n", " 'Married-civ-spouse',\n", " 'Exec-managerial',\n", " 'Husband',\n", " 'White',\n", " 'Male',\n", " '0',\n", " '0',\n", " '13',\n", " 'United-States',\n", " '<=50K')]"]}, "execution_count": 24, "metadata": {}, "output_type": "execute_result"}], "source": ["def extract_column_and_multiply_row(n, row):\n", " spl = row.split(\"\\t\")\n", " return [tuple(_.strip() for _ in spl)] * n\n", "\n", "res = rdd.flatMap(lambda row: extract_column_and_multiply_row(2, row))\n", "res.collect()[:3]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### group / reduce + mapValues\n", "\n", "Petite moyenne ?"]}, {"cell_type": "code", "execution_count": 24, "metadata": {"scrolled": false}, "outputs": [{"data": {"text/plain": ["[('>50K', (7841, 346963.0)), ('<=50K', (24720, 909294.0))]"]}, "execution_count": 25, "metadata": {}, "output_type": "execute_result"}], "source": ["def extract_age_rich(row):\n", " spl = row.split(\"\\t\")\n", " target = spl[-1].strip()\n", " age = float(spl[0])\n", " return (age, target)\n", "\n", "def custom_agg(aggset):\n", " temp = list([_[0] for _ in aggset])\n", " return len(temp), sum(temp)\n", "\n", "ave = rdd.map(extract_age_rich).groupBy(lambda row: row[1]).mapValues(custom_agg)\n", "fin = 
ave.collect()\n", "fin"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### sort\n", "\n", "Je n'en parle pas. Trier un gros jeu de donn\u00e9es est \u00e0 proscrire. On peut trier au sein d'un groupe mais **jamais** un stream entier. \u00c7a fait presque dix ans que j'\u00e9cris des jobs map/reduce, je n'ai jamais \u00e9crit un *sort* sur tout un jeu de donn\u00e9es. \u00c7a s'appelle flinguer de la CPU pour rien."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### join\n", "\n", "Et on remet la moyenne dans le stream initial. Il vaut mieux regarder la documentation de la m\u00e9thode [join](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.join) avant de commencer \u00e0 lire le code qui suit."]}, {"cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [{"data": {"text/plain": ["[('>50K',\n", " (['52',\n", " ' Self-emp-not-inc',\n", " '209642',\n", " ' HS-grad',\n", " '9',\n", " ' Married-civ-spouse',\n", " ' Exec-managerial',\n", " ' Husband',\n", " ' White',\n", " ' Male',\n", " '0',\n", " '0',\n", " '45',\n", " ' United-States',\n", " ' >50K'],\n", " (7841, 346963.0))),\n", " ('>50K',\n", " (['31',\n", " ' Private',\n", " '45781',\n", " ' Masters',\n", " '14',\n", " ' Never-married',\n", " ' Prof-specialty',\n", " ' Not-in-family',\n", " ' White',\n", " ' Female',\n", " '14084',\n", " '0',\n", " '50',\n", " ' United-States',\n", " ' >50K'],\n", " (7841, 346963.0)))]"]}, "execution_count": 26, "metadata": {}, "output_type": "execute_result"}], "source": ["add_key = rdd.map(lambda row: row.split(\"\\t\")).map(lambda row: (row[-1].strip(), row))\n", "join = add_key.join(ave)\n", "join.collect()[:2]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["On commence \u00e0 comprendre pourquoi [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html), \u00e7a risque d'\u00eatre pas mal."]}, {"cell_type": "markdown", "metadata": {}, "source": ["### le choix existentiel du join : le petit join\",
"\n", "On fait souvent une op\u00e9ration qui consiste \u00e0 garder les lignes pour lesquelles une certaine valeur appartient \u00e0 un ensemble. On peut faire un join classique ou alors l'ensemble est petit, traiter ce join comme un map. On broadcaste l'ensemble \u00e0 chaque processus ex\u00e9cutant le map."]}, {"cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [{"data": {"text/plain": ["[20, 30, 40]"]}, "execution_count": 27, "metadata": {}, "output_type": "execute_result"}], "source": ["from pyspark.context import SparkContext\n", "ages = sc.broadcast([20, 30, 40])\n", "ages.value"]}, {"cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [{"data": {"text/plain": ["['30\\t State-gov\\t141297\\t Bachelors\\t13\\t Married-civ-spouse\\t Prof-specialty\\t Husband\\t Asian-Pac-Islander\\t Male\\t0\\t0\\t40\\t India\\t >50K',\n", " '40\\t Private\\t121772\\t Assoc-voc\\t11\\t Married-civ-spouse\\t Craft-repair\\t Husband\\t Asian-Pac-Islander\\t Male\\t0\\t0\\t40\\t ?\\t >50K']"]}, "execution_count": 28, "metadata": {}, "output_type": "execute_result"}], "source": ["subset = rdd.filter(lambda row: int(row.split(\"\\t\")[0]) in ages.value )\n", "subset.collect()[:2]"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### les trucs qui servent parfois parce que ... 
\u00e0 l'usage \u00e7a sert\n", "\n", "Ce que font les m\u00e9thodes associ\u00e9es aux [RDD](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD), un peu comme les it\u00e9rateurs, n'est pas toujours intuitif, mais il est \u00e0 peu pr\u00e8s s\u00fbr qu'elles vous serviront un jour (peut-\u00eatre apr\u00e8s avoir googl\u00e9 ou bing\u00e9 comme des fous)."]}, {"cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [{"data": {"text/plain": ["[2, 3, 4]"]}, "execution_count": 29, "metadata": {}, "output_type": "execute_result"}], "source": ["simple_rdd = sc.parallelize([2, 3, 4])\n", "simple_rdd.collect()"]}, {"cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [{"data": {"text/plain": ["[1, 1, 2, 1, 2, 3]"]}, "execution_count": 30, "metadata": {}, "output_type": "execute_result"}], "source": ["simple_rdd.flatMap(lambda x: range(1, x)).collect()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["[histogram](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.histogram), [groupByKey](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### le truc \u00e0 retenir\n", "\n", "collect, collect... qu'est-ce que je voulais dire d\u00e9j\u00e0... Ah oui... Un job map/reduce, c'est :\n", "\n", "1. La d\u00e9claration des flux d'entr\u00e9e.\n", "2. Le traitement \u00e0 proprement parler.\n", "3. La d\u00e9claration des flux de sortie.\n", "\n", "\u00c0 moins d'\u00e9crire du Java bas niveau, le job est transform\u00e9 en un plan d'ex\u00e9cution qui n'est ex\u00e9cut\u00e9 que si [collect](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) ou [save machin chouette](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsTextFile) est appel\u00e9. 
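"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Petite v\u00e9rification (esquisse, en supposant le contexte ``sc`` du notebook) : la transformation ci-dessous contient une division par z\u00e9ro mais ne l\u00e8ve aucune erreur tant qu'aucune action n'est demand\u00e9e.\n", "\n", "```python\n", "rdd_paresseux = sc.parallelize([1, 2, 3]).map(lambda x: x / 0)  # rien ne s'ex\u00e9cute ici\n", "# la ZeroDivisionError n'appara\u00eetrait qu'au moment d'une action :\n", "# rdd_paresseux.collect()\n", "```"]}, {"cell_type": "markdown", "metadata": {}, "source": ["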
Bref, c'est du [lazy](https://en.wikipedia.org/wiki/Lazy_evaluation)."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Spark DataFrame\n", "\n", "[Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html)\n", "\n", "Au d\u00e9but, \u00e7a commence par... cr\u00e9er un dataframe. Et comme pour pandas, ces objets retiennent les noms et les types."]}, {"cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation_nummarital_statusoccupationrelationshipracesexcapital_gaincapital_losshours_per_weeknative_countrytarget
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
\n", "
"], "text/plain": [" age workclass fnlwgt education education_num \\\n", "0 39 State-gov 77516 Bachelors 13 \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 \n", "\n", " marital_status occupation relationship race sex \\\n", "0 Never-married Adm-clerical Not-in-family White Male \n", "1 Married-civ-spouse Exec-managerial Husband White Male \n", "\n", " capital_gain capital_loss hours_per_week native_country target \n", "0 2174 0 40 United-States <=50K \n", "1 0 0 13 United-States <=50K "]}, "execution_count": 31, "metadata": {}, "output_type": "execute_result"}], "source": ["import pandas\n", "data = pandas.read_csv(\"data_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")\n", "data.head(2)"]}, {"cell_type": "code", "execution_count": 31, "metadata": {"collapsed": true}, "outputs": [], "source": ["if \"spark\" not in locals():\n", " from pyspark.sql import SparkSession\n", " spark = SparkSession.builder.appName(\"nimportequoi\").getOrCreate() # \u00e0 ne faire qu'une fois"]}, {"cell_type": "code", "execution_count": 32, "metadata": {"collapsed": true}, "outputs": [], "source": ["# sdf = spark.createDataFrame(data) # \u00e7a marche\n", "sdf = spark.read.csv(\"data_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")"]}, {"cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "|_c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11| _c12| _c13| _c14|\n", "+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "|age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| 
sex|capital_gain|capital_loss|hours_per_week|native_country|target|\n", "| 39| State-gov| 77516| Bachelors| 13| Never-married| Adm-clerical| Not-in-family| White| Male| 2174| 0| 40| United-States| <=50K|\n", "| 50| Self-emp-not-inc| 83311| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 13| United-States| <=50K|\n", "| 38| Private|215646| HS-grad| 9| Divorced| Handlers-cleaners| Not-in-family| White| Male| 0| 0| 40| United-States| <=50K|\n", "| 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K|\n", "| 28| Private|338409| Bachelors| 13| Married-civ-spouse| Prof-specialty| Wife| Black| Female| 0| 0| 40| Cuba| <=50K|\n", "| 37| Private|284582| Masters| 14| Married-civ-spouse| Exec-managerial| Wife| White| Female| 0| 0| 40| United-States| <=50K|\n", "| 49| Private|160187| 9th| 5| Married-spouse-a...| Other-service| Not-in-family| Black| Female| 0| 0| 16| Jamaica| <=50K|\n", "| 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K|\n", "| 31| Private| 45781| Masters| 14| Never-married| Prof-specialty| Not-in-family| White| Female| 14084| 0| 50| United-States| >50K|\n", "| 42| Private|159449| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 5178| 0| 40| United-States| >50K|\n", "| 37| Private|280464| Some-college| 10| Married-civ-spouse| Exec-managerial| Husband| Black| Male| 0| 0| 80| United-States| >50K|\n", "| 30| State-gov|141297| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Asian-Pac-Islander| Male| 0| 0| 40| India| >50K|\n", "| 23| Private|122272| Bachelors| 13| Never-married| Adm-clerical| Own-child| White| Female| 0| 0| 30| United-States| <=50K|\n", "| 32| Private|205019| Assoc-acdm| 12| Never-married| Sales| Not-in-family| Black| Male| 0| 0| 50| United-States| <=50K|\n", "| 40| Private|121772| Assoc-voc| 11| Married-civ-spouse| Craft-repair| 
Husband| Asian-Pac-Islander| Male| 0| 0| 40| ?| >50K|\n", "| 34| Private|245487| 7th-8th| 4| Married-civ-spouse| Transport-moving| Husband| Amer-Indian-Eskimo| Male| 0| 0| 45| Mexico| <=50K|\n", "| 25| Self-emp-not-inc|176756| HS-grad| 9| Never-married| Farming-fishing| Own-child| White| Male| 0| 0| 35| United-States| <=50K|\n", "| 32| Private|186824| HS-grad| 9| Never-married| Machine-op-inspct| Unmarried| White| Male| 0| 0| 40| United-States| <=50K|\n", "| 38| Private| 28887| 11th| 7| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K|\n", "+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "only showing top 20 rows\n", "\n"]}], "source": ["sdf.show()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Converting to pandas\n", "\n", "toPandas collects the whole distributed dataframe onto the driver, so it should only be used when the data fits in the driver's memory."]}, {"cell_type": "code", "execution_count": 34, "metadata": {"collapsed": true}, "outputs": [], "source": ["df = sdf.toPandas()"]}, {"cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
_c0_c1_c2_c3_c4_c5_c6_c7_c8_c9_c10_c11_c12_c13_c14
0ageworkclassfnlwgteducationeducation_nummarital_statusoccupationrelationshipracesexcapital_gaincapital_losshours_per_weeknative_countrytarget
139State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
250Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
338Private215646HS-grad9DivorcedHandlers-cleanersNot-in-familyWhiteMale0040United-States<=50K
453Private23472111th7Married-civ-spouseHandlers-cleanersHusbandBlackMale0040United-States<=50K
\n", "
"], "text/plain": [" _c0 _c1 _c2 _c3 _c4 \\\n", "0 age workclass fnlwgt education education_num \n", "1 39 State-gov 77516 Bachelors 13 \n", "2 50 Self-emp-not-inc 83311 Bachelors 13 \n", "3 38 Private 215646 HS-grad 9 \n", "4 53 Private 234721 11th 7 \n", "\n", " _c5 _c6 _c7 _c8 _c9 \\\n", "0 marital_status occupation relationship race sex \n", "1 Never-married Adm-clerical Not-in-family White Male \n", "2 Married-civ-spouse Exec-managerial Husband White Male \n", "3 Divorced Handlers-cleaners Not-in-family White Male \n", "4 Married-civ-spouse Handlers-cleaners Husband Black Male \n", "\n", " _c10 _c11 _c12 _c13 _c14 \n", "0 capital_gain capital_loss hours_per_week native_country target \n", "1 2174 0 40 United-States <=50K \n", "2 0 0 13 United-States <=50K \n", "3 0 0 40 United-States <=50K \n", "4 0 0 40 United-States <=50K "]}, "execution_count": 36, "metadata": {}, "output_type": "execute_result"}], "source": ["df.head()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Retour aux RDD"]}, {"cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [{"data": {"text/plain": ["MapPartitionsRDD[59] at javaToPython at null:-2"]}, "execution_count": 37, "metadata": {}, "output_type": "execute_result"}], "source": ["sdf.rdd"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### R\u00e9cuperer le sch\u00e9ma"]}, {"cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [{"data": {"text/plain": ["StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,StringType,true),StructField(_c5,StringType,true),StructField(_c6,StringType,true),StructField(_c7,StringType,true),StructField(_c8,StringType,true),StructField(_c9,StringType,true),StructField(_c10,StringType,true),StructField(_c11,StringType,true),StructField(_c12,StringType,true),StructField(_c13,StringType,true),StructField(_c14,StringType,true)))"]}, 
"execution_count": 38, "metadata": {}, "output_type": "execute_result"}], "source": ["sdf.schema"]}, {"cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["root\n", " |-- _c0: string (nullable = true)\n", " |-- _c1: string (nullable = true)\n", " |-- _c2: string (nullable = true)\n", " |-- _c3: string (nullable = true)\n", " |-- _c4: string (nullable = true)\n", " |-- _c5: string (nullable = true)\n", " |-- _c6: string (nullable = true)\n", " |-- _c7: string (nullable = true)\n", " |-- _c8: string (nullable = true)\n", " |-- _c9: string (nullable = true)\n", " |-- _c10: string (nullable = true)\n", " |-- _c11: string (nullable = true)\n", " |-- _c12: string (nullable = true)\n", " |-- _c13: string (nullable = true)\n", " |-- _c14: string (nullable = true)\n", "\n"]}], "source": ["sdf.printSchema()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Utiliser pandas pour sp\u00e9cifer le format\n", "\n", "On utilise pandas sur une partie du stream."]}, {"cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation_nummarital_statusoccupationrelationshipracesexcapital_gaincapital_losshours_per_weeknative_countrytarget
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
\n", "
"], "text/plain": [" age workclass fnlwgt education education_num \\\n", "0 39 State-gov 77516 Bachelors 13 \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 \n", "\n", " marital_status occupation relationship race sex \\\n", "0 Never-married Adm-clerical Not-in-family White Male \n", "1 Married-civ-spouse Exec-managerial Husband White Male \n", "\n", " capital_gain capital_loss hours_per_week native_country target \n", "0 2174 0 40 United-States <=50K \n", "1 0 0 13 United-States <=50K "]}, "execution_count": 40, "metadata": {}, "output_type": "execute_result"}], "source": ["import pandas\n", "df = pandas.read_csv(\"data_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")\n", "df.head(n=2)"]}, {"cell_type": "code", "execution_count": 40, "metadata": {"collapsed": true}, "outputs": [], "source": ["sdf = spark.createDataFrame(df)"]}, {"cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["root\n", " |-- age: long (nullable = true)\n", " |-- workclass: string (nullable = true)\n", " |-- fnlwgt: long (nullable = true)\n", " |-- education: string (nullable = true)\n", " |-- education_num: long (nullable = true)\n", " |-- marital_status: string (nullable = true)\n", " |-- occupation: string (nullable = true)\n", " |-- relationship: string (nullable = true)\n", " |-- race: string (nullable = true)\n", " |-- sex: string (nullable = true)\n", " |-- capital_gain: long (nullable = true)\n", " |-- capital_loss: long (nullable = true)\n", " |-- hours_per_week: long (nullable = true)\n", " |-- native_country: string (nullable = true)\n", " |-- target: string (nullable = true)\n", "\n"]}], "source": ["sdf.printSchema()"]}, {"cell_type": "code", "execution_count": 42, "metadata": {"collapsed": true}, "outputs": [], "source": ["fullsdf = spark.createDataFrame(sdf.rdd, sdf.schema)"]}, {"cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["root\n", " 
|-- age: long (nullable = true)\n", " |-- workclass: string (nullable = true)\n", " |-- fnlwgt: long (nullable = true)\n", " |-- education: string (nullable = true)\n", " |-- education_num: long (nullable = true)\n", " |-- marital_status: string (nullable = true)\n", " |-- occupation: string (nullable = true)\n", " |-- relationship: string (nullable = true)\n", " |-- race: string (nullable = true)\n", " |-- sex: string (nullable = true)\n", " |-- capital_gain: long (nullable = true)\n", " |-- capital_loss: long (nullable = true)\n", " |-- hours_per_week: long (nullable = true)\n", " |-- native_country: string (nullable = true)\n", " |-- target: string (nullable = true)\n", "\n"]}], "source": ["fullsdf.printSchema()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Saving to the parquet format\n", "\n", "Parquet is a compressed, columnar format which stores the schema alongside the data."]}, {"cell_type": "code", "execution_count": 44, "metadata": {"collapsed": true}, "outputs": [], "source": ["fullsdf.write.parquet(\"data_adult.schema.parquet\")"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Reading the parquet format back"]}, {"cell_type": "code", "execution_count": 45, "metadata": {"collapsed": true}, "outputs": [], "source": ["newsdf = spark.read.parquet(\"data_adult.schema.parquet/\")"]}, {"cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["root\n", " |-- age: long (nullable = true)\n", " |-- workclass: string (nullable = true)\n", " |-- fnlwgt: long (nullable = true)\n", " |-- education: string (nullable = true)\n", " |-- education_num: long (nullable = true)\n", " |-- marital_status: string (nullable = true)\n", " |-- occupation: string (nullable = true)\n", " |-- relationship: string (nullable = true)\n", " |-- race: string (nullable = true)\n", " |-- sex: string (nullable = true)\n", " |-- capital_gain: long (nullable = true)\n", " |-- capital_loss: long (nullable = true)\n", " |-- hours_per_week: long (nullable = true)\n", " |-- native_country: string 
(nullable = true)\n", " |-- target: string (nullable = true)\n", "\n"]}], "source": ["newsdf.printSchema()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["### Spark dataframes vs pandas dataframes\n", "\n", "Spark dataframes mimic the pandas interface, except that evaluation is lazy: nothing is actually computed until an action such as show, collect or write forces it."]}, {"cell_type": "code", "execution_count": 47, "metadata": {"collapsed": true}, "outputs": [], "source": ["fifty = fullsdf[fullsdf.age > 50]"]}, {"cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "|age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|target|\n", "+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "| 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K|\n", "| 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K|\n", "| 54| Private|302146| HS-grad| 9| Separated| Other-service| Unmarried| Black| Female| 0| 0| 20| United-States| <=50K|\n", "| 59| Private|109015| HS-grad| 9| Divorced| Tech-support| Unmarried| White| Female| 0| 0| 40| United-States| <=50K|\n", "| 56| Local-gov|216851| Bachelors| 13| Married-civ-spouse| Tech-support| Husband| White| Male| 0| 0| 40| United-States| >50K|\n", "| 54| ?|180211| Some-college| 10| Married-civ-spouse| ?| Husband| Asian-Pac-Islander| 
Male| 0| 0| 60| South| >50K|\n", "| 53| Self-emp-not-inc| 88506| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K|\n", "| 57| Federal-gov|337895| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Black| Male| 0| 0| 40| United-States| >50K|\n", "| 53| Private|144361| HS-grad| 9| Married-civ-spouse| Machine-op-inspct| Husband| White| Male| 0| 0| 38| United-States| <=50K|\n", "| 53| Private|169846| HS-grad| 9| Married-civ-spouse| Adm-clerical| Wife| White| Female| 0| 0| 40| United-States| >50K|\n", "| 79| Private|124744| Some-college| 10| Married-civ-spouse| Prof-specialty| Other-relative| White| Male| 0| 0| 20| United-States| <=50K|\n", "| 67| ?|212759| 10th| 6| Married-civ-spouse| ?| Husband| White| Male| 0| 0| 2| United-States| <=50K|\n", "| 52| Private|276515| Bachelors| 13| Married-civ-spouse| Other-service| Husband| White| Male| 0| 0| 40| Cuba| <=50K|\n", "| 59| Private|159937| HS-grad| 9| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 48| United-States| <=50K|\n", "| 53| Private|346253| HS-grad| 9| Divorced| Sales| Own-child| White| Female| 0| 0| 35| United-States| <=50K|\n", "| 57| Private|249977| Assoc-voc| 11| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K|\n", "| 76| Private|124191| Masters| 14| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 40| United-States| >50K|\n", "| 56| Self-emp-not-inc|335605| HS-grad| 9| Married-civ-spouse| Other-service| Husband| White| Male| 0| 1887| 50| Canada| >50K|\n", "| 53| Private| 95647| 9th| 5| Married-civ-spouse| Handlers-cleaners| Husband| White| Male| 0| 0| 50| United-States| <=50K|\n", "| 56| Self-emp-inc|303090| Some-college| 10| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K|\n", 
"+---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+\n", "only showing top 20 rows\n", "\n"]}], "source": ["fifty.show()"]}, {"cell_type": "code", "execution_count": 49, "metadata": {"collapsed": true}, "outputs": [], "source": []}, {"cell_type": "code", "execution_count": 50, "metadata": {"collapsed": true}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4"}}, "nbformat": 4, "nbformat_minor": 2}