{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "92a57b81-9b11-47e9-905f-d8b0f5210b36",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "# Chapter 3: Function Junction - Data manipulation with PySpark\n",
    "\n",
    "\n",
    "## Clean data\n",
    "In data science, `garbage in, garbage out` (GIGO) is the concept that flawed, biased or poor quality information or input produces a result or output of similar quality.\n",
    "To improve the quality of analysis, we need data cleaning: the process of turning garbage into gold. It consists of identifying, correcting, or removing errors and inconsistencies in data to improve its quality and usability.\n",
    "\n",
    "\n",
    "\n",
    "Let's start with a DataFrame containing bad values:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "tags": [
     "remove-cell"
    ]
   },
   "outputs": [],
   "source": [
    "!pip install pyspark==4.0.0.dev2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "tags": [
     "remove-cell"
    ]
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Data Loading and Storage Example\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "010edc0f-b8ff-4ca7-890b-312cfd86aee0",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+-----+\n",
      "| age|height| NAME|\n",
      "+----+------+-----+\n",
      "|  10|  80.0|Alice|\n",
      "|  10|  80.0|Alice|\n",
      "|   5|   NaN|  BOB|\n",
      "|NULL|  NULL|  Tom|\n",
      "|NULL|   NaN| NULL|\n",
      "|   9|  78.9| josh|\n",
      "|  18|1802.3| bush|\n",
      "|   7|  75.3|jerry|\n",
      "+----+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "df = spark.createDataFrame([\n",
    "    Row(age=10, height=80.0, NAME=\"Alice\"),\n",
    "    Row(age=10, height=80.0, NAME=\"Alice\"),\n",
    "    Row(age=5, height=float(\"nan\"), NAME=\"BOB\"),\n",
    "    Row(age=None, height=None, NAME=\"Tom\"),\n",
    "    Row(age=None, height=float(\"nan\"), NAME=None),\n",
    "    Row(age=9, height=78.9, NAME=\"josh\"),\n",
    "    Row(age=18, height=1802.3, NAME=\"bush\"),\n",
    "    Row(age=7, height=75.3, NAME=\"jerry\"),\n",
    "])\n",
    "\n",
    "df.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "d69ab969-8377-449b-a8a5-3c2e900298eb",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "\n",
    "### Rename columns\n",
    "At first glance, we find that the column name `NAME` is in upper case, unlike `age` and `height`.\n",
    "For consistency, we can use `DataFrame.withColumnRenamed` to rename it to `name`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "d682268d-dc62-47a2-be2e-b26af4b6bf0d",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+-----+\n",
      "| age|height| name|\n",
      "+----+------+-----+\n",
      "|  10|  80.0|Alice|\n",
      "|  10|  80.0|Alice|\n",
      "|   5|   NaN|  BOB|\n",
      "|NULL|  NULL|  Tom|\n",
      "|NULL|   NaN| NULL|\n",
      "|   9|  78.9| josh|\n",
      "|  18|1802.3| bush|\n",
      "|   7|  75.3|jerry|\n",
      "+----+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df2 = df.withColumnRenamed(\"NAME\", \"name\")\n",
    "\n",
    "df2.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "34599ede-9280-48bb-b968-1bdda9d22d8e",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Drop null values\n",
    "\n",
    "Next, we notice that there are two kinds of missing data:\n",
    "\n",
    "- `NULL` values, which appear in all three columns;\n",
    "- `NaN` values, which stand for `Not a Number` and appear in the numeric column `height`.\n",
    "\n",
    "The records without a valid `name` are likely useless, so let's drop them first. `DataFrameNaFunctions` provides a group of functions for handling missing values; we can use `DataFrame.na.drop` or `DataFrame.dropna` to omit rows with `NULL` or `NaN` values.\n",
    "\n",
    "After the step `df2.na.drop(subset=\"name\")`, the invalid record `(age=None, height=NaN, name=None)` is discarded."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "bc46e7b4-c8ec-47cb-8934-9d7fde49e426",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+-----+\n",
      "| age|height| name|\n",
      "+----+------+-----+\n",
      "|  10|  80.0|Alice|\n",
      "|  10|  80.0|Alice|\n",
      "|   5|   NaN|  BOB|\n",
      "|NULL|  NULL|  Tom|\n",
      "|   9|  78.9| josh|\n",
      "|  18|1802.3| bush|\n",
      "|   7|  75.3|jerry|\n",
      "+----+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df3 = df2.na.drop(subset=\"name\")\n",
    "\n",
    "df3.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "5c2b010e-f591-4ccd-a23b-8f68cc54e395",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Fill values\n",
    "\n",
    "For the remaining missing values, we can use `DataFrame.na.fill` or `DataFrame.fillna` to fill them. In a floating-point column such as `height`, both `NULL` and `NaN` values are replaced.\n",
    "\n",
    "With a `dict` input `{'age': 10, 'height': 80.1}`, we can specify the fill values for columns `age` and `height` together."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "9aac3291-4e70-435c-a665-59beed8ef3b1",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+-----+\n",
      "|age|height| name|\n",
      "+---+------+-----+\n",
      "| 10|  80.0|Alice|\n",
      "| 10|  80.0|Alice|\n",
      "|  5|  80.1|  BOB|\n",
      "| 10|  80.1|  Tom|\n",
      "|  9|  78.9| josh|\n",
      "| 18|1802.3| bush|\n",
      "|  7|  75.3|jerry|\n",
      "+---+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df4 = df3.na.fill({'age': 10, 'height': 80.1})\n",
    "\n",
    "df4.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "7c7ef34d-3403-4d5f-96a2-56f823e30277",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Remove outliers\n",
    "\n",
    "After the above steps, all missing values have been dropped or filled.\n",
    "However, `height=1802.3` seems unreasonable. To remove this kind of outlier, we can filter the DataFrame with a valid range such as `[65, 85]`; note that `Column.between` includes both bounds."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "09308454-3cea-4dd2-b0eb-01d93abd0488",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+-----+\n",
      "|age|height| name|\n",
      "+---+------+-----+\n",
      "| 10|  80.0|Alice|\n",
      "| 10|  80.0|Alice|\n",
      "|  5|  80.1|  BOB|\n",
      "| 10|  80.1|  Tom|\n",
      "|  9|  78.9| josh|\n",
      "|  7|  75.3|jerry|\n",
      "+---+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df5 = df4.where(df4.height.between(65, 85))\n",
    "\n",
    "df5.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "509b8ebe-52ba-4b8f-9473-018c2a8b1273",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Remove duplicates\n",
    "\n",
    "Now, all invalid records have been handled. But we notice that record `(age=10, height=80.0, name=Alice)` has been duplicated. To remove such duplicates, we can simply apply `DataFrame.distinct`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "c5cb2e96-f194-46ab-9489-efe2fe14190d",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+-----+\n",
      "|age|height| name|\n",
      "+---+------+-----+\n",
      "| 10|  80.0|Alice|\n",
      "|  5|  80.1|  BOB|\n",
      "| 10|  80.1|  Tom|\n",
      "|  9|  78.9| josh|\n",
      "|  7|  75.3|jerry|\n",
      "+---+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df6 = df5.distinct()\n",
    "\n",
    "df6.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "287a5e17-bf51-423b-9cb1-1f0eb0663658",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### String manipulation\n",
    "\n",
    "Column `name` contains both lower case and upper case letters. We can apply the `lower()` function to convert all letters to lower case.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "80a9cb72-1d37-407a-9161-85ea78ee4b73",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+-----+\n",
      "|age|height| name|\n",
      "+---+------+-----+\n",
      "| 10|  80.0|alice|\n",
      "|  5|  80.1|  bob|\n",
      "| 10|  80.1|  tom|\n",
      "|  9|  78.9| josh|\n",
      "|  7|  75.3|jerry|\n",
      "+---+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import functions as sf\n",
    "\n",
    "df7 = df6.withColumn(\"name\", sf.lower(\"name\"))\n",
    "df7.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "3b28e946-91be-4dd3-a4c3-720e2579a272",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "For more complicated string manipulations, we can also use `udf` to take advantage of Python's powerful functions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "ff338521-4770-4b49-b064-0ed3cff12570",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+-----+\n",
      "|age|height| name|\n",
      "+---+------+-----+\n",
      "| 10|  80.0|Alice|\n",
      "|  5|  80.1|  Bob|\n",
      "| 10|  80.1|  Tom|\n",
      "|  9|  78.9| Josh|\n",
      "|  7|  75.3|Jerry|\n",
      "+---+------+-----+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
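   ],
   "source": [
    "from pyspark.sql import functions as sf\n",
    "\n",
    "# Wrap a Python function as a UDF; pass NULL through instead of calling a method on None.\n",
    "capitalize = sf.udf(lambda s: s.capitalize() if s is not None else None)\n",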
    "\n",
    "df8 = df6.withColumn(\"name\", capitalize(\"name\"))\n",
    "df8.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "5279bf5b-6c37-48e7-bbd9-b3207820bb95",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Reorder columns\n",
    "\n",
    "After the above process, the data is clean, and we want to reorder the columns before saving the DataFrame to some storage. You can refer to the previous chapter `Load and Behold: Data loading, storage, file formats` for more details.\n",
    "\n",
    "Normally, we use `DataFrame.select` for this purpose."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "f316c501-6f97-4772-82a8-568fd59f04ff",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---+------+\n",
      "| name|age|height|\n",
      "+-----+---+------+\n",
      "|alice| 10|  80.0|\n",
      "|  bob|  5|  80.1|\n",
      "|  tom| 10|  80.1|\n",
      "| josh|  9|  78.9|\n",
      "|jerry|  7|  75.3|\n",
      "+-----+---+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df9 = df7.select(\"name\", \"age\", \"height\")\n",
    "\n",
    "df9.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "55a8d1de-f53a-4a73-a7c0-8dd8376f2dd5",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "## Transform data\n",
    "\n",
    "The main part of a data engineering project is transformation: we create new DataFrames from old ones."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "b8dc6227-05c9-4c0a-90da-e3f377c9468b",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Select columns with select()\n",
    "\n",
    "The input table may contain hundreds of columns, but for a specific project we are likely interested in only a small subset of them.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "402e442a-b04e-492b-a1b7-376185ea9f50",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n",
      "| id|col_0|col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9|col_10|col_11|col_12|col_13|col_14|col_15|col_16|col_17|col_18|col_19|\n",
      "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n",
      "|  0|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  1|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  2|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  3|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  4|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  5|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  6|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  7|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  8|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "|  9|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|\n",
      "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import functions as sf\n",
    "\n",
    "df = spark.range(10)\n",
    "\n",
    "for i in range(20):\n",
    "    df = df.withColumn(f\"col_{i}\", sf.lit(i))\n",
    "\n",
    "df.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "585efeeb-f935-4bc9-9d72-83d4e2cbe946",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "\n",
    "We created a DataFrame with 21 columns via a `for` loop; now we keep only 4 columns with `select`. Columns `id`, `col_2` and `col_3` are selected directly from the previous DataFrame, while column `sqrt_col_4_plus_5` is computed with math functions.\n",
    "\n",
    "There are hundreds of functions for column manipulation in `pyspark.sql.functions` and `pyspark.sql.Column`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "dde46ecc-a43f-4c83-823c-4ba010291e2c",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+-----+-----------------+\n",
      "| id|col_2|col_3|sqrt_col_4_plus_5|\n",
      "+---+-----+-----+-----------------+\n",
      "|  0|    2|    3|              3.0|\n",
      "|  1|    2|    3|              3.0|\n",
      "|  2|    2|    3|              3.0|\n",
      "|  3|    2|    3|              3.0|\n",
      "|  4|    2|    3|              3.0|\n",
      "|  5|    2|    3|              3.0|\n",
      "|  6|    2|    3|              3.0|\n",
      "|  7|    2|    3|              3.0|\n",
      "|  8|    2|    3|              3.0|\n",
      "|  9|    2|    3|              3.0|\n",
      "+---+-----+-----+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "df2 = df.select(\"id\", \"col_2\", \"col_3\", sf.sqrt(sf.col(\"col_4\") + sf.col(\"col_5\")).alias(\"sqrt_col_4_plus_5\"))\n",
    "\n",
    "df2.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "5dea6b73-186e-4bdc-a4f1-918b4e74ef75",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Filter rows with where()\n",
    "\n",
    "The input table may be huge and contain billions of rows, and we may again be interested in only a small subset.\n",
    "\n",
    "We can use `where` or `filter` with specified conditions to filter the rows.\n",
    "\n",
    "For example, we can select rows with odd `id` values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "2bf40a39-5a42-49af-8a3b-afbb766bbdc9",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+-----+-----------------+\n",
      "| id|col_2|col_3|sqrt_col_4_plus_5|\n",
      "+---+-----+-----+-----------------+\n",
      "|  1|    2|    3|              3.0|\n",
      "|  3|    2|    3|              3.0|\n",
      "|  5|    2|    3|              3.0|\n",
      "|  7|    2|    3|              3.0|\n",
      "|  9|    2|    3|              3.0|\n",
      "+---+-----+-----+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df3 = df2.where(sf.col(\"id\") % 2 == 1)\n",
    "\n",
    "df3.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "6e34ecd8-8c7a-44b1-9f7e-d4b1cc40a2b7",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "## Summarizing data\n",
    "\n",
    "In data analysis, we normally end up summarizing data into a chart or table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "1fbde87c-c5f7-4102-a41d-9c81c63d750b",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+\n",
      "|             incomes| NAME|\n",
      "+--------------------+-----+\n",
      "|[123.0, 456.0, 78...|Alice|\n",
      "|      [234.0, 567.0]|  BOB|\n",
      "|[100.0, 200.0, 10...|  Tom|\n",
      "|       [79.0, 128.0]| josh|\n",
      "|[123.0, 145.0, 17...| bush|\n",
      "|[111.0, 187.0, 45...|jerry|\n",
      "+--------------------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "df = spark.createDataFrame([\n",
    "    Row(incomes=[123.0, 456.0, 789.0], NAME=\"Alice\"),\n",
    "    Row(incomes=[234.0, 567.0], NAME=\"BOB\"),\n",
    "    Row(incomes=[100.0, 200.0, 100.0], NAME=\"Tom\"),\n",
    "    Row(incomes=[79.0, 128.0], NAME=\"josh\"),\n",
    "    Row(incomes=[123.0, 145.0, 178.0], NAME=\"bush\"),\n",
    "    Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], NAME=\"jerry\"),\n",
    "])\n",
    "\n",
    "df.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "aa593364-0f87-48d1-969e-7620c8c3ff85",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "For example, given the income per month, we want to find the average income for each name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "107e5b59-0d5c-4539-b481-c7895115bb5d",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----------------------------------+\n",
      "|name |incomes                            |\n",
      "+-----+-----------------------------------+\n",
      "|alice|[123.0, 456.0, 789.0]              |\n",
      "|bob  |[234.0, 567.0]                     |\n",
      "|tom  |[100.0, 200.0, 100.0]              |\n",
      "|josh |[79.0, 128.0]                      |\n",
      "|bush |[123.0, 145.0, 178.0]              |\n",
      "|jerry|[111.0, 187.0, 451.0, 188.0, 199.0]|\n",
      "+-----+-----------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import functions as sf\n",
    "\n",
    "df2 = df.select(sf.lower(\"NAME\").alias(\"name\"), \"incomes\")\n",
    "\n",
    "df2.show(truncate=False)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "3ee9f0d6-2d77-43b5-8632-2f60004b8bb4",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Reshape data using explode()\n",
    "\n",
    "To make the data easier to aggregate, we can use the `explode()` function to reshape it, producing one row per element of the `incomes` array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "1c0fb5ea-10aa-4f7a-be15-6881a04f3485",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------+\n",
      "| name|income|\n",
      "+-----+------+\n",
      "|alice| 123.0|\n",
      "|alice| 456.0|\n",
      "|alice| 789.0|\n",
      "|  bob| 234.0|\n",
      "|  bob| 567.0|\n",
      "|  tom| 100.0|\n",
      "|  tom| 200.0|\n",
      "|  tom| 100.0|\n",
      "| josh|  79.0|\n",
      "| josh| 128.0|\n",
      "| bush| 123.0|\n",
      "| bush| 145.0|\n",
      "| bush| 178.0|\n",
      "|jerry| 111.0|\n",
      "|jerry| 187.0|\n",
      "|jerry| 451.0|\n",
      "|jerry| 188.0|\n",
      "|jerry| 199.0|\n",
      "+-----+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df3 = df2.select(\"name\", sf.explode(\"incomes\").alias(\"income\"))\n",
    "\n",
    "df3.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "21590f9b-7e64-406d-9d55-7f0ac47b594c",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Summarizing via groupBy() and agg()\n",
    "\n",
    "Then we normally use `DataFrame.groupBy(...).agg(...)` to aggregate the data. To compute the average income, we can apply the aggregation function `avg`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "31dece1d-4d2a-4f85-bb4e-76d3f82d8ca0",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------------------+\n",
      "| name|        avg_income|\n",
      "+-----+------------------+\n",
      "|alice|             456.0|\n",
      "|  bob|             400.5|\n",
      "|  tom|133.33333333333334|\n",
      "| josh|             103.5|\n",
      "| bush|148.66666666666666|\n",
      "|jerry|             227.2|\n",
      "+-----+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df4 = df3.groupBy(\"name\").agg(sf.avg(\"income\").alias(\"avg_income\"))\n",
    "\n",
    "df4.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "0938c629-4494-4614-8eac-3fabb0eb1547",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "### Order data with orderBy()\n",
    "\n",
    "For final analysis, we normally want to order the data. In this case, we can order the data by `name`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "7e54b022-cb52-4fe2-a79d-259be277d705",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------------------+\n",
      "| name|        avg_income|\n",
      "+-----+------------------+\n",
      "|alice|             456.0|\n",
      "|  bob|             400.5|\n",
      "| bush|148.66666666666666|\n",
      "|jerry|             227.2|\n",
      "| josh|             103.5|\n",
      "|  tom|133.33333333333334|\n",
      "+-----+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df5 = df4.orderBy(\"name\")\n",
    "\n",
    "df5.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "dc6e8724-1363-4d55-a98c-87ad11efb787",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "## When DataFrames Collide: The Art of Joining\n",
    "\n",
    "When dealing with multiple DataFrames, we likely need to combine them in some way. The most frequently used approach is joining.\n",
    "\n",
    "For example, given the `incomes` data and `height` data, we can use `DataFrame.join` to join them by `name`.\n",
    "\n",
    "We can see that only `alice`, `josh` and `bush` are in the final results, because `DataFrame.join` performs an inner join by default and only they appear in both DataFrames."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "2a5e52e6-fc57-4315-b649-f79828269449",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "df1 = spark.createDataFrame([\n",
    "    Row(age=10, height=80.0, name=\"alice\"),\n",
    "    Row(age=9, height=78.9, name=\"josh\"),\n",
    "    Row(age=18, height=82.3, name=\"bush\"),\n",
    "    Row(age=7, height=75.3, name=\"tom\"),\n",
    "])\n",
    "\n",
    "df2 = spark.createDataFrame([\n",
    "    Row(incomes=[123.0, 456.0, 789.0], name=\"alice\"),\n",
    "    Row(incomes=[234.0, 567.0], name=\"bob\"),\n",
    "    Row(incomes=[79.0, 128.0], name=\"josh\"),\n",
    "    Row(incomes=[123.0, 145.0, 178.0], name=\"bush\"),\n",
    "    Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], name=\"jerry\"),\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "bb635cb3-74b4-40f3-a8bd-b74fca020f95",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---+------+---------------------+\n",
      "|name |age|height|incomes              |\n",
      "+-----+---+------+---------------------+\n",
      "|alice|10 |80.0  |[123.0, 456.0, 789.0]|\n",
      "|bush |18 |82.3  |[123.0, 145.0, 178.0]|\n",
      "|josh |9  |78.9  |[79.0, 128.0]        |\n",
      "+-----+---+------+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Inner join (the default) on the shared `name` column.\n",
    "df3 = df1.join(df2, on=\"name\")\n",
    "\n",
    "df3.show(truncate=False)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "98cefd42-a358-4d12-87f2-41cc41aa98b7",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "`DataFrame.join` supports seven join types, selected with the `how` argument:\n",
    "- `INNER`\n",
    "- `LEFT`\n",
    "- `RIGHT`\n",
    "- `FULL`\n",
    "- `CROSS`\n",
    "- `LEFTSEMI`\n",
    "- `LEFTANTI`\n",
    "\n",
    "The default is `INNER`.\n",
    "\n",
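    "The `how` argument of `DataFrame.join` selects the join type. As a minimal sketch, assuming two small stand-in DataFrames (`heights` and `incomes_df` are hypothetical names, not defined elsewhere in this chapter), the `FULL`, `LEFTSEMI` and `LEFTANTI` types could be tried like this:\n",
    "\n",
    "```python\n",
    "from pyspark.sql import Row, SparkSession\n",
    "\n",
    "spark = SparkSession.builder.appName(\"join-types-sketch\").getOrCreate()\n",
    "\n",
    "# Hypothetical stand-ins for the height and incomes data.\n",
    "heights = spark.createDataFrame([\n",
    "    Row(name=\"alice\", height=80.0),\n",
    "    Row(name=\"tom\", height=75.3),\n",
    "])\n",
    "incomes_df = spark.createDataFrame([\n",
    "    Row(name=\"alice\", incomes=[123.0, 456.0]),\n",
    "    Row(name=\"bob\", incomes=[234.0, 567.0]),\n",
    "])\n",
    "\n",
    "# FULL keeps every name from both sides and fills the gaps with NULL.\n",
    "full = heights.join(incomes_df, on=\"name\", how=\"full\")\n",
    "\n",
    "# LEFTSEMI keeps the left rows that have a match on the right;\n",
    "# LEFTANTI keeps the left rows that do not.\n",
    "semi = heights.join(incomes_df, on=\"name\", how=\"left_semi\")\n",
    "anti = heights.join(incomes_df, on=\"name\", how=\"left_anti\")\n",
    "\n",
    "full.show(truncate=False)\n",
    "semi.show()\n",
    "anti.show()\n",
    "```\n",
    "\n",
    "Note that `LEFTSEMI` and `LEFTANTI` return only the columns of the left DataFrame.\n",
    "\n",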
    "Let's take the `LEFT` join as another example. A left join keeps every record from the first (left) table, even when it has no match in the second (right) table; the columns that come from the right table are then filled with `NULL`.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "cdc282ab-ed1d-4964-9ebd-a5770de93cc3",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---+------+---------------------+\n",
      "|name |age|height|incomes              |\n",
      "+-----+---+------+---------------------+\n",
      "|alice|10 |80.0  |[123.0, 456.0, 789.0]|\n",
      "|josh |9  |78.9  |[79.0, 128.0]        |\n",
      "|bush |18 |82.3  |[123.0, 145.0, 178.0]|\n",
      "|tom  |7  |75.3  |NULL                 |\n",
      "+-----+---+------+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Left join: keep every row of df1, matched or not.\n",
    "df4 = df1.join(df2, on=\"name\", how=\"left\")\n",
    "\n",
    "df4.show(truncate=False)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "70257613-f432-48e6-bc58-b521fde9b77a",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "source": [
    "A `RIGHT` join does the opposite: it keeps every record from the right table and fills the left table's columns with `NULL` where there is no match."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "846429dc-ea7e-484e-ad3d-82e625348f69",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+----+------+-----------------------------------+\n",
      "|name |age |height|incomes                            |\n",
      "+-----+----+------+-----------------------------------+\n",
      "|alice|10  |80.0  |[123.0, 456.0, 789.0]              |\n",
      "|bob  |NULL|NULL  |[234.0, 567.0]                     |\n",
      "|josh |9   |78.9  |[79.0, 128.0]                      |\n",
      "|bush |18  |82.3  |[123.0, 145.0, 178.0]              |\n",
      "|jerry|NULL|NULL  |[111.0, 187.0, 451.0, 188.0, 199.0]|\n",
      "+-----+----+------+-----------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Right join: keep every row of df2, matched or not.\n",
    "df5 = df1.join(df2, on=\"name\", how=\"right\")\n",
    "\n",
    "df5.show(truncate=False)"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "environmentMetadata": {
    "base_environment": "",
    "client": "1"
   },
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 2
   },
   "notebookName": "PythonCookbook",
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "language": "python",
   "name": ""
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
