Spark TCP streaming example without Kafka
Spark Streaming is useful for reading data from a producer and distributing it over multiple machines in cluster or YARN mode.
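A few terms related to Spark Streaming:
RDD stands for Resilient Distributed Dataset. In Spark Streaming, the data received during each batch interval is turned into one RDD.
DStream (discretized stream) is the continuous sequence of these per-batch RDDs, and it is the type a streaming job works with.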
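To make the batch-interval idea concrete, here is a toy model in plain Java. It does not use Spark and is not how Spark implements batching internally: records are bucketed by arrival time into fixed intervals, and each bucket stands in for the RDD of one batch. The class name `BatchingSketch`, the arrival times, and the record values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model in plain Java (not Spark): buckets records by arrival time into
// fixed batch intervals, the way a DStream turns each interval into one RDD.
public class BatchingSketch {

    // Returns one batch per interval in [0, streamDurationMs); batch i holds the
    // lines whose arrival time falls in [i * interval, (i + 1) * interval).
    // Assumes every arrival time is in [0, streamDurationMs).
    static List<List<String>> toBatches(long[] arrivalMs, String[] lines,
                                        long batchIntervalMs, long streamDurationMs) {
        int numBatches = (int) ((streamDurationMs + batchIntervalMs - 1) / batchIntervalMs);
        List<List<String>> batches = new ArrayList<>();
        for (int b = 0; b < numBatches; b++) {
            batches.add(new ArrayList<>()); // every interval gets a batch, even if empty
        }
        for (int i = 0; i < lines.length; i++) {
            int batchIndex = (int) (arrivalMs[i] / batchIntervalMs);
            batches.get(batchIndex).add(lines[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        long interval = 3000; // matches the 3000 ms Duration used in the streaming example
        long[] arrivals = {100, 1200, 2900, 6500};
        String[] lines = {"a", "b", "c", "d"};
        List<List<String>> batches = toBatches(arrivals, lines, interval, 9000);
        for (int b = 0; b < batches.size(); b++) {
            System.out.println("batch " + b + " (" + (b * interval) + "-" + ((b + 1) * interval)
                    + " ms): " + batches.get(b));
        }
        // batch 0 (0-3000 ms): [a, b, c]
        // batch 1 (3000-6000 ms): []
        // batch 2 (6000-9000 ms): [d]
    }
}
```

The middle interval yields an empty batch. Spark Streaming likewise produces an RDD for every interval, even when nothing arrived, so code inside `foreachRDD` should be prepared for empty RDDs.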
A simple example of TCP socket streaming is given below:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Assumes Spark 2.x or later (Java 8 lambdas with foreachRDD, awaitTermination throwing InterruptedException).
public class SocketStreamingExample {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws InterruptedException {
        String bindIP = "bindIP"; // placeholder: host of the TCP server that produces the data
        int bindport = 9999;      // placeholder: port of that server

        SparkConf sparkConf = new SparkConf().setMaster("spark-master-url").setAppName("xyz")
                .set("spark.executor.memory", "1g").set("spark.cores.max", "5")
                .set("spark.driver.cores", "2").set("spark.driver.memory", "2g");

        // Every 3000 ms, the data received so far becomes one RDD per input stream.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(3000));

        // Two receivers, each opening its own client connection to bindIP:bindport.
        // Each receiver permanently occupies one core, so the job needs more cores than receivers.
        JavaDStream<String> jsonReq1 = ssc.socketTextStream(bindIP, bindport, StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> jsonReq2 = ssc.socketTextStream(bindIP, bindport, StorageLevels.MEMORY_AND_DISK_SER);

        // DStream.union merges the two streams batch by batch.
        JavaDStream<String> unionStream = jsonReq1.union(jsonReq2);

        unionStream.foreachRDD(rdd ->
                // Runs on the executors, so the lines appear in the executor logs, not on the driver console.
                rdd.foreach(line -> System.out.println(line)));

        // count() returns another DStream; print() outputs the count of each batch on the driver.
        unionStream.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}
```
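bindIP and bindport are placeholders for the host and port of your data source, not of Spark itself: socketTextStream makes a receiver running on a Spark executor connect to that address as a TCP client. To test this application, you can write a basic server socket program that listens on that port, accepts the client connections from the Spark executors, and sends them newline-terminated text.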
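A basic test server of that kind might look like the sketch below, using only `java.net.ServerSocket` from the JDK. The class name `TestLineServer`, the one-second pause, and the sample JSON records are hypothetical choices for illustration. Each accepted connection gets its own thread because every `socketTextStream` receiver opens a separate connection, and the server keeps sending lines so the receivers see a steady stream.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Minimal test data source for socketTextStream: a TCP server that accepts
// client connections (e.g. from Spark receivers) and keeps sending them lines.
public class TestLineServer {

    // Sends the lines to one client over and over, pausing pauseMs between rounds,
    // until a write fails (typically because the client disconnected).
    static void serveClient(Socket client, String[] lines, long pauseMs) {
        try (Socket c = client;
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(c.getOutputStream(), StandardCharsets.UTF_8))) {
            // PrintWriter never throws on write errors; checkError() flushes and reports them.
            while (!out.checkError()) {
                for (String line : lines) {
                    out.print(line);
                    out.print('\n'); // one '\n'-terminated line = one record for socketTextStream
                }
                Thread.sleep(pauseMs);
            }
        } catch (IOException e) {
            System.err.println("client error: " + e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Accepts clients until the server socket is closed; each client gets its own thread.
    static void acceptLoop(ServerSocket server, String[] lines, long pauseMs) throws IOException {
        while (!server.isClosed()) {
            Socket client = server.accept();
            Thread worker = new Thread(() -> serveClient(client, lines, pauseMs));
            worker.setDaemon(true);
            worker.start();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = Integer.parseInt(args[0]); // use the same value as bindport in the Spark job
        String[] sample = {"{\"id\": 1}", "{\"id\": 2}"}; // hypothetical sample records
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port);
            acceptLoop(server, sample, 1000);
        }
    }
}
```

Start it with the port as the only argument on the machine named by bindIP, then start the Spark job. The server must be reachable from the executors, not just from the driver.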
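spark-master-url should be the URL of the machine where the Spark master is running. For a standalone cluster, the master URL generally looks like spark://machineip:port, where the port defaults to 7077. When running on YARN, use yarn as the master instead.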
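If the master URL comes from configuration, a quick format check can catch typos before the job starts. The helper below is a hypothetical sketch (the class and method names are not part of Spark) that uses `java.net.URI` to check for the spark://machineip:port form. It covers only that single-master standalone form, so other valid master values such as yarn or local[*] are rejected by it.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch: sanity-checks that a string has the single-master standalone form
// spark://host:port before it is passed to SparkConf.setMaster.
public class MasterUrlCheck {

    static boolean isStandaloneMasterUrl(String url) {
        try {
            URI uri = new URI(url);
            // Scheme must be "spark", and both a host and an explicit port must be present.
            return "spark".equals(uri.getScheme()) && uri.getHost() != null && uri.getPort() != -1;
        } catch (URISyntaxException e) {
            return false; // not even a syntactically valid URI
        }
    }

    public static void main(String[] args) {
        System.out.println(isStandaloneMasterUrl("spark://192.168.1.10:7077")); // true
        System.out.println(isStandaloneMasterUrl("spark://192.168.1.10"));      // false: no port
        System.out.println(isStandaloneMasterUrl("192.168.1.10:7077"));         // false: no scheme
    }
}
```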