
spark-kafka-es interaction: optimization

The listing below is a Spark Streaming job that reads a Kafka topic with a direct stream (restoring per-partition offsets from a Redis hash), enriches each record with course details fetched from Elasticsearch via a terms query, bulk-writes the merged JSON documents back to Elasticsearch, and finally saves the latest offsets to Redis.
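To compile the listing you need the Spark Streaming Kafka 0-10 integration, the Elasticsearch Spark connector, fastjson and Jedis on the classpath. A minimal sbt sketch is shown below; the version numbers and Scala version are placeholders, not taken from the original article, and should be matched to your own Spark and Elasticsearch cluster.

    // build.sbt (sketch; versions are examples, adjust to your environment)
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"               % "2.3.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10"    % "2.3.0",
      "org.elasticsearch" %% "elasticsearch-spark-20"       % "6.4.0",
      "com.alibaba"       %  "fastjson"                     % "1.2.47",
      "redis.clients"     %  "jedis"                        % "2.9.0"
    )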

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//import org.elasticsearch._
import org.elasticsearch.spark.rdd.EsSpark
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.JSON._
import com.alibaba.fastjson.parser._
import java.text.SimpleDateFormat
import redis.clients.jedis._
import scala.collection.JavaConverters._

object stu_course_test1 {

  // Parse a "yyyy-MM-dd HH:mm:ss" timestamp string into epoch seconds.
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    val aa = fm.format(dt) // formatted round-trip string (not used below)
    val tim: Long = dt.getTime() / 1000
    tim
  }

  def main(args: Array[String]) {
    ParserConfig.getGlobalInstance().setAutoTypeSupport(true)

    // Placeholders: the original listing references ip_list / ip without defining them.
    val ip_list = "es-node1,es-node2"   // Elasticsearch node list (fill in)
    val ip = "kafka-broker1:9092"       // Kafka bootstrap servers (fill in)

    val conf = new SparkConf().setAppName("test").set("es.nodes", ip_list).set("es.port", "9200")
    val ssc = new StreamingContext(conf, Seconds(2))
    println("hello")

    val redis1 = new Jedis("10.10.66.163", 6379)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> ip,
      "group.id" -> "test",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "sasl.plain.username" -> "name",
      "sasl.plain.password" -> "psw",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "PLAIN",
      "auto.offset.reset" -> "latest",
      // Note: auto-commit stays on here even though offsets are also tracked in Redis below.
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val tops = "stucourse_xes_student_courses"
    val topics = Array(tops)

    // Restore per-partition offsets previously saved in a Redis hash keyed by the topic name.
    val redis_ans = redis1.hgetAll(tops).asScala
    var fromOffsets: Map[TopicPartition, Long] = Map()
    if (!redis_ans.isEmpty) {
      redis_ans.foreach { i => fromOffsets += (new TopicPartition(tops, i._1.toInt) -> i._2.toLong) }
    }
    // Hard-coded starting offset that overrides whatever was read from Redis (apparently left in for testing).
    fromOffsets = Map[TopicPartition, Long](new TopicPartition(tops, 0) -> 20900000L)

    // Redis has offsets: start the direct stream from them; otherwise fall back to "auto.offset.reset".
    val stream = if (!fromOffsets.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams, fromOffsets))
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams))
    }

    // Wrap each Kafka record as a JSON object carrying its offset, partition and raw value.
    val origin = stream.map(record => {
      val new_record = new JSONObject()
      new_record.put("offset", record.offset)
      new_record.put("partition", record.partition)
      new_record.put("value", record.value)
      new_record
    })

    val offs = stream.map(off => off.offset) // offsets only; not used below

    origin.foreachRDD(record => {
      val count = record.count()
      var data_bulk: List[String] = List()
      var course_list: List[Any] = List()
      println(count)

      // saveAsTextFile returns Unit, so record_ans only prints "()"; it also fails once "test_log" exists.
      val record_ans = record.saveAsTextFile("test_log")
      println(record_ans)

      if (count > 0) {
        // collect() pulls the whole batch to the driver; field names below are anonymized (a, b, c, ...).
        for (i <- record.collect()) {
          val datas = parseObject(i.toString)
          val offset = datas.get("offset")
          val partition = datas.get("partition")
          // The Kafka message value is JSON with the payload under "data".
          val dict = parseObject(parseObject(datas.get("value").toString).get("data").toString)
          // println(offset, partition)

          val stu_data = new JSONObject()
          stu_data.put("a", dict.get("a").toString.toInt)
          stu_data.put("b", dict.get("b").toString.toInt)

          // Collect the distinct "a" values (course ids) seen in this batch.
          if (!course_list.exists(tmp => tmp == stu_data.get("a"))) {
            course_list = course_list :+ stu_data.get("a")
          }

          if (dict.get("a").toString.toInt > 100000) {
            stu_data.put("a", dict.get("a").toString)
            stu_data.put("b", dict.get("b").toString.toInt)
            stu_data.put("c", tranTimeToLong(dict.get("c").toString).toInt)
            stu_data.put("d", dict.get("d").toString)
            stu_data.put("e", dict.get("e").toString)
            stu_data.put("d", "") // overwrites the "d" value set above (kept as in the original)
            stu_data.put("f", 0)
            stu_data.put("modify_time", System.currentTimeMillis / 1000)
            stu_data.put("r", dict.get("r").toString.toInt)
            stu_data.put("offset", offset)
            stu_data.put("partition", partition)
            // Elasticsearch document id built from the record's own fields.
            stu_data.put("_id", stu_data.get("a").toString + "_" + stu_data.get("a") + "_" + stu_data.get("d").toString)
            data_bulk = data_bulk :+ stu_data.toString
          }
        }

        // Look up course details in Elasticsearch for the distinct course ids, 1000 ids per terms query.
        val course_data = new JSONObject()
        val course_num = course_list.length
        var course_cnt = ((course_num * 1.0) / 1000.0).ceil.toInt
        if (course_cnt == 0 && course_num > 0) {
          course_cnt = 1
        }
        // Note: "0 to course_cnt" runs one extra, empty iteration; "0 until course_cnt" would suffice.
        for (i <- 0 to course_cnt) {
          var gap = 0
          if (course_list.length > 1000) {
            gap = 1000
          } else {
            gap = course_list.length
          }
          // Take the next chunk of ids and strip the "List(" / ")" wrapper to get a comma-separated string.
          var coursestr = course_list.take(gap).toString()
          course_list = course_list.takeRight(course_list.length - gap)
          coursestr = coursestr.substring(5, coursestr.length - 1)
          if (coursestr.length > 0) {
            val query = """{"query":{"bool":{"must":[{"terms":{"course_id":[""" + coursestr + """]}}]}}}"""
            println(query)
            val es_result = EsSpark.esRDD(ssc.sparkContext, "index/all-type", query)
            es_result.collect().foreach(course => {
              val detail_set = new JSONObject()
              detail_set.put("a", course._2("a").toString)
              detail_set.put("b", course._2("b").toString)
              detail_set.put("c", course._2("c").toString.toInt) // the original repeats this identical put four times
              detail_set.put("d", course._2("d").toString.toInt)
              // Keyed by the Elasticsearch document id.
              course_data.put(course._1.toString, detail_set)
            })
          }
        }

        // Merge each student record with its course details and bulk-write to Elasticsearch in batches of 100.
        var data_seq: Seq[String] = Seq()
        var data_cnt = 0
        if (data_bulk.length > 0) {
          var offset_list: Map[String, String] = Map()
          for (data <- data_bulk) {
            val datastr = data.toString
            val data_set = parseObject(datastr)
            // Remember the latest offset per partition so it can be written back to Redis after the batch.
            offset_list += (data_set.get("partition").toString -> data_set.get("offset").toString)
            data_set.remove("offset")
            data_set.remove("partition")
            // Note: the anonymized records built above store the course id under "a", not "course_id",
            // so this lookup only matches against the original (un-anonymized) field names.
            if (course_data.containsKey(data_set.get("course_id").toString)) {
              val course_set = course_data.get(data_set.get("course_id").toString).toString
              // Splice the two JSON strings: drop the trailing "}" of one and the leading "{" of the other.
              val all_data = datastr.toString.substring(0, datastr.length - 1) + "," + course_set.substring(1, course_set.length)
              data_cnt += 1
              data_seq = data_seq :+ all_data
              if (data_cnt == 100) {
                val rdd = ssc.sparkContext.makeRDD(data_seq)
                val up_ans = EsSpark.saveJsonToEs(rdd, "test_index/docs", Map("es.mapping.id" -> "_id"))
                println("up_ans:", up_ans)
                data_cnt = 0
                data_seq = Nil
              }
            }
          }

          // Flush any remaining documents.
          if (data_cnt > 0) {
            val rdd = ssc.sparkContext.makeRDD(data_seq)
            val up_ans = EsSpark.saveJsonToEs(rdd, "test_index/docs", Map("es.mapping.id" -> "_id"))
            println("up_ans", up_ans)
            data_cnt = 0
            data_seq = Nil
          }

          // Persist the latest offsets to Redis and read them back as a sanity check.
          if (!offset_list.isEmpty) {
            val up_ans = redis1.hmset(tops, offset_list.asJava)
            println(up_ans)
            redis1.close // note: the connection is closed here yet hgetAll is still called on it below (kept as in the original)
            val redis_ans = redis1.hgetAll(tops)
            println(redis_ans)
            println(redis_ans.getClass.getSimpleName)
          }
        }
      }

      data_bulk = Nil
      course_list = Nil
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
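The job above tracks offsets by hand in a Redis hash while also leaving "enable.auto.commit" switched on. If at-least-once delivery is enough, the Kafka 0-10 integration can instead commit the exact offset ranges of each processed batch back to Kafka itself. A minimal sketch is shown below; it assumes the same stream created above and that "enable.auto.commit" is set to false, and it is an alternative pattern, not part of the original article's code.

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

    // After createDirectStream, with "enable.auto.commit" -> (false: java.lang.Boolean).
    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the Kafka stream.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch and write the results to Elasticsearch here ...

      // Commit the ranges back to Kafka once the batch has been handled successfully.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

This removes the per-batch Redis round trip; keeping offsets in Redis (or another external store) remains useful when the job has to restart from an exact position independently of the consumer group, which is what the fromOffsets branch above is for.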
