运维开发网

scala – 构建一个火花流示例应用程序

运维开发网 https://www.qedev.com 2020-07-26 09:50 出处:网络 作者:运维开发网整理
我正在尝试构建一个简单的spark作业,它从kafka集群读取,计算Hbase中的单词和存储. 我使用的代码基于此处的示例: Importing data in Hbase using Spark and Kafka 这是scala代码: package org.example.main import java.util.Properties import org.apache.hadoop.h
我正在尝试构建一个简单的spark作业,它从kafka集群读取,计算Hbase中的单词和存储.

我使用的代码基于此处的示例:

Importing data in Hbase using Spark and Kafka

这是scala代码:

package org.example.main

import java.util.Properties
import org.apache.hadoop.hbase.{ HBaseConfiguration, HColumnDescriptor, HTableDescriptor }
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Put }
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object scalaConsumer {
  def main(args : Array[String]) {

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost:2181")

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable("testTable")) {
      val tableDesc = new HTableDescriptor("testTable")
      tableDesc.addFamily(new HColumnDescriptor("metric"))
      admin.createTable(tableDesc)
    }

    // setup streaming context
    val ssc = new StreamingContext("master", "MetricAggregatorTest", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
    ssc.checkpoint("checkpoint")
    val topics = "test"
    val numThreads = 2
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val zkQuorum = "localhost:2181"
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "consumer-group", topicpMap)
      .map { case (key, value) => ((key, Math.floor(System.currentTimeMillis() / 60000).toLong * 60), value.toInt) }

    val aggr = lines.reduceByKeyAndWindow(add _, Minutes(1), Minutes(1), 2)
    val tableName = "testTable"
    aggr.foreach(line => saveToHBase(line, zkQuorum, tableName))

    ssc.start

    ssc.awaitTermination
  }

  def add(a : Int, b : Int) = { (a + b) }

  def saveToHBase(rdd : RDD[((String, Long), Int)], zkQuorum : String, tableName : String) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkQuorum)

    val jobConfig = new JobConf(conf)
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])

    new PairRDDFunctions(rdd.map { case ((metricId, timestamp), value) => createHBaseRow(metricId, timestamp, value) }).saveAsHadoopDataset(jobConfig)
  }

  def createHBaseRow(metricId : String, timestamp : Long, value : Int) = {
    val record = new Put(Bytes.toBytes(metricId + "~" + timestamp))

    record.add(Bytes.toBytes("metric"), Bytes.toBytes("col"), Bytes.toBytes(value.toString))

    (new ImmutableBytesWritable, record)
  }

}

和pom.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test.scalConsumer</groupId>
    <artifactId>scalConsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>"Spark Test"</name>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>maven-hadoop</id>
            <name>Hadoop Releases</name>
            <url>https://repository.cloudera.com/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>cloudera-repos</id>
            <name>Cloudera Repos</name>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>org.example.main.scalaConsumer</mainClass>
                        </manifest>
                    </archive>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>  
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
        <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.90.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.5-hadoop2</version>
        </dependency>
        <dependency>
                 <groupId>org.apache.hbase</groupId>
                 <artifactId>hbase-client</artifactId>
                 <version>0.98.5-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0-cdh5.1.0</version>
        </dependency>
    </dependencies>
</project>

我正在使用maven构建jar文件:

mvn package

并使用此命令运行:

~/Desktop/spark/bin/spark-submit --class org.example.main.scalaConsumer scalConsumer-0.0.1-SNAPSHOT.jar

我假设的错误是由于不匹配的版本(第一次使用maven和scala)导致的链接错误是这样的:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

从搜索我已经看到这是一个常见的事件,但我没有找到解决方案.我错过了我的依赖项中的某些内容吗?

也许这是在你的pom文件中做到这一点:

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.90.3</version>
    </dependency>

你指的是版本0.90.3,而在所有其他情况下你指的是0.98.5-hadoop:

<dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>0.98.5-hadoop2</version>
    </dependency>

0.90实际上是HBase(2011)的非常旧的版本!

扫码领视频副本.gif

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号