Tag Archives: Scala

Kafka Stream Topology Testing

In this blog post, I will explain how to test Kafka stream topologies.

Kafka Streams topologies can be quite complex, so it is important for developers to test them. Apache Kafka 1.1.0 introduced a new artifact, kafka-streams-test-utils, which provides the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can include the artifact as a regular test dependency and use the test driver to test the business logic of your Kafka Streams application.

Add the following dependency to build.sbt:

libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "1.1.0" % Test

The code below is the well-known word count application:

// java.lang.Long (not scala.Long) because the Java API produces boxed counts
import java.lang.Long

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{StreamsBuilder, Topology}

import scala.collection.JavaConverters._

class WordCountApplication {

  def countNumberOfWords(inputTopic: String,
                         outputTopic: String, storeName: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder()
    // Read lines of text using the default String serdes from the config
    val textLines: KStream[String, String] = builder.stream(inputTopic)
    val wordCounts: KTable[String, Long] = textLines
      // Split each line into lowercase words
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      // Re-key each record by the word itself
      .groupBy((_, word) => word)
      // Count occurrences per word in a state store named storeName
      .count(Materialized.as(storeName).asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    // Write the running counts as String keys and Long values
    wordCounts.toStream().to(outputTopic, Produced.`with`(Serdes.String(), Serdes.Long()))
    builder.build()
  }
}

A unit test for the topology above:

import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.StreamsConfig

trait TestSpec {
  protected val stringDeserializer = new StringDeserializer()
  protected val longDeserializer = new LongDeserializer()
  val config = new Properties()
  config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-application")
  // Required by StreamsConfig, but the test driver never connects to it
  config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
  config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.scalatest.{FlatSpec, Matchers}

class WordCountSpec extends FlatSpec with Matchers with TestSpec {

  it should "count number of words" in {
    val wordCountApplication = new WordCountApplication()
    val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
    val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
    val words = "Hello Kafka Streams, All streams lead to Kafka"
    driver.pipeInput(recordFactory.create(words))
    val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
    store.get("hello") shouldBe 1
    store.get("kafka") shouldBe 2
    store.get("streams") shouldBe 2
    store.get("lead") shouldBe 1
    store.get("to") shouldBe 1
    driver.close()
  }
}

Let me explain the classes used to test the topology.

TopologyTestDriver: 
This class makes it easier to write tests to verify the behaviour of topologies created with Topology or StreamsBuilder. You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, sinks, or sub-topologies.

The best thing about TopologyTestDriver is that it works without a real Kafka broker, so tests execute quickly with very little overhead.

Using the TopologyTestDriver in tests is easy: instantiate the driver with a Topology (for example, from StreamsBuilder#build()) and a Properties configuration, use the driver to supply input messages to the topology, and then use the driver to read and verify any messages the topology outputs.

ConsumerRecordFactory:
Although the driver doesn’t use a real Kafka broker, it does simulate Kafka consumers and producers that read and write raw (byte[]) messages.
You can either work with messages whose keys and values are byte[], or use ConsumerRecordFactory, which builds input records from regular objects using the serializers you supply.

Driver Set-up:
To create a TopologyTestDriver instance, you need a Topology and a Properties object.
The configuration should be representative of what you’d supply to the real topology, which means including several key properties (see StreamsConfig).
For example, the following code fragment creates a configuration that specifies an application ID, a local Kafka broker list (which is required but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
val props = new Properties()
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091")
// CustomTimestampExtractor stands for your own TimestampExtractor implementation
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomTimestampExtractor].getName)
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
val topology: Topology = ??? // your topology, e.g. from StreamsBuilder#build()
val driver = new TopologyTestDriver(topology, props)

Processing messages:
Here is how to pipe an input message into the topic named input-topic:

val strSerializer = new StringSerializer()
val factory = new ConsumerRecordFactory(strSerializer, strSerializer)
driver.pipeInput(factory.create("input-topic", "key1", "value1"))

TopologyTestDriver#pipeInput() sends an input message with the given key, value, and timestamp on the specified topic to the topology and then commits it. The driver passes the message to the source that consumes the named topic and invokes the processor(s) downstream of that source.

If your topology’s processors forward messages to sinks, your test can then consume these output messages and verify that they match the expected outcome.
For example, if the topology should have generated 2 messages on output-topic-1 and 1 message on output-topic-2, the test can obtain them with TopologyTestDriver#readOutput(String, Deserializer, Deserializer), which reads the next record from the given topic:

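import org.apache.kafka.clients.producer.ProducerRecord
val strDeserializer = new StringDeserializer()
// readOutput returns null when no more records are available on the topic
val record1: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record2: ProducerRecord[String, String] = driver.readOutput("output-topic-1", strDeserializer, strDeserializer)
val record3: ProducerRecord[String, String] = driver.readOutput("output-topic-2", strDeserializer, strDeserializer)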

Processor state:
Some processors use Kafka state stores (StateStore), so the driver provides the generic getStateStore(storeName) as well as store-type-specific methods (getKeyValueStore, getWindowStore, getSessionStore) so that your tests can check the underlying state store(s) used by your topology’s processors.
In the previous example, after supplying a single input message and checking the three output messages, our test could also check the key-value store to verify that the processor correctly added, removed, or updated internal state.
The test might also pre-populate some state before submitting the input message and verify afterwards that the processor(s) correctly updated it.

Here is the Kafka-Streaming-Test GitHub code that I used in this blog post. In the next blog post, I will explain how to test more complex topologies, such as joins and KTables.

References:

Kafka Streaming testing

 

When should you use Future, Future.successful, or Future.failed?

In this blog post, I will explain when you should use Future(), Future.successful, and Future.failed.

    1. Use Future.apply(), or simply Future { ... }: when work should run asynchronously and complete sometime in the future, especially time-consuming operations such as network calls, database operations, communicating with one or more other services, or processing large amounts of data across multiple cores.
    2. Use Future.successful: when a literal or already computed value should be returned as a successful Future.
    3. Use Future.failed: when a known exception should be returned as a failed Future, without performing any further work.
    4. Use Future.fromTry: when you already have a computed Try value.

In short, use Future.successful, Future.failed, and Future.fromTry when you need to create a Future and you already have the result. Unlike Future.apply, they do not need an ExecutionContext and do not schedule any work.
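The four cases can be sketched side by side. This is a minimal sketch: the object FutureConstructors and its methods (slowSquare, cached, rejected, parsed, squareIfPositive) are hypothetical names chosen for illustration, and Thread.sleep stands in for a real network or database call.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object FutureConstructors {
  // Future.apply runs its body asynchronously, so it needs an ExecutionContext
  import ExecutionContext.Implicits.global

  // Hypothetical slow operation standing in for a network or database call
  def slowSquare(n: Int): Future[Int] = Future {
    Thread.sleep(100)
    n * n
  }

  // Already computed value: returns a completed Future, no ExecutionContext needed
  def cached(n: Int): Future[Int] = Future.successful(n)

  // Known failure: returns a failed Future instead of throwing
  def rejected(reason: String): Future[Int] =
    Future.failed(new IllegalArgumentException(reason))

  // Existing Try: Success becomes a successful Future, Failure a failed one
  def parsed(s: String): Future[Int] = Future.fromTry(Try(s.toInt))

  // Validate cheaply first; start asynchronous work only when it is needed
  def squareIfPositive(n: Int): Future[Int] =
    if (n <= 0) Future.failed(new IllegalArgumentException(s"$n is not positive"))
    else slowSquare(n)
}
```

In squareIfPositive, the rejected branch is already decided. Wrapping it in Future { ... } would needlessly schedule a task on the thread pool only to throw, whereas Future.failed returns an already completed Future.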

Reference:

https://viktorklang.com/blog/Futures-in-Scala-protips-3.html

https://functional.works-hub.com/learn/scala-future-blocks-and-futhers-methods-what-to-use-when-4acbd

 

Thanks.

An easy and fast way of installing Java, Scala, and Spark

1. Download and Install Java 8


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://download.oracle.com/otn-pub/java/jdk/8u151-b12/e758a0de34e24606bca991d704f6dcbf/jdk-8u151-linux-x64.tar.gz

Extract the tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf jdk-8u151-linux-x64.tar.gz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
jdk-8u151-linux-x64  jdk-8u151-linux-x64.tar.gz

Set the PATH environment variable for Java:


abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

The above command opens the file. Add the following lines at the end of it:


export JAVA=/home/abdhesh/Documents/Applications/jdk-8u151-linux-x64
export PATH=$JAVA/bin:$PATH

Save and exit, then reload .bashrc in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Check the Java version:


abdhesh@abdhesh-latitude:~/Documents/Applications$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

2. Download and Install Scala


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

Extract the tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf scala-2.12.4.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
scala-2.12.4  scala-2.12.4.tgz

Set the PATH environment variable for Scala:


abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

Add the following lines at the end of the file:


export SCALA=/home/abdhesh/Documents/Applications/scala-2.12.4
export PATH=$JAVA/bin:$SCALA/bin:$PATH

Save and exit, then reload .bashrc in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Check the Scala version:


abdhesh@abdhesh-latitude:~/Documents/Applications$ scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

3. Download and Install Apache Spark


abdhesh@abdhesh-latitude:~/Documents/Applications$ wget http://apache.mirror.anlx.net/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Extract the tar file:


abdhesh@abdhesh-latitude:~/Documents/Applications$ tar -xf spark-2.2.1-bin-hadoop2.7.tgz 
abdhesh@abdhesh-latitude:~/Documents/Applications$ ls
spark-2.2.1-bin-hadoop2.7  spark-2.2.1-bin-hadoop2.7.tgz

Set the PATH environment variable for Spark:


abdhesh@abdhesh-latitude:~/Documents/Applications$ vim ~/.bashrc

The above command opens the file. Add the following lines at the end of it:


export SPARK=/home/abdhesh/Documents/Applications/spark-2.2.1-bin-hadoop2.7
export PATH=$JAVA/bin:$SCALA/bin:$SPARK/bin:$PATH

Save and exit, then reload .bashrc in the same terminal session:


abdhesh@abdhesh-latitude:~/Documents/Applications$ source ~/.bashrc 

Run the Spark shell:

Here is a link to the people.json file.


abdhesh@abdhesh-latitude:~/Documents/Applications$ spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/28 01:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/28 01:02:17 WARN Utils: Your hostname, abdhesh-latitude resolves to a loopback address: 127.0.1.1; using 192.168.0.16 instead (on interface wlp2s0)
17/12/28 01:02:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.0.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1514422939241).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 
scala> val df  = spark.read.json("spark-2.2.1-bin-hadoop2.7/data/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.filter("age >= 19").select("name","age").show()
+------+---+
|  name|age|
+------+---+
|  Andy| 30|
|Justin| 19|
+------+---+


scala> //using Sql

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT * FROM people WHERE age >=19").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+


scala> 

Stay tuned for the next blog post 🙂

Part 1: Multiple ways of handling exceptions in Scala

There are multiple ways of handling exceptions in Scala. In this blog post, I will explain them one by one.

1. Using try/catch/finally


  val tryCatch = try {
    // Code that might raise an exception
    throw new Exception
  } catch {
    case ex: Exception =>
      // Handle the exception here
  }

  val tryMultipleCatch = try {
    // Code that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
      // Handle an ArithmeticException here
    case ex: Exception =>
      // Handle any other Exception here
  }

  val tryMultipleCatchFinally = try {
    // Code that might raise an exception
    throw new Exception
  } catch {
    case ae: ArithmeticException =>
      // Handle an ArithmeticException here
    case ex: Exception =>
      // Handle any other Exception here
  } finally {
    println(":::::")
    // Code here always executes, whether or not an exception is thrown
  }

  // try/catch is an expression, so it can produce a value
  val tryCatchWithValue: Int = try {
    // Code that might raise an exception
    "NonNumericValue".toInt
  } catch {
    case ne: NumberFormatException => 0
  }
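Because try/catch/finally is an expression, the pieces above combine naturally in a small helper. This is a sketch. parseOrDefault is a hypothetical helper, and the cleanups counter exists only to make the finally block's execution observable.

```scala
object TryCatchExample {
  // Counts how many times the finally block has run (instrumentation for this demo)
  var cleanups = 0

  // Hypothetical helper: parse an Int, falling back to a default on bad input
  def parseOrDefault(s: String, default: Int): Int =
    try {
      s.trim.toInt
    } catch {
      case _: NumberFormatException => default
    } finally {
      // Runs on both paths; its value is discarded, so the try/catch result is kept
      cleanups += 1
    }
}
```

The value of the whole expression comes from the try body or the matching catch case. The finally block is used only for side effects such as releasing resources.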

2. Using scala.util.Try
The Try type represents a computation that may either result in an exception or return a successfully computed value.
Instances of Try[T] are either an instance of scala.util.Success[T] or scala.util.Failure[T].

Try can pipeline, or chain, operations using combinators such as map and flatMap, catching exceptions along the way.


import scala.util.{Failure, Success, Try}

  val withTry = Try("1".toInt) // Success(1)
  withTry match {
    case Success(value) => println(value)
    case Failure(ex) =>
      // Handle the exception here
      println(ex)
  }

  // Pattern matching handles both cases; here the Failure branch runs
  val tryWithRecover = Try("Non-Numeric-Value".toInt) match {
    case Success(value) => println(value)
    case Failure(ex) => println(ex)
  }

  // Try's map, flatMap, fold, etc.
  def inc(n: Int): Int = n + 1

  val try1 = Try("abc".toInt)
  // inc runs only if Try("abc".toInt) succeeds; here it fails, so tResult is the same Failure
  val tResult = try1.map(f => inc(f))
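Chaining is where map and flatMap pay off: each step may fail, and the first Failure short-circuits the rest. This is a sketch. parse, reciprocal, parseThenInvert, and parseInvertAndScale are hypothetical helpers, not part of the original code.

```scala
import scala.util.{Failure, Success, Try}

object TryChaining {
  // Each step returns a Try, so failures are values rather than thrown exceptions
  def parse(s: String): Try[Int] = Try(s.toInt)

  def reciprocal(n: Int): Try[Double] =
    if (n == 0) Failure(new ArithmeticException("division by zero"))
    else Success(1.0 / n)

  // flatMap chains Try-returning steps; a Failure from parse skips reciprocal
  def parseThenInvert(s: String): Try[Double] =
    parse(s).flatMap(n => reciprocal(n))

  // The same pipeline as a for-comprehension, which desugars to flatMap and map
  def parseInvertAndScale(s: String, factor: Double): Try[Double] =
    for {
      n <- parse(s)
      r <- reciprocal(n)
    } yield r * factor
}
```

Use map when the next step returns a plain value, and flatMap (or a for-comprehension) when it returns another Try. Otherwise you end up with a nested Try[Try[A]].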

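Try’s recover and recoverWith apply the given partial function if the Try is a Failure whose exception the function matches; if it is a Success, they return it unchanged. recover maps the exception to a value, while recoverWith maps it to another Try.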


  // Recover with a value
  val tryWithRecoverF = Try("Non-Numeric-Value".toInt).recover {
    // Pattern match on the type of the exception
    case ne: NumberFormatException => 0
    case ex: Exception => 0
  }

  // Recover with another Try
  def recoverWith(first: String, second: String): Try[Int] = {
    // The partial function passed to recoverWith runs only when `Try(first.toInt)` fails
    Try(first.toInt).recoverWith {
      case ne: NumberFormatException => Try(second.toInt)
    }
  }
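One detail is worth seeing in isolation: recover handles only the exceptions its partial function matches, and anything else stays a Failure. This is a sketch. parseOrZero and divideOrZero are hypothetical helpers.

```scala
import scala.util.Try

object RecoverExample {
  // The NumberFormatException from bad input is matched and replaced by 0
  def parseOrZero(s: String): Try[Int] =
    Try(s.toInt).recover { case _: NumberFormatException => 0 }

  // Division by zero throws ArithmeticException, which this partial function
  // does not match, so the result is left as a Failure
  def divideOrZero(a: Int, b: Int): Try[Int] =
    Try(a / b).recover { case _: NumberFormatException => 0 }
}
```

This is why catch-all cases such as `case ex: Exception => 0` should be used deliberately: they also hide failures you did not anticipate.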

Note: Try combinators such as map, flatMap, filter, fold, recover, recoverWith, transform, and collect catch exceptions thrown by the functions you pass to them.


  def compute(number: Int, divideBY: Int): Int = number / divideBY

  val t1 = Try("123".toInt).map(n => compute(n, 2)) // Success(61)
  val t2 = Try("123".toInt).map(n => compute(n, 0)) // Failure(java.lang.ArithmeticException: / by zero)

  def computeWithTry(value: String): Try[Int] = Try(value.toInt)

  val r1: Try[Int] = computeWithTry("123")
  r1.fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // prints 61

  // On a Success, fold catches an exception thrown by the success function and passes it to the failure function
  computeWithTry("123").fold(
    ex => println(s"Exception--${ex}"),
    value => println(compute(value, 0))
  ) // Exception--java.lang.ArithmeticException: / by zero

  computeWithTry("abc").fold(
    ex => println(ex),
    value => println(compute(value, 2))
  ) // prints java.lang.NumberFormatException: For input string: "abc"

  computeWithTry("123").map(n => compute(n, 2)) // Success(61)
  computeWithTry("123").map(n => compute(n, 0)) // Failure(java.lang.ArithmeticException: / by zero)
  computeWithTry("abc").map(n => compute(n, 2)) // Failure(java.lang.NumberFormatException: For input string: "abc")

Note: only non-fatal exceptions are caught by the combinators on Try (see scala.util.control.NonFatal). Serious system errors, on the other hand, will be thrown.
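Try.apply itself uses the scala.util.control.NonFatal extractor. The same extractor gives plain try/catch the same policy. This is a sketch. safely is a hypothetical helper, not a standard library method.

```scala
import scala.util.control.NonFatal

object NonFatalExample {
  // Catch only non-fatal exceptions, mirroring Try's behaviour.
  // Fatal throwables such as VirtualMachineError, InterruptedException,
  // LinkageError, or ControlThrowable do not match NonFatal and propagate.
  def safely[A](default: A)(body: => A): A =
    try body
    catch { case NonFatal(_) => default }
}
```

A bare `case e: Throwable =>` would also swallow errors such as OutOfMemoryError. Matching on NonFatal avoids that.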

You can find the complete code here.

Stay tuned for next part 🙂

References: scala.util.Try

Methods with variable arguments

  • Scala methods can have variable arguments (varargs).
  • A method can be declared to take a variable number of arguments by adding a * after the type of the parameter.
  • For obvious reasons, a method can have only one variable-argument parameter, and it must be the last parameter.
  • As an example, let’s define a method that takes a variable number of String arguments and returns their concatenation as a String:
scala> def concatStrings(s: String*): String = s.mkString
concatStrings: (s: String*)String

scala> concatStrings("a", "b", "c")
res0: String = abc

scala> def concatStringsSep(separator: String, s: String*): String =
         s.mkString(separator)

scala> concatStringsSep("/", "a", "b", "c")
res1: String = a/b/c

You can also pass a sequence as variable-length arguments to a method by appending :_* to it:

scala> val listOfStrings = List("first","second","third")
listOfStrings: List[String] = List(first, second, third)

scala> concatStrings(listOfStrings:_*)
res5: String = firstsecondthird

scala> concatStringsSep(",",listOfStrings:_*)
res6: String = first,second,third
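Inside the method, the repeated parameter behaves as a Seq. A method that receives varargs can forward them to another varargs method with the same `: _*` ascription. This is a sketch. sum and average are hypothetical helpers.

```scala
object VarargsExample {
  // Inside the method, `nums: Int*` is a Seq[Int]; calling with no arguments gives an empty Seq
  def sum(nums: Int*): Int = nums.sum

  // Forward the received arguments to another varargs method with `: _*`
  def average(nums: Int*): Option[Double] =
    if (nums.isEmpty) None
    else Some(sum(nums: _*).toDouble / nums.length)
}
```

Returning Option here makes the zero-argument call explicit instead of dividing by zero.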

Scala pure/total functions

 

In this blog post, I am going to explain pure and total functions.

Let’s start with the question: what is a function?

A function is a process that takes some input, called arguments, and produces some output, called a return value.

A pure function is a function that:
1. Always returns the same output for the same input (this is called determinism).
2. Produces no side effects.

It describes how inputs relate to outputs without spelling out the steps to get from A to B. Every call produces its result in isolation. Pure functions are required to construct pure expressions.

The result cannot depend on any hidden information or state that may change while the program executes or between different executions of the program, nor can it depend on any external input from I/O devices.

They are easy to parallelize. Because pure functions are referentially transparent, we only need to compute their output once for a given input. Caching and reusing the result of a computation is called memoization, and it can only be done safely with pure functions.
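Memoization can be sketched as a wrapper that caches results by argument. This is a minimal sketch, not a library API: memoize and memoSquare are hypothetical names, and the cache is a plain mutable.HashMap, so it is not thread-safe.

```scala
import scala.collection.mutable

object Memoization {
  // Wraps f in a cache keyed by the argument. This is only safe when f is pure:
  // for an impure f, skipping later calls would skip its side effects or hide changed results.
  // mutable.HashMap is not thread-safe; use a concurrent map if the wrapper is shared.
  def memoize[A, B](f: A => B): A => B = {
    val cache = mutable.HashMap.empty[A, B]
    // getOrElseUpdate evaluates f(a) only when a is not cached yet
    (a: A) => cache.getOrElseUpdate(a, f(a))
  }

  // A pure function and its memoized version
  def square(a: Int): Int = a * a
  val memoSquare: Int => Int = memoize[Int, Int](square)
}
```

The cache is invisible to callers only because square always returns the same output for the same input. The same wrapper around an impure function would change the program's behaviour.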

Pure functions are also extremely independent — easy to move around, refactor, and reorganize in your code, making your programs more flexible and adaptable to future changes.

In general,

Pure function = Output depends on input + No side effects

Consider f: X -> Y, a function that takes an X as input and returns a Y as output.
Suppose we have the expression (f(3), f(3)), which calls f twice with the same input.
If f is a pure function, we can rewrite the expression by lifting the common subexpression f(3) into a variable, because every call to f(3) gives the same result:
val x = f(3)
val result = (x, x)

Examples of pure functions:

  • sin(x), returning the sine of a number x
  • length(s), returning the size of a string s

Pure functions in Scala:

scala> def square(a: Int) = a * a
square: (a: Int)Int

scala> def pureFunc(x : Int, y : Int) = x + y
pureFunc: (x: Int, y: Int)Int

scala> pureFunc(1,2)
res0: Int = 3
scala> (pureFunc(1,2), pureFunc(1,2))
res1: (Int, Int) = (3,3)
scala> val fr = pureFunc(1,2)
scala> (fr, fr) // fr is not evaluated again; its value is reused
res2: (Int, Int) = (3,3)

scala> def impureFunc(x: Int,y: Int) = println(x+y)
impureFunc: (x: Int, y: Int)Unit

scala> impureFunc(1,2)
3

scala> def anotherImpureFunc(x: Int) = if(x > 0) x
anotherImpureFunc: (x: Int)AnyVal

scala> anotherImpureFunc(2)
res2: AnyVal = 2

scala> anotherImpureFunc(-1)
res3: AnyVal = ()
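The last example is really about totality. anotherImpureFunc has no side effects, but it is not total as an Int-valued function. For non-positive input, the `if` has no `else`, so it yields () and the inferred type widens to AnyVal. A total function returns a value of its declared type for every input. The sketch below shows two ways to get there; positive and positiveOrZero are hypothetical names.

```scala
object TotalFunctions {
  // Total: every Int maps to a value of the declared result type Option[Int]
  def positive(x: Int): Option[Int] = if (x > 0) Some(x) else None

  // Also total: the non-positive case gets an explicit Int fallback
  def positiveOrZero(x: Int): Int = if (x > 0) x else 0
}
```

Option makes the "no meaningful result" case visible in the type, so callers must handle it instead of receiving a surprising ().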