CSC 5003 – Semantic Web And Big Data Architecture

Portail informatique

CI1 : Apache Kafka

Install Kafka and create producer/consumer applications.

Install Kafka (∼20mn – easy)

Like in the previous labs, we need to download Kafka and uncompress it. You can go to Kafka Download page or run:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 # Change the location for the location on YOUR computer. wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1

We want to try Kafka. To start it, we need to start Zookeeper and Kafka in two different terminals. Run:
bin/zookeeper-server-start.sh config/zookeeper.properties # Open a new terminal! bin/kafka-server-start.sh config/server.properties

In both terminals, you should see the logs of the applications.

Using the commands we saw during the lecture, create a new topic called first-topic (Kafka address should be localhost:9092). How many partitions does it contain?
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092 bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092
You should see only one partition.

We want to start a dummy producer and consumer. To do so, run in two different terminals:
# The producer bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:" # The consumer bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true
Write something in the producer terminal and press ENTER. You should see what you typed in the consumer terminal.
These two commands are very convenient to insert manually a message in a topic or to read the content of a topic.

Quit the producer and consumer (Ctrl + c). We are done with this first topic. Delete it and check if it was actually done.
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092 bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Kafka in Scala (∼40mn – medium)

For this exercice, we will work on a new Kafka topic. Create a new topic called kafka-scala. Next, open your IDE and create a new project. Then, import the JARs in the kafka/lib directory.
If you are using SBT, you need to add the dependencies in the build.sbt file:
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.6.1" libraryDependencies += "org.apache.kafka" % "kafka-streams" % "3.6.1"
Do not forget to press Load sbt Changes (Ctrl + Shift + O).
Create an Scala object called FirstProducer and copy-paste the following code:
import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import java.util.Properties object FirstProducer { def main(args: Array[String]): Unit = { // We define the properties of our producer val props: Properties = new Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstProducer") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // Creation of the producer val producer = new KafkaProducer[String, String](props) // Help close the producer cleanly Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { System.out.println("Exiting...") producer.close() } }) // What the producer does. LazyList.from(0).foreach(x => {producer.send( new ProducerRecord[String, String]("kafka-scala", x.toString, x.toString)) Thread.sleep(1000) }) } }
What does this producer produce?
It counts from 0 to infinity and increases the counter every second.

We now create a consumer called FirstConsumer as follows:
import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.clients.consumer.KafkaConsumer import java.time.Duration import java.util import java.util.Properties object FirstConsumer { def main(args: Array[String]): Unit = { // Creation of the properties of the consumer val props = new Properties props.put("bootstrap.servers", "localhost:9092") props.put("group.id", "FirstGroup") props.put("enable.auto.commit", "true") props.put("auto.commit.interval.ms", "1000") props.put("auto.offset.reset", "earliest") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // Creation of the consumer val consumer = new KafkaConsumer[String, String](props) // Help close the application cleanly // Will raise a WakeupException that we will catch later val mainThread = Thread.currentThread Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { System.out.println("Exiting...") consumer.wakeup() try mainThread.join() catch { case e: InterruptedException => e.printStackTrace() } } }) try { consumer.subscribe(util.Arrays.asList("kafka-scala")) while (true) { val records = consumer.poll(Duration.ofMillis(100)) records.forEach(record => println("Offset : " + record.offset() + ", Key : " + record.key() + ", Value : " + record.value() + ", Partition : " + record.partition())) } } catch { case e: WakeupException => println("Waking up...") } finally { consumer.close() } } }
What is the group of our consumer? The offset is initialized by the parameter auto.offset.reset. By reading the documentation, what does it do here and what are the other options?
The group of the consumer is FirstGroup.
The possible offset resets are:
  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

Run the consumer and the producer. You should see the messages appear in the consumer terminal. Check that your group was created by listing the groups. Describe your group. Do you see your running consumer?
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group FirstGroup

We want to start a second consumer without stopping the first one. By default, this is not possible directly in your IDE, but you can change the options. You IntelliJ, click on Run, Edit Configurations... In Build and Run, click on Modify Option and check Allow multiple instances.
After you run the second consumer, what do you observe?
Only one of the two consumer is running. We can see which one by describing the group.

Set the number of partitions to three. Describe the topic to verify that your change was applied.
Start a new producer and restart one of the consumer. What do you observer now? Can you confirm this by describing the group?
Kill one of the consumer. What do you observe?
bin/kafka-topics.sh --topic kafka-scala --bootstrap-server localhost:9092 --alter --partitions 3
One consumer dealts with one topic, and the other one with the other two. The values are randomly affected to one partition. By describing the group, you should see that the two partitions that are processed by the same consumer have the same CONSUMER-ID.
After killing a consumer, all the partitions are processed by the only remaining consumer.

Kafka Applications (∼40mn – medium)

Important for the next lab! Following the model from the previous exercice, write a producer that reads a text file and produces a line from this file every second. Test your producer on a new topic.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.streams.StreamsConfig import java.util.Properties import scala.io.Source object TextProducer { def main(args: Array[String]): Unit = { // We define the properties of our producer val props: Properties = new Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TextProducer") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // Creation of the producer val producer = new KafkaProducer[String, String](props) // Help close the producer cleanly Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { System.out.println("Exiting...") producer.close() } }) // What the producer does. val lines = Source.fromFile("YOUR PATH").getLines.toList lines.zipWithIndex.foreach(x => { producer.send( new ProducerRecord[String, String]("text", x._2.toString, x._1)) Thread.sleep(1000) }) } }

In this question, we want to create a system that alerts us when stock prices change too quickly. We describe below the characteristics of our system.
The producer reads every minute the value of a set of stocks and publishes the result with on a topic. The value of a stock can be obtain using the following code:
import java.util.Locale val valueRegex: Regex = "class=\"value\" field=\"Last\" format.*?>(?<value>.*?)<".r val dateRegex: Regex = "<span class=\"timestamp__time\">Last Updated: <bg-quote field=\"date\" .*?>(?<date>.*?)</bg-quote>(?<tz>.*)</span>".r val format = new java.text.SimpleDateFormat("MMM d, yyyy HH:mm aaa zzzz", Locale.ENGLISH) // This function return a tuple with the current value of the action and the date of validity def getValue(ticker: String): (Double, Date) = { val page = scala.io.Source.fromURL( "https://www.marketwatch.com/investing/stock/" + ticker + "?countryCode=fr").mkString.replace("\n", " ") val value = valueRegex.findFirstMatchIn(page) match { case Some(x) => x.group("value").toDouble case None => -1 } val date = dateRegex.findFirstMatchIn(page) match { case Some(x) => format.parse((x.group("date") + x.group("tz")).strip().replace(".", "")) case None => new Date() } (value, date) }
The getValue function returns a value and the associated date. You can obtain tickers by visiting the website MarketWatch. The tickers appears in the URL (rno here) or on multiple places on the page. You must pick french companies. Here are some examples: Renault=rno, Capgemini=cap, LVMH=mc, Airbus=air. Then, you must send a message to the topic such that the key is the ticker and the value is a String composed of the value and date (we can use the method .getTime) separated by a comma. Warning: This lab is only feasible during the opening session of the stock market, from 9am to 5pm.
For the consumer we want to:
  • Keep track of the mean for each ticker. Create a mutable Map that will allow us to compute the mean iteratively without storing all the values.
  • Raise an alert using println when the value is computed with more that 10 values and the difference between the mean value of a ticker and the received value is greated that 10% of the mean value.
  • Update the mean representation at each step.
An alert can be quite rare with this setting. You can make it more frequent by reducing the percentage of deviation or by introducing manually artificial data inside the topic.
Producer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.streams.StreamsConfig import java.util.{Date, Properties} import scala.util.matching.Regex object StockProducer { val valueRegex: Regex = "class=\"value\" field=\"Last\" format.*?>(?<value>.*?)<".r val dateRegex: Regex = ">span class=\"timestamp__time\">Last Updated: <bg-quote field=\"date\" .*?>(?<date>.*?)</bg-quote>(?<tz>.*)</span>".r val format = new java.text.SimpleDateFormat("MMM d, yyyy HH:mm aaa zzzz") // This function return a tuple with the current value of the action and the date of validity def getValue(ticker: String): (Double, Date) = { val page = scala.io.Source.fromURL( "https://www.marketwatch.com/investing/stock/" + ticker + "?countryCode=fr").mkString.replace("\n", " ") val value = valueRegex.findFirstMatchIn(page) match { case Some(x) => x.group("value").toDouble case None => -1 } val date = dateRegex.findFirstMatchIn(page) match { case Some(x) => format.parse((x.group("date") + x.group("tz")).strip().replace(".", "")) case None => new Date() } (value, date) } def main(args: Array[String]): Unit = { // We define the properties of our producer val props: Properties = new Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StockProducer") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // Creation of the producer val producer = new KafkaProducer[String, String](props) // Help close the producer cleanly Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { System.out.println("Exiting...") producer.close() } }) while(true){ // What the producer does. val vd = getValue("rno") producer.send(new ProducerRecord[String, String]("stocks", "rno", vd._1.toString + "," + vd._2.getTime.toString)) Thread.sleep(1000 * 60) } } }
Consumer
import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.clients.consumer.KafkaConsumer import java.time.Duration import java.util import java.util.{Date, Properties} object StockConsumer { def main(args: Array[String]): Unit = { // Creation of the properties of the consumer val props = new Properties props.put("bootstrap.servers", "localhost:9092") props.put("group.id", "StockGroup") props.put("enable.auto.commit", "true") props.put("auto.commit.interval.ms", "1000") props.put("auto.offset.reset", "earliest") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // Creation of the consumer val consumer = new KafkaConsumer[String, String](props) // Help close the application cleanly // Will raise a WakeupException that we will catch later val mainThread = Thread.currentThread Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { System.out.println("Exiting...") consumer.wakeup() try mainThread.join() catch { case e: InterruptedException => e.printStackTrace() } } }) val means = scala.collection.mutable.Map[String, (Double, Int)]() try { consumer.subscribe(util.Arrays.asList("stocks")) while (true) { val records = consumer.poll(Duration.ofMillis(100)) records.forEach(record => { val key = record.key() val prev : (Double, Int) = means.getOrElse(key, (0, 0)) val price = record.value().split(",")(0).toDouble val mean = if (prev._2 != 0) prev._1 / prev._2 else 0 val date = new Date(record.value().split(",")(1).toLong) if (prev._2 > 10 && Math.abs(mean - price) > mean * 0.1) { println("Alert for " + key + " for the value " + price + " at time " + date + ". The mean is " + mean) } means(key) = (prev._1 + price, prev._2 + 1) }) } } catch { case e: WakeupException => println("Waking up...") } finally { consumer.close() } } }