{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# Spark et MLlib - ML\n", "\n", "R\u00e9gression logisitique avec [Spark](https://spark.apache.org/)."]}, {"cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", ""], "text/plain": [""]}, "execution_count": 2, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["[MLlib](http://spark.apache.org/docs/latest/ml-guide.html) est la librairie de machine learning distribu\u00e9 impl\u00e9ment\u00e9 sur Spark et qui explique en partie son succ\u00e8s. La premi\u00e8re mouture de la librairie \u00e9tait [Mahout](http://mahout.apache.org/) impl\u00e9ment\u00e9e sur [Hadoop](http://hadoop.apache.org/). [MLlib](http://spark.apache.org/docs/latest/ml-guide.html) est devenu le standard. [ML](http://spark.apache.org/docs/latest/ml-guide.html) est la derni\u00e8re version et s'appuie sur les [DataFrame](http://spark.apache.org/docs/latest/ml-pipeline.html#dataframe). On retrouve les m\u00eames concepts que ceux de [scikit-learn](http://scikit-learn.org/) tels que les [Pipeline](http://spark.apache.org/docs/latest/ml-pipeline.html#main-concepts-in-pipelines)."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Data"]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [{"data": {"text/html": ["
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation_nummarital_statusoccupationrelationshipracesexcapital_gaincapital_losshours_per_weeknative_countrytarget
039State-gov77516Bachelors13Never-marriedAdm-clericalNot-in-familyWhiteMale2174040United-States<=50K
150Self-emp-not-inc83311Bachelors13Married-civ-spouseExec-managerialHusbandWhiteMale0013United-States<=50K
238Private215646HS-grad9DivorcedHandlers-cleanersNot-in-familyWhiteMale0040United-States<=50K
353Private23472111th7Married-civ-spouseHandlers-cleanersHusbandBlackMale0040United-States<=50K
428Private338409Bachelors13Married-civ-spouseProf-specialtyWifeBlackFemale0040Cuba<=50K
\n", "
"], "text/plain": [" age workclass fnlwgt education education_num \\\n", "0 39 State-gov 77516 Bachelors 13 \n", "1 50 Self-emp-not-inc 83311 Bachelors 13 \n", "2 38 Private 215646 HS-grad 9 \n", "3 53 Private 234721 11th 7 \n", "4 28 Private 338409 Bachelors 13 \n", "\n", " marital_status occupation relationship race sex \\\n", "0 Never-married Adm-clerical Not-in-family White Male \n", "1 Married-civ-spouse Exec-managerial Husband White Male \n", "2 Divorced Handlers-cleaners Not-in-family White Male \n", "3 Married-civ-spouse Handlers-cleaners Husband Black Male \n", "4 Married-civ-spouse Prof-specialty Wife Black Female \n", "\n", " capital_gain capital_loss hours_per_week native_country target \n", "0 2174 0 40 United-States <=50K \n", "1 0 0 13 United-States <=50K \n", "2 0 0 40 United-States <=50K \n", "3 0 0 40 United-States <=50K \n", "4 0 0 40 Cuba <=50K "]}, "execution_count": 3, "metadata": {}, "output_type": "execute_result"}], "source": ["import os\n", "if not os.path.exists(\"data_adult.txt\"):\n", " from pyquickhelper.filehelper import unzip_files\n", " unzip_files(\"data_adult.zip\", where_to=\".\")\n", "assert os.path.exists(\"data_adult.txt\")\n", "import pandas\n", "df = pandas.read_csv(\"data_adult.txt\", sep=\"\\t\", encoding=\"utf-8\")\n", "df.head()"]}, {"cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [{"data": {"text/plain": ["age int64\n", "workclass object\n", "fnlwgt int64\n", "education object\n", "education_num int64\n", "marital_status object\n", "occupation object\n", "relationship object\n", "race object\n", "sex object\n", "capital_gain int64\n", "capital_loss int64\n", "hours_per_week int64\n", "native_country object\n", "target object\n", "dtype: object"]}, "execution_count": 4, "metadata": {}, "output_type": "execute_result"}], "source": ["df.dtypes"]}, {"cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [{"data": {"text/plain": ["[(0, dtype('int64')),\n", " (2, dtype('int64')),\n", " (4, dtype('int64')),\n", " (10, dtype('int64')),\n", " (11, dtype('int64')),\n", " (12, dtype('int64'))]"]}, "execution_count": 5, "metadata": {}, "output_type": "execute_result"}], "source": ["cols = list(filter(lambda tu: tu[1] != object, zip(range(len(df.columns)), df.dtypes)))\n", "cols"]}, {"cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [{"data": {"text/plain": ["{0, 2, 4, 10, 11, 12}"]}, "execution_count": 6, "metadata": {}, "output_type": "execute_result"}], "source": ["column_keep = set(_[0] for _ in cols)\n", "column_keep"]}, {"cell_type": "code", "execution_count": 6, "metadata": {"collapsed": true}, "outputs": [], "source": ["df.to_csv(\"adult.txt\", sep=\"\\t\", index=False, header=None)"]}, {"cell_type": "code", "execution_count": 7, "metadata": {"collapsed": true}, "outputs": [], "source": ["data = sc.textFile(\"adult.txt\")"]}, {"cell_type": "code", "execution_count": 8, "metadata": {"collapsed": true}, "outputs": [], "source": ["col = data.take(2)"]}, {"cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [{"data": {"text/plain": ["['39\\t State-gov\\t77516\\t Bachelors\\t13\\t Never-married\\t Adm-clerical\\t Not-in-family\\t White\\t Male\\t2174\\t0\\t40\\t United-States\\t <=50K',\n", " '50\\t Self-emp-not-inc\\t83311\\t Bachelors\\t13\\t Married-civ-spouse\\t Exec-managerial\\t Husband\\t White\\t Male\\t0\\t0\\t13\\t United-States\\t <=50K']"]}, "execution_count": 10, "metadata": {}, "output_type": "execute_result"}], "source": ["col"]}, {"cell_type": "markdown", 
"metadata": {}, "source": ["## R\u00e9gression logistique (RDD)\n", "\n", "On reprend l'exemple de la documentation :\n", "[Linear Methods - RDD-based API](http://spark.apache.org/docs/latest/mllib-linear-methods.html). On exclue les variables cat\u00e9gorielles pour garder l'exemple concis."]}, {"cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [{"data": {"text/plain": ["[LabeledPoint(1.0, [39.0,77516.0,13.0,2174.0,0.0,40.0]),\n", " LabeledPoint(1.0, [50.0,83311.0,13.0,0.0,0.0,13.0])]"]}, "execution_count": 11, "metadata": {}, "output_type": "execute_result"}], "source": ["def parsePoint(line):\n", " spl = line.split('\\t')\n", " values = [float(x) for i, x in enumerate(spl) if i in column_keep]\n", " target = float(spl[-1].strip() == \"<=50K\")\n", " return LabeledPoint(target, values)\n", "\n", "# We prepare the training data\n", "parsedData = data.map(parsePoint)\n", "parsedData.collect()[:2]"]}, {"cell_type": "code", "execution_count": 11, "metadata": {"collapsed": true, "scrolled": false}, "outputs": [], "source": ["from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel\n", "from pyspark.mllib.regression import LabeledPoint\n", "\n", "# Load and parse the data\n", "def parsePoint(line):\n", " spl = line.split('\\t')\n", " values = [float(x) for i, x in enumerate(spl) if i in column_keep]\n", " target = float(spl[-1].strip() == \"<=50K\")\n", " return LabeledPoint(target, values)\n", "\n", "# We prepare the training data\n", "parsedData = data.map(parsePoint)\n", "\n", "# Build the model\n", "model = LogisticRegressionWithLBFGS.train(parsedData)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Pendant que \u00e7a tourne, il faut regarder la fen\u00eatre terminal qui affiche les messages du serveur de notebook."]}, {"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [{"data": {"text/plain": ["(2, 6, DenseVector([0.0045, 0.0, 0.0086, -0.0003, -0.0008, 0.009]))"]}, "execution_count": 13, "metadata": {}, "output_type": "execute_result"}], "source": ["model.numClasses, model.numFeatures, model.weights"]}, {"cell_type": "code", "execution_count": 13, "metadata": {"collapsed": true}, "outputs": [], "source": ["from pyquickhelper.filehelper import remove_folder\n", "def clean(folder):\n", " if os.path.exists(folder):\n", " return remove_folder(folder)\n", " else:\n", " return []\n", "clean(\"target/pythonLogisticRegressionWithLBFGSModel\")\n", "\n", "# Save and load model\n", "model.save(sc, \"target/pythonLogisticRegressionWithLBFGSModel\")\n", "sameModel = LogisticRegressionModel.load(sc, \"target/pythonLogisticRegressionWithLBFGSModel\")"]}, {"cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["Training Error = 0.20217438039372254\n"]}], "source": ["# Evaluating the model on training data\n", "labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))\n", "def filter_error(ys):\n", " return ys[0] != ys[1]\n", "trainErr = labelsAndPreds.filter(filter_error).count() / float(parsedData.count())\n", "print(\"Training Error = \" + str(trainErr))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["## R\u00e9gression logisitique (DataFrame)\n", "\n", "On s'inspire de l'exemple :\n", "[R\u00e9gression Logistique](http://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression). Le code change, la pr\u00e9paration des donn\u00e9es aussi. 
{"cell_type": "markdown", "metadata": {}, "source": ["## Logistic regression (DataFrame)\n", "\n", "This part follows the example\n", "[Logistic Regression](http://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression). The code changes, and so does the data preparation. Models take as input a single vector column built by a [VectorAssembler](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=vector#pyspark.ml.feature.VectorAssembler)."]}, {"cell_type": "code", "execution_count": 15, "metadata": {"collapsed": true}, "outputs": [], "source": ["from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.appName(\"Python Spark SQL basic example\").getOrCreate()"]}, {"cell_type": "code", "execution_count": 16, "metadata": {"collapsed": true, "scrolled": false}, "outputs": [], "source": ["from pyspark.ml.feature import VectorAssembler\n", "\n", "training = spark.createDataFrame(df)\n", "# binary label: 1.0 when the income is \" <=50K\" (the raw values keep a leading space)\n", "training = training.withColumn('Y', training.target == \" <=50K\")\n", "training = training.withColumn(\"Y\", training.Y.astype('float'))\n", "training = training.select([\"age\", \"fnlwgt\", \"education_num\", \"capital_gain\", \"capital_loss\",\n", "                            \"hours_per_week\", \"Y\"])\n", "\n", "# gather the six numeric columns into a single vector column \"features\"\n", "assembler = VectorAssembler(\n", "    inputCols=[\"age\", \"fnlwgt\", \"education_num\", \"capital_gain\", \"capital_loss\", \"hours_per_week\"],\n", "    outputCol=\"features\")\n", "training = assembler.transform(training)"]}, {"cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["== Physical Plan ==\n", "*Project [age#496L, fnlwgt#498L, education_num#500L, capital_gain#506L, capital_loss#507L, hours_per_week#508L, cast((target#510 = <=50K) as float) AS Y#545, UDF(struct(cast(age#496L as double) AS age_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#571, cast(fnlwgt#498L as double) AS fnlwgt_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#572, cast(education_num#500L as double) AS education_num_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#573, cast(capital_gain#506L as double) AS capital_gain_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#574, cast(capital_loss#507L as double) AS capital_loss_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#575, cast(hours_per_week#508L as double) AS hours_per_week_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#576)) AS features#577]\n", "+- Scan ExistingRDD[age#496L,workclass#497,fnlwgt#498L,education#499,education_num#500L,marital_status#501,occupation#502,relationship#503,race#504,sex#505,capital_gain#506L,capital_loss#507L,hours_per_week#508L,native_country#509,target#510]\n"]}], "source": ["training.explain()"]}, {"cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [{"data": {"text/plain": ["[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0])),\n", " Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]))]"]}, "execution_count": 19, "metadata": {}, "output_type": "execute_result"}], "source": ["head = training.take(2)\n", "head"]}, {"cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [{"data": {"text/plain": ["StructType(List(StructField(age,LongType,true),StructField(fnlwgt,LongType,true),StructField(education_num,LongType,true),StructField(capital_gain,LongType,true),StructField(capital_loss,LongType,true),StructField(hours_per_week,LongType,true),StructField(Y,FloatType,true),StructField(features,VectorUDT,true)))"]}, "execution_count": 20, "metadata": {}, "output_type": "execute_result"}], "source": ["training.schema"]},
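{"cell_type": "markdown", "metadata": {}, "source": ["The cells below fit and inspect the model on the full dataset, as the original documentation example does. For a held-out evaluation one could split first; the 80/20 ratio and the seed below are arbitrary choices, not part of the example:"]}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ["# hypothetical train/test split; the rest of the notebook keeps using `training`\n", "train_df, test_df = training.randomSplit([0.8, 0.2], seed=42)\n", "train_df.count(), test_df.count()"]},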
"execution_count": 20, "metadata": {}, "outputs": [{"data": {"text/plain": ["[Row(Y=1.0, count=24720), Row(Y=0.0, count=7841)]"]}, "execution_count": 21, "metadata": {}, "output_type": "execute_result"}], "source": ["training.groupby(\"Y\").count().collect()"]}, {"cell_type": "code", "execution_count": 21, "metadata": {"collapsed": true}, "outputs": [], "source": ["from pyspark.ml.classification import LogisticRegression"]}, {"cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["Coefficients: (6,[],[])\n", "Intercept: 1.1482462553407051\n"]}], "source": ["lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol='Y', featuresCol=\"features\")\n", "\n", "# Fit the model\n", "lrModel = lr.fit(training)\n", "\n", "# Print the coefficients and intercept for logistic regression\n", "print(\"Coefficients: \" + str(lrModel.coefficients))\n", "print(\"Intercept: \" + str(lrModel.intercept))"]}, {"cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [{"data": {"text/plain": ["[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0),\n", " Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0)]"]}, "execution_count": 24, "metadata": {}, "output_type": "execute_result"}], "source": ["prediction = lrModel.transform(training)\n", "prediction.take(2)"]}, {"cell_type": "code", "execution_count": 24, "metadata": {"collapsed": true}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4"}}, "nbformat": 4, "nbformat_minor": 2}