CSC 5003 – Semantic Web And Big Data Architecture

Portail informatique

CI1 : Introduction To Spark

Install Spark and perform data analysis.

Install Spark (∼10 min – easy)

To begin with, we need to install the tools to run Spark locally, similarly to what we did with Hadoop. Go to the Spark download page, download a recent pre-built release (we use 3.3.1 below), and uncompress it. Alternatively, you can run the following:

wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -zxvf spark-3.3.1-bin-hadoop3.tgz
cd spark-3.3.1-bin-hadoop3
Check that Spark works by going into the uncompressed directory and running bin/spark-shell. This command opens the Spark interactive shell.
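To make sure everything works, you can run a small job inside the shell; any small computation will do, for example:

spark.range(10).count() // should return 10

You can leave the shell with :quit.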

Data Analysis (∼10 min – medium)

In this exercise, we will practice data preprocessing and analysis with Spark. We will work on a financial dataset of companies from the S&P 500, which contains daily stock price information for each company.

We show here an example of data analysis as we expect for the final project. In practice, we want to:

  • Describe the data: What does the data contain? What are the values of the usual summary statistics (min, max, average, standard deviation, ...)?
  • Ask questions to get insights into the data.
  • Visualize the data and results (not done here).

To answer the questions, you can use the spark-shell. However, we recommend that you save your code and results in a separate file.
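For instance, you can keep your answers in a Scala script and replay it in the shell; a minimal sketch, assuming the code is saved in a file named answers.scala (the name is just an example). From inside the spark-shell:

:load answers.scala

or, to execute the file at startup and then stay in interactive mode:

bin/spark-shell -i answers.scala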

Download the financial dataset. Load this dataset in Spark. You will have to tell Spark that the CSV file contains a header.
val df = spark.read.option("header",true).csv("sp500_stock_price.csv")

When we start preprocessing data, it is good to have a look at what the data looks like. Print the first lines of the dataset and observe. Next, print the schema of this table. Does it look accurate enough? If not, manually write the schema and reload the data. Reprint the first lines to check that everything is ok.
df.show()
df.printSchema

// We redefine the schema
import org.apache.spark.sql.types.{StructField, StructType, TimestampType, StringType, DoubleType}

val mySchema = StructType(Array(
  StructField("Date", TimestampType, true),
  StructField("Open", DoubleType, true),
  StructField("High", DoubleType, true),
  StructField("Low", DoubleType, true),
  StructField("Close", DoubleType, true),
  StructField("Volume", DoubleType, true),
  StructField("Dividends", DoubleType, true),
  StructField("Stock Splits", DoubleType, true),
  StructField("Symbol", StringType, true),
  StructField("Name", StringType, true),
  StructField("Sector", StringType, true),
  StructField("Adj Close", StringType, false)
))
val df = spark.read.option("header",true).schema(mySchema).csv("sp500_stock_price.csv")

Describe the content of the table. One of the columns seems useless. Remove it.
df.describe().show()
val newDF = df.drop("Adj Close")

How many companies do we have? How many sectors? How many companies per sector?
// Number of distinct sectors
newDF.map(x => (x.getAs[String]("Sector"))).distinct.count()
// Number of distinct companies (SQL style)
newDF.selectExpr("Name").distinct.count
// Number of companies per sector (SQL style)
newDF.selectExpr("Sector", "Name").distinct.groupBy("Sector").count().show()
// Number of companies per sector (MapReduce style with RDDs)
newDF.map(x => ((x.getAs[String]("Sector"), x.getAs[String]("Name")), 1)).rdd.groupByKey.map(x => (x._1._1, x._1._2)).countByKey

Find the years with the lowest and highest total volume.
You have to get the Date field as a java.sql.Timestamp, then extract the year with getYear, to which you need to add 1900 (getYear returns the year minus 1900).
import java.sql.Timestamp

// Year with the highest total volume
newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).reduce((x, y) => if (x._2 < y._2) y else x)
// Year with the lowest total volume
newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).reduce((x, y) => if (x._2 > y._2) y else x)

Open the Spark WebUI (the URL is given when you open the spark-shell). How long was the last job you ran? Click on it and have a look at the DAG. Can you identify the steps in your code?
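When you run the spark-shell locally, the WebUI is usually available at http://localhost:4040.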
In the previous question, we asked for the lowest and highest total volume for a year. These two computations have a shared part. What mechanism can you use to make the second computation faster after you run the first one once? Compare the computation times in the WebUI.
We need to use caching (cache() or persist()): the shared RDD is computed once, kept in memory, and reused by the second action, which makes the second computation much faster.
// The totals per year are computed once and cached
val tmp = newDF.map(x => (x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).cache()
// Lowest total volume
tmp.reduce((x, y) => if (x._2 > y._2) y else x)
// Highest total volume
tmp.reduce((x, y) => if (x._2 < y._2) y else x)
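Optionally, once you no longer need the cached RDD, you can free the corresponding memory:

tmp.unpersist()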

We want to find the year in which the stock market varied the most. For each company and year, compute the difference between the lowest and highest prices. Then, calculate the mean of these differences per year and find the year that varied the most. You can verify your results by looking at this page. What do you think caused this variation?
The year is 2020, the year of the COVID pandemic!
// Per (year, company): lowest Low and highest High
val lowDate = newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("Low"))).rdd.groupByKey.map(x => (x._1, x._2.min))
val highDate = newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("High"))).rdd.groupByKey.map(x => (x._1, x._2.max))
// Difference per (year, company), then mean per year, then the year with the highest mean
lowDate.union(highDate).groupByKey.map(x => (x._1._1, x._2.max - x._2.min)).groupByKey().map(x => (x._1, x._2.sum / x._2.size)).reduce((x, y) => if (x._2 < y._2) y else x)

Now, we would like to print, for each year, the company that has the highest VolumeExchanged/SharesOutstanding ratio. You can download the shares outstanding here and run the following code to load the data.
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType}

val mySchema = StructType(Array(
  StructField("Symbol", StringType, true),
  StructField("shareOutstanding", DoubleType, true)))
val so = spark.read.option("header",true).schema(mySchema).csv("sharesOutstanding.csv")
val soMap = so.collect.map(x => (x(0), x(1))).toMap.asInstanceOf[Map[String,Double]]
// Broadcast the small lookup table to every executor
val lookup = sc.broadcast(soMap)
// Total volume per (year, symbol), divided by the shares outstanding; keep the best company per year
newDF.map(x => ((x.getAs[Timestamp]("Date").getYear + 1900, x.getAs[String]("Symbol")), x.getAs[Double]("Volume"))).rdd.reduceByKey(_ + _).map(x => (x._1._1, (x._1._2, x._2 / lookup.value.getOrElse[Double](x._1._2, -1)))).reduceByKey((x, y) => if (x._2 < y._2) y else x).map(x => (x._1, x._2._1)).sortByKey().collect()
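Broadcasting the shares-outstanding map sends this small lookup table once to each executor, so every record can look up its ratio locally instead of joining the two datasets through a shuffle.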
On a 3a401-XX machine, you can run:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HADOOP_HOME=/netfs/tsp/staff/jromero/hadoop
export SPARK_HOME=/netfs/tsp/staff/jromero/spark
export KAFKA_HOME=/netfs/tsp/staff/jromero/kafka
export PATH=${PATH}:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SPARK_HOME}/bin:${KAFKA_HOME}/bin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export LD_LIBRARY_PATH=${HADOOP_HOME}/lib/native:$LD_LIBRARY_PATH
spark-shell --master yarn --deploy-mode client
val df = spark.read.option("header",true).csv("hdfs:///sp500_stock_price.csv")
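If the dataset is not yet on HDFS, you need to upload it first; a minimal sketch, assuming the CSV is in your current directory and is placed at the HDFS root, as the read above expects:

hdfs dfs -put sp500_stock_price.csv /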