SQL and DataFrame
import pyspark
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files. To create a SparkSession, use the following builder pattern:
getOrCreate: gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in the builder.
from pyspark import SparkConf
conf = SparkConf().set("spark.python.profile", "true")
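A minimal builder sketch using this configuration (the master URL and app name below are placeholder values, not from the original):

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.python.profile", "true")

# getOrCreate returns an existing SparkSession or builds a new one
# from the options set on the builder.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("sql-and-dataframe-demo") \
    .config(conf=conf) \
    .getOrCreate()
```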
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
Create a DataFrame from a list:
1 | l=[("alice",1),("bob",2)] |
[Row(name='alice', age=1), Row(name='bob', age=2)]
df.toPandas()
|   | name  | age |
|---|-------|-----|
| 0 | alice | 1   |
| 1 | bob   | 2   |
Create a DataFrame from a pandas DataFrame and query it with SQL:
import pandas as pd
df2 = spark.createDataFrame(pd.DataFrame([("alice", 1), ("bob", 2)], columns=["name", "age"]))
df2.collect()
[Row(name='alice', age=1), Row(name='bob', age=2)]
df2.select("name").collect()
[Row(name='alice'), Row(name='bob')]
df2.createOrReplaceTempView('table1')
spark.sql("SELECT name FROM table1").collect()
[Row(name='alice'), Row(name='bob')]
spark.table("table1").collect()
[Row(name='alice', age=1), Row(name='bob', age=2)]
spark.stop()
SQLContext
The entry point for working with structured data (rows and columns) in Spark 1.x.
As of Spark 2.0, this is replaced by SparkSession. However, we are keeping the class here for backward compatibility.
A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.createDataFrame([("alice", 1), ("bob", 2)], ["name", "age"]).collect()
[Row(name='alice', age=1), Row(name='bob', age=2)]
sc.stop()
SparkETL
ETL is a type of data integration process referring to three distinct but interrelated steps (Extract, Transform, and Load); it is often used to synthesize data from multiple sources in order to build a data warehouse, data hub, or data lake.
Let's write an ETL job in PySpark!
Reference: https://github.com/AlexIoannides/pyspark-example-project
Before building the ETL process, we write a function start_spark to start our SparkSession, update its configuration, and retrieve the job configuration.
import __main__
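A simplified sketch of what such a start_spark function might look like, loosely modelled on the referenced project; the signature, the __main__ check, and the config-file handling below are assumptions, not the project's exact code:

```python
import __main__
import json
from os import listdir

from pyspark.sql import SparkSession


def start_spark(app_name='my_etl_job', master='local[*]', spark_config=None):
    """Start (or reuse) a SparkSession, apply extra config values, and load an
    optional JSON config file from the working directory. Sketch only."""
    # Outside spark-submit (e.g. in a REPL or notebook) __main__ has no
    # __file__ attribute, so we also set the master explicitly there.
    flag_repl = not hasattr(__main__, '__file__')
    builder = SparkSession.builder.appName(app_name)
    if flag_repl:
        builder = builder.master(master)
    for key, value in (spark_config or {}).items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()

    # Optional job configuration: first '*config.json' file found, if any.
    config = None
    config_files = [f for f in listdir('.') if f.endswith('config.json')]
    if config_files:
        with open(config_files[0]) as config_file:
            config = json.load(config_file)

    return spark, config
```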
The ETL process contains three stages: Extract, Transform, and Load. In our Spark job (a sketch follows below):
- Extract: read Parquet-format data from the local machine
- Transform: use Spark SQL to manipulate the data
- Load: write the result to CSV
from spark import start_spark
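A minimal sketch of the job itself, assuming a local Parquet input, a toy Spark SQL transformation, and a CSV output directory; the paths, column names, and the 'people' view name are illustrative, not from the original:

```python
from spark import start_spark


def extract_data(spark, parquet_path):
    """Extract: read Parquet-format data from the local machine."""
    return spark.read.parquet(parquet_path)


def transform_data(spark, df):
    """Transform: manipulate the data with Spark SQL."""
    df.createOrReplaceTempView('people')
    return spark.sql("SELECT name, age FROM people WHERE age > 0")


def load_data(df, csv_path):
    """Load: write the transformed data to CSV."""
    df.coalesce(1).write.csv(csv_path, mode='overwrite', header=True)


def main():
    spark, config = start_spark(app_name='etl_job')
    data = extract_data(spark, 'data/input.parquet')   # placeholder path
    transformed = transform_data(spark, data)
    load_data(transformed, 'data/output_csv')          # placeholder path
    spark.stop()


if __name__ == '__main__':
    main()
```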