Integrating Apache Kafka with Apache Storm - Scala
This section contains the Scala version of the wordcount program discussed previously.Topology Class: Let us try the topology class with Scala:
import org.apache.Storm.Config
import org.apache.Storm.LocalCluster
import org.apache.Storm.StormSubmitter
import org.apache.Storm.kafka._
import org.apache.Storm.spout.SchemeAsMultiScheme
import org.apache.Storm.topology.TopologyBuilder
object KafkaStormWordCountTopology {
def main(args: Array[String]): Unit = {
val zkConnString: String = "localhost:2181"
val topic: String = "words"
val hosts: BrokerHosts = new ZkHosts(zkConnString)
val kafkaSpoutConfig: SpoutConfig =
new SpoutConfig(hosts, topic, "/" + topic, "wordcountID")
kafkaSpoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime()
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
val topologyBuilder: TopologyBuilder = new TopologyBuilder()
topologyBuilder.setSpout("kafkaspout...