In this exercise, we will practice data preprocessing and analysis with Spark. We will work on a financial dataset of companies from the S&P 500, which contains daily information about their stock prices.
We show here an example of the kind of data analysis we expect for the final project. In practice, we want to:
- Describe the data: What does the data contain? What are the values of the classical metrics (min, max, average, standard deviation, ...)?
- Ask questions to get insights into the data.
- Visualize the data and results (not done here).
To answer the questions, you can use the spark-shell. However, we recommend saving your code and results in a separate file.
Download the financial dataset and load it in Spark. You will have to tell Spark that the CSV file contains a header.
val df = spark.read.option("header",true).csv("sp500_stock_price.csv")
When we start preprocessing, it is good to take a look at the data first. Print the first lines of the dataset and observe them. Next, print the schema of the table. Does it look accurate enough? If not, write the schema manually and reload the data. Reprint the first lines to check that everything is OK.
df.show()
df.printSchema
// We redefine the schema
import org.apache.spark.sql.types.{StructField, StructType, TimestampType, StringType, DoubleType}
val mySchema = StructType(Array(
  StructField("Date", TimestampType, true),
  StructField("Open", DoubleType, true),
  StructField("High", DoubleType, true),
  StructField("Low", DoubleType, true),
  StructField("Close", DoubleType, true),
  StructField("Volume", DoubleType, true),
  StructField("Dividends", DoubleType, true),
  StructField("Stock Splits", DoubleType, true),
  StructField("Symbol", StringType, true),
  StructField("Name", StringType, true),
  StructField("Sector", StringType, true),
  StructField("Adj Close", StringType, false)
))
val df = spark.read.option("header",true).schema(mySchema).csv("sp500_stock_price.csv")
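After reloading the data with the explicit schema, reprint the schema and the first rows to check that the new types were applied, for instance:
df.printSchema
df.show(5)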
Describe the content of the table. One of the columns seems useless. Remove it.
df.describe().show()
val newDF = df.drop("Adj Close")
How many companies do we have? How many sectors? How many companies per sector?
// Number of sectors (Dataset map style)
newDF.map(x => x.getAs[String]("Sector")).distinct.count()
// Number of companies (SQL style, equivalent to the map style above)
newDF.selectExpr("Name").distinct.count
// Companies per sector (SQL style)
newDF.selectExpr("Sector", "Name").distinct.groupBy("Sector").count().show()
// Companies per sector (MapReduce style with RDDs)
newDF.map(x => ((x.getAs[String]("Sector"), x.getAs[String]("Name")), 1)).rdd.groupByKey.map(x => (x._1._1, x._1._2)).countByKey
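The same counts can also be written purely with the DataFrame API; a minimal alternative sketch using countDistinct (not part of the solution above):
import org.apache.spark.sql.functions.countDistinct
newDF.select(countDistinct("Name")).show()                                  // number of companies
newDF.select(countDistinct("Sector")).show()                                // number of sectors
newDF.groupBy("Sector").agg(countDistinct("Name").as("companies")).show()  // companies per sector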
Find the years with the lowest and highest total volume.
You have to get the Date field as a java.sql.Timestamp, then extract the year with getYear, to which you need to add 1900 (getYear returns the year minus 1900).
import java.sql.Timestamp
// Year with the highest total volume
newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).reduce((x, y) => if (x._2 < y._2) y else x)
// Year with the lowest total volume
newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).reduce((x, y) => if (x._2 > y._2) y else x)
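As an optional alternative sketch, the same totals can be computed with the DataFrame API and the year function (it will appear as a separate job in the WebUI discussed next):
import org.apache.spark.sql.functions.{year, sum}
val volumeByYear = newDF.groupBy(year($"Date").as("year")).agg(sum("Volume").as("totalVolume"))
volumeByYear.orderBy($"totalVolume".desc).show(1)  // year with the highest total volume
volumeByYear.orderBy($"totalVolume".asc).show(1)   // year with the lowest total volume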
Open the Spark WebUI (the URL is given when you start the spark-shell). How long did the last job you ran take? Click on it and have a look at the DAG. Can you identify the steps of your code?
In the previous question, we asked for the lowest and highest total volume for a year. These two computations have a shared part. What mechanism can you use to make the second computation faster after you run the first one once? Compare the computation times in the WebUI.
We need to use caching. It makes the second computation much faster.
val tmp = newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).cache()
tmp.reduce((x, y) => if (x._2 > y._2) y else x)  // year with the lowest total volume
tmp.reduce((x, y) => if (x._2 < y._2) y else x)  // year with the highest total volume
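You can also confirm from the code that the RDD is cached (it appears in the Storage tab of the WebUI once materialized), and release it when you no longer need it:
tmp.getStorageLevel  // storage level set by cache(), i.e. MEMORY_ONLY
tmp.unpersist()      // free the cached partitions when done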
We want to find the year in which the stock market varied the most. For each company and year, compute the difference between the lowest and highest prices. Then, calculate the mean per year and find the most varying year. You can verify your results by looking at this page. What do you think caused this variation?
The year is 2020, the year of the COVID-19 pandemic!
// For each (year, company), find the lowest Low and the highest High
val lowPrice = newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("Low"))).rdd.groupByKey.map(x => (x._1, x._2.min))
val highPrice = newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("High"))).rdd.groupByKey.map(x => (x._1, x._2.max))
// Difference per (year, company), then mean of the differences per year, then the year with the largest mean
lowPrice.union(highPrice).groupByKey.map(x => (x._1._1, x._2.max - x._2.min)).groupByKey().map(x => (x._1, x._2.sum / x._2.size)).reduce((x, y) => if (x._2 < y._2) y else x)
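For comparison, a sketch of the same computation with the DataFrame API, which avoids the explicit groupByKey shuffles by aggregating directly:
import org.apache.spark.sql.functions.{year, min, max, avg}
val rangePerCompanyYear = newDF.groupBy(year($"Date").as("year"), $"Symbol").agg((max("High") - min("Low")).as("range"))
rangePerCompanyYear.groupBy("year").agg(avg("range").as("meanRange")).orderBy($"meanRange".desc).show(1)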
Now, we would like to print, for each year, the company with the highest ratio VolumeExchanged/SharesOutstanding. You can download the shares outstanding here and run the following code to load the data.
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType}
val mySchema = StructType(Array(
  StructField("Symbol", StringType, true),
  StructField("shareOutstanding", DoubleType, true)))
val so = spark.read.option("header",true).schema(mySchema).csv("sharesOutstanding.csv")
// Build a (Symbol -> shares outstanding) map on the driver and broadcast it to the executors
val soMap = so.collect.map(x => (x.getAs[String]("Symbol"), x.getAs[Double]("shareOutstanding"))).toMap
val lookup = sc.broadcast(soMap)
// Total volume per (year, company), divided by the shares outstanding; keep the best company per year
newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).map(x => (x._1._1, (x._1._2, x._2 / lookup.value.getOrElse[Double](x._1._2, -1)))).reduceByKey((x, y) => if (x._2 < y._2) y else x).map(x => (x._1, x._2._1)).sortByKey().collect()
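An alternative sketch is to keep the shares outstanding as a DataFrame and let Spark perform a broadcast join, instead of collecting it and broadcasting a Map by hand; this version uses a window function to keep the best company per year:
import org.apache.spark.sql.functions.{year, sum, broadcast, row_number}
import org.apache.spark.sql.expressions.Window
val totalVolume = newDF.groupBy(year($"Date").as("year"), $"Symbol").agg(sum("Volume").as("totalVolume"))
val ratios = totalVolume.join(broadcast(so), "Symbol").withColumn("ratio", $"totalVolume" / $"shareOutstanding")
ratios.withColumn("rank", row_number().over(Window.partitionBy("year").orderBy($"ratio".desc))).filter($"rank" === 1).select("year", "Symbol").orderBy("year").show(50)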
On a 3a401-XX machine, you can run:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HADOOP_HOME=/netfs/tsp/staff/jromero/hadoop
export SPARK_HOME=/netfs/tsp/staff/jromero/spark
export KAFKA_HOME=/netfs/tsp/staff/jromero/kafka
export PATH=${PATH}:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SPARK_HOME}/bin:${KAFKA_HOME}/bin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export LD_LIBRARY_PATH=${HADOOP_HOME}/lib/native:$LD_LIBRARY_PATH
spark-shell --master yarn --deploy-mode client
val df = spark.read.option("header",true).csv("hdfs:///sp500_stock_price.csv")
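Note that this last line assumes the CSV file has already been copied to HDFS, for example with:
hdfs dfs -put sp500_stock_price.csv /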