运维开发网

spark-kafka-es交互

运维开发网 https://www.qedev.com 2020-04-24 20:48 出处:网络 作者:运维开发网整理
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.streaming.Seconds import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafk
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.SparkConf
//import org.elasticsearch._
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.JSON._
import java.text.SimpleDateFormat
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.kafka.common.TopicPartition

/**
  * Spark Streaming job that reads JSON messages from a Kafka topic, enriches each
  * message with fields looked up in Elasticsearch, and writes the merged document
  * back to Elasticsearch.
  *
  * NOTE(review): `ip`, `usrname` and `psw` are referenced below but are not defined
  * in this file as shown; they have to be supplied (e.g. as vals in this object)
  * before the job compiles.
  */
object stu_course_test {

    /**
      * Converts a timestamp formatted as `yyyy-MM-dd HH:mm:ss` to epoch seconds.
      *
      * The string is interpreted in the JVM's default time zone, because the
      * `SimpleDateFormat` is created without an explicit zone.
      *
      * @param tm timestamp text, e.g. `"2020-04-24 20:48:00"`
      * @return seconds since the Unix epoch
      * @throws java.text.ParseException if `tm` does not match the pattern
      */
    def tranTimeToLong(tm:String) :Long={
        // A new formatter per call: SimpleDateFormat is not thread-safe.
        val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        fm.parse(tm).getTime / 1000
    }

    /**
      * Job entry point: builds the streaming context, attaches to partition 0 of the
      * topic at a fixed starting offset, and processes every micro-batch on the driver.
      *
      * @param args command-line arguments (unused)
      */
    def main(args:Array[String]): Unit = {
        val conf = new SparkConf().setAppName("stu_live_test5").set("es.nodes",ip).set("es.port","9200")
        val ssc = new StreamingContext(conf, Seconds(2))
        println("hello")
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> ip,
            "group.id" -> "test_kafka1106",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "sasl.plain.username" -> usrname,
            "sasl.plain.password" -> psw,
            "security.protocol" -> "SASL_PLAINTEXT",
            "sasl.mechanism" -> "PLAIN"
        //    "auto.offset.reset" -> "earliest",
          //  "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val tops = "topic_name"
        // Start consuming partition 0 of the topic at an explicit offset.
        val fromOffsets = Map[TopicPartition, Long](new TopicPartition(tops,0) -> 20385338L)
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
        println("****************************9999")
        val lines = stream.map(record => record.value)
        val offs = stream.map(off => off.offset)
        offs.print()
        lines.print()
        lines.foreachRDD(record=>{
            // The function passed to foreachRDD runs on the driver, so the batch's own
            // context can be used directly instead of capturing the outer `ssc`.
            val sc = record.sparkContext
            // Collect once; iterating an empty array is a no-op, so no separate
            // count() job is needed to guard the loop.
            val datas = record.collect()
            for (i <- datas){
                val dict = parseObject(parseObject(i).get("data").toString)
                val stu_data = new JSONObject()
                stu_data.put("a",dict.get("a").toString.toInt)
                stu_data.put("b",dict.get("b").toString.toInt)
                stu_data.put("c",dict.get("c").toString)
                stu_data.put("d",dict.get("d").toString.toInt)
                stu_data.put("time",tranTimeToLong(dict.get("time").toString).toInt)
                stu_data.put("e",dict.get("e").toString.toInt)
                // "keyid" is never copied into stu_data above, so reading it only from
                // there always yields null and used to throw a NullPointerException.
                // NOTE(review): falling back to the parsed payload is an assumption
                // about where the key lives - confirm against the message schema.
                val keyId = Option(stu_data.get("keyid")).orElse(Option(dict.get("keyid")))
                keyId match {
                    case None =>
                        println("skip record without keyid: " + i)
                    case Some(key) =>
                        val query = """{"query":{"bool":{"must":[{"term":{"key":"""+key.toString+"""}},{"term":{"status":2}}]}}}"""
                        println(query)
                        val es_result = EsSpark.esRDD(sc,"index_name/all-type",query)
                        println(es_result)
                        // One output document per matching Elasticsearch hit.
                        es_result.collect().foreach(course =>{
                            val aa = course._2("aa").toString
                            val bb = course._2("bb").toString
                            val cc = course._2("cc").toString.toInt
                            stu_data.put("aa",aa)
                            stu_data.put("bb",bb)
                            stu_data.put("cc",cc)
                            // The document id is derived from the looked-up fields and
                            // handed to the connector through es.mapping.id.
                            stu_data.put("_id",s"${aa}_${bb}_${cc}")
                            val rdd = sc.makeRDD(Seq(stu_data.toString))
                            EsSpark.saveJsonToEs(rdd,"test_index_name/docs",Map("es.mapping.id" -> "_id"))
                        })
                }
            }
        })
        println("dfsdfsdf")
        ssc.start()
        ssc.awaitTermination()
    }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消