CI1: Apache Kafka
Install Kafka and create producer/consumer applications.
Install Kafka (∼20 min – easy)
As in the previous labs, we need to download Kafka and uncompress it. You can go to the Kafka download page or run:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 # Change the location for the location on YOUR computer.
wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
We want to try Kafka. To start it, we need to start ZooKeeper and the Kafka server in two different terminals. Run:
bin/zookeeper-server-start.sh config/zookeeper.properties
# Open a new terminal!
bin/kafka-server-start.sh config/server.properties
In both terminals, you should see the logs of the applications.
Using the commands we saw during the lecture, create a new topic called first-topic (Kafka address should be localhost:9092). How many partitions does it contain?
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092
By default, the topic contains a single partition (the broker default num.partitions is 1).
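These operations can also be done programmatically. Below is a minimal sketch (not required for the lab) that uses Kafka's AdminClient to create and then describe a topic; the topic name admin-demo is only an illustration, and the broker address assumes the setup above:
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import java.util.{Collections, Properties}

object TopicAdmin {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    // Create a topic with 1 partition and a replication factor of 1
    admin.createTopics(Collections.singleton(new NewTopic("admin-demo", 1, 1.toShort))).all().get()
    // Describe it and print its partition count
    val description = admin.describeTopics(Collections.singleton("admin-demo")).allTopicNames().get()
    println(description.get("admin-demo").partitions().size() + " partition(s)")
    admin.close()
  }
}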
We want to start a dummy producer and consumer. To do so, run in two different terminals:
# The producer
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
# The consumer
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true
Write something in the producer terminal (in the form key:value, since we set key.separator to ":") and press ENTER. You should see what you typed in the consumer terminal. These two commands are very convenient to manually insert a message into a topic or to read the contents of a topic.
Quit the producer and consumer (Ctrl + c). We are done with this first topic. Delete it and check that it was actually deleted.
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Kafka in Scala (∼40 min – medium)
For this exercise, we will work on a new Kafka topic. Create a new topic called kafka-scala. Next, open your IDE and create a new project. Then, import the JARs from Kafka's libs directory.
If you are using SBT, you need to add the dependencies to the build.sbt file:
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.6.1"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "3.6.1"
Do not forget to press Load sbt Changes (Ctrl + Shift + O).
Create a Scala object called FirstProducer and copy-paste the following code. What does this producer produce?
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

object FirstProducer {
  def main(args: Array[String]): Unit = {
    // We define the properties of our producer
    val props: Properties = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstProducer")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Creation of the producer
    val producer = new KafkaProducer[String, String](props)
    // Help close the producer cleanly
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        System.out.println("Exiting...")
        producer.close()
      }
    })
    // What the producer does.
    LazyList.from(0).foreach { x =>
      producer.send(
        new ProducerRecord[String, String]("kafka-scala", x.toString, x.toString))
      Thread.sleep(1000)
    }
  }
}
It counts from 0 to infinity, incrementing every second: each second, the producer sends a record to the kafka-scala topic with the current count as both key and value.
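Note that send is asynchronous: it returns immediately and the record is delivered in the background. If you want to check where each record lands, you can pass a callback to send. A minimal sketch, assuming the producer defined above (this variant would replace the send call inside the loop):
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

producer.send(
  new ProducerRecord[String, String]("kafka-scala", "42", "42"),
  new Callback {
    // Called once the broker acknowledges (or rejects) the record
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception == null)
        println("Stored in partition " + metadata.partition() + " at offset " + metadata.offset())
      else
        exception.printStackTrace()
  })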
We now create a consumer called FirstConsumer as follows:
What is the group of our consumer? The offset is initialized by the parameter auto.offset.reset. By reading the documentation, what does it do here and what are the other options?
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util
import java.util.Properties

object FirstConsumer {
  def main(args: Array[String]): Unit = {
    // Creation of the properties of the consumer
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "FirstGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Creation of the consumer
    val consumer = new KafkaConsumer[String, String](props)
    // Help close the application cleanly
    // Will raise a WakeupException that we will catch later
    val mainThread = Thread.currentThread
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        System.out.println("Exiting...")
        consumer.wakeup()
        try mainThread.join()
        catch {
          case e: InterruptedException =>
            e.printStackTrace()
        }
      }
    })
    try {
      consumer.subscribe(util.Arrays.asList("kafka-scala"))
      while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.forEach(record => println("Offset : " + record.offset() +
          ", Key : " + record.key() +
          ", Value : " + record.value() +
          ", Partition : " + record.partition()))
      }
    } catch {
      case e: WakeupException => println("Waking up...")
    } finally {
      consumer.close()
    }
  }
}
The group of the consumer is FirstGroup.
Here, auto.offset.reset is set to earliest, so a consumer group that has no committed offset yet starts reading from the beginning of the topic. The possible offset resets are:
- earliest: automatically reset the offset to the earliest offset;
- latest: automatically reset the offset to the latest offset;
- none: throw an exception to the consumer if no previous offset is found for the consumer's group;
- anything else: throw an exception to the consumer.
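Also note that auto.offset.reset only applies when the group has no committed offset. To re-read a topic on demand, you can seek explicitly. A minimal sketch, assuming the consumer above (partitions must have been assigned first, hence the initial poll; on a slow start this may take more than one poll):
// One poll to trigger the group join and partition assignment
consumer.poll(Duration.ofMillis(100))
// Rewind all assigned partitions to their first offset,
// regardless of any committed offset
consumer.seekToBeginning(consumer.assignment())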
Run the consumer and the producer. You should see the messages appear in the consumer terminal.
Check that your group was created by listing the groups. Describe your group. Do you see your running consumer?
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group FirstGroup
We want to start a second consumer without stopping the first one. By default, this is not possible directly in your IDE, but you can change the run options. In IntelliJ, click on Run, then Edit Configurations... In Build and Run, click on Modify options and check Allow multiple instances.
After you run the second consumer, what do you observe?
Only one of the two consumers receives the messages: the topic has a single partition, and within a group each partition is consumed by at most one consumer. We can see which one by describing the group.
Set the number of partitions to three. Describe the topic to verify that your change was applied.
Start a new producer and restart one of the consumers. What do you observe now? Can you confirm this by describing the group?
Kill one of the consumers. What do you observe?
bin/kafka-topics.sh --topic kafka-scala --bootstrap-server localhost:9092 --alter --partitions 3
With three partitions, the messages are now shared between the two consumers; describing the group shows the partitions split across both members. After killing a consumer, all the partitions are processed by the only remaining consumer.
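You can also observe the rebalance from inside the application: subscribe accepts a ConsumerRebalanceListener that is called whenever partitions are assigned to or revoked from this consumer. A minimal sketch that would replace the subscribe call in FirstConsumer:
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import java.util

consumer.subscribe(util.Arrays.asList("kafka-scala"), new ConsumerRebalanceListener {
  // Called before the partitions are reassigned within the group
  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit =
    println("Revoked: " + partitions)
  // Called once this consumer knows its new partitions
  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
    println("Assigned: " + partitions)
})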
Kafka Applications (∼40 min – medium)
Important for the next lab! Following the model from the previous exercise, write a producer that reads a text file and produces a line from this file every second. Test your producer on a new topic.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties
import scala.io.Source

object TextProducer {
  def main(args: Array[String]): Unit = {
    // We define the properties of our producer
    val props: Properties = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TextProducer")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Creation of the producer
    val producer = new KafkaProducer[String, String](props)
    // Help close the producer cleanly
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        System.out.println("Exiting...")
        producer.close()
      }
    })
    // What the producer does: send one line per second, keyed by its line number
    val lines = Source.fromFile("YOUR PATH").getLines().toList
    lines.zipWithIndex.foreach { case (line, index) =>
      producer.send(
        new ProducerRecord[String, String]("text", index.toString, line))
      Thread.sleep(1000)
    }
  }
}
In this question, we want to create a system that alerts us when stock prices change too quickly. We describe below the characteristics of our system.
The producer reads the value of a set of stocks every minute and publishes the result on a topic. The value of a stock can be obtained with the getValue function given below, which returns the value and the associated date.
You can obtain tickers by visiting the website MarketWatch.
The ticker appears in the URL (rno here) and in several places on the page. You must pick French companies. Here are some examples: Renault=rno, Capgemini=cap, LVMH=mc, Airbus=air.
Then, you must send a message to the topic such that the key is the ticker and the value is a String composed of the value and the date (we can use the method .getTime) separated by a comma.
Warning: This lab is only feasible while the stock market is open, from 9am to 5pm.
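To make the expected message format concrete, here is a minimal sketch of how such a value can be built and parsed back (the price is only illustrative):
import java.util.Date

// Producer side: the value is "<price>,<epoch milliseconds>"
val price = 43.21
val date = new Date()
val messageValue = price.toString + "," + date.getTime.toString
// Consumer side: split on the comma to recover both fields
val fields = messageValue.split(",")
val parsedPrice = fields(0).toDouble
val parsedDate = new Date(fields(1).toLong)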
Here is the code of the getValue function:
import java.util.{Date, Locale}
import scala.util.matching.Regex

val valueRegex: Regex = "class=\"value\" field=\"Last\" format.*?>(?<value>.*?)<".r
val dateRegex: Regex = "<span class=\"timestamp__time\">Last Updated: <bg-quote field=\"date\" .*?>(?<date>.*?)</bg-quote>(?<tz>.*)</span>".r
val format = new java.text.SimpleDateFormat("MMM d, yyyy HH:mm aaa zzzz", Locale.ENGLISH)

// This function returns a tuple with the current value of the stock and the date of validity
def getValue(ticker: String): (Double, Date) = {
  val page = scala.io.Source.fromURL(
    "https://www.marketwatch.com/investing/stock/" + ticker + "?countryCode=fr").mkString.replace("\n", " ")
  val value = valueRegex.findFirstMatchIn(page) match {
    case Some(x) => x.group("value").toDouble
    case None => -1
  }
  val date = dateRegex.findFirstMatchIn(page) match {
    case Some(x) => format.parse((x.group("date") + x.group("tz")).strip().replace(".", ""))
    case None => new Date()
  }
  (value, date)
}
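You can test the function on its own before plugging it into a producer; assuming the MarketWatch page is reachable (the scraping may break if the site changes its HTML), something like:
val (value, date) = getValue("rno")
println("rno = " + value + " at " + date)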
For the consumer we want to:
- Keep track of the mean for each ticker. Create a mutable Map that lets us compute the mean iteratively, without storing all the values (see the sketch after this list).
- Raise an alert using println when the mean was computed from more than 10 values and the difference between the mean value of a ticker and the received value is greater than 10% of the mean value.
- Update the mean representation at each step.
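A minimal sketch of the iterative mean mentioned in the first item: we store only a running (sum, count) pair per ticker, from which the mean can be recomputed at any time.
// (sum of received values, number of received values) per ticker
val means = scala.collection.mutable.Map[String, (Double, Int)]()

// Add one observation and return the new mean
def updateMean(ticker: String, price: Double): Double = {
  val (sum, count) = means.getOrElse(ticker, (0.0, 0))
  means(ticker) = (sum + price, count + 1)
  (sum + price) / (count + 1)
}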
Producer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.streams.StreamsConfig
import java.util.{Date, Locale, Properties}
import scala.util.matching.Regex

object StockProducer {
  val valueRegex: Regex = "class=\"value\" field=\"Last\" format.*?>(?<value>.*?)<".r
  val dateRegex: Regex = "<span class=\"timestamp__time\">Last Updated: <bg-quote field=\"date\" .*?>(?<date>.*?)</bg-quote>(?<tz>.*)</span>".r
  val format = new java.text.SimpleDateFormat("MMM d, yyyy HH:mm aaa zzzz", Locale.ENGLISH)

  // This function returns a tuple with the current value of the stock and the date of validity
  def getValue(ticker: String): (Double, Date) = {
    val page = scala.io.Source.fromURL(
      "https://www.marketwatch.com/investing/stock/" + ticker + "?countryCode=fr").mkString.replace("\n", " ")
    val value = valueRegex.findFirstMatchIn(page) match {
      case Some(x) => x.group("value").toDouble
      case None => -1
    }
    val date = dateRegex.findFirstMatchIn(page) match {
      case Some(x) => format.parse((x.group("date") + x.group("tz")).strip().replace(".", ""))
      case None => new Date()
    }
    (value, date)
  }

  def main(args: Array[String]): Unit = {
    // We define the properties of our producer
    val props: Properties = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StockProducer")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Creation of the producer
    val producer = new KafkaProducer[String, String](props)
    // Help close the producer cleanly
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        System.out.println("Exiting...")
        producer.close()
      }
    })
    while (true) {
      // What the producer does: read one ticker and publish "value,timestamp"
      val vd = getValue("rno")
      producer.send(new ProducerRecord[String, String]("stocks",
        "rno", vd._1.toString + "," + vd._2.getTime.toString))
      Thread.sleep(1000 * 60)
    }
  }
}
Consumer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util
import java.util.{Date, Properties}

object StockConsumer {
  def main(args: Array[String]): Unit = {
    // Creation of the properties of the consumer
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "StockGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Creation of the consumer
    val consumer = new KafkaConsumer[String, String](props)
    // Help close the application cleanly
    // Will raise a WakeupException that we will catch later
    val mainThread = Thread.currentThread
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        System.out.println("Exiting...")
        consumer.wakeup()
        try mainThread.join()
        catch {
          case e: InterruptedException =>
            e.printStackTrace()
        }
      }
    })
    // For each ticker, we store (sum of received values, number of received values)
    val means = scala.collection.mutable.Map[String, (Double, Int)]()
    try {
      consumer.subscribe(util.Arrays.asList("stocks"))
      while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.forEach(record => {
          val key = record.key()
          val prev: (Double, Int) = means.getOrElse(key, (0, 0))
          val price = record.value().split(",")(0).toDouble
          val mean = if (prev._2 != 0) prev._1 / prev._2 else 0
          val date = new Date(record.value().split(",")(1).toLong)
          if (prev._2 > 10 && Math.abs(mean - price) > mean * 0.1) {
            println("Alert for " + key + " for the value " + price + " at time " + date + ". The mean is " + mean)
          }
          means(key) = (prev._1 + price, prev._2 + 1)
        })
      }
    } catch {
      case e: WakeupException => println("Waking up...")
    } finally {
      consumer.close()
    }
  }
}