运维开发网

kafka high-level consumer 多线程访问异常

运维开发网 https://www.qedev.com 2020-03-19 10:23 出处:网络 作者:运维开发网整理
在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。     Java代码    def hasNext(): Boolean = {       if(state == FAILED)         //处于FAILED状态时

在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

 

 

Java代码  

kafka high-level consumer 多线程访问异常

  1.  def hasNext(): Boolean = {  

  2.     if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常  

  3.       throw new IllegalStateException("Iterator is in failed state")  

  4.     state match {  

  5.       case DONE => false  

  6.       case READY => true  

  7.       case _ => maybeComputeNext()  

  8.     }  

  9.   }  

  10.   

  11.   

  12.   def maybeComputeNext(): Boolean = {  

  13.     state = FAILED              //重置了状态  

  14.     nextItem = Some(makeNext())          

  15.     if(state == DONE) {  

  16.       false  

  17.     } else {  

  18.       state = READY  

  19.       true  

  20.     }  

  21.   }  

  22.   下载

  23.   

  24. protected def makeNext(): MessageAndMetadata[K, V] = {  

  25.     var currentDataChunk: FetchedDataChunk = null  

  26.     // if we don't have an iterator, get one  

  27.     var localCurrent = current.get()  

  28.     if(localCurrent == null || !localCurrent.hasNext) {  

  29.       if (consumerTimeoutMs < 0)  

  30.         currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞  

  31.   

  32.       else {  

  33.         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)  

  34.         if (currentDataChunk == null) {  

  35.           // reset state to make the iterator re-iterable  

  36.           resetState()  

  37.           throw new ConsumerTimeoutException  

  38.         }  

  39.       }  

  40. //省略部分代码  

  41. }  

0

精彩评论

暂无评论...
验证码 换一张
取 消