Spark DataFrame写入HBase的常用方式

news/2024/8/26 14:07:45

Spark是目前最流行的分布式计算框架,而HBase则是在HDFS之上的列式分布式存储引擎,基于Spark做离线或者实时计算,数据结果保存在HBase中是目前很流行的做法。例如用户画像、单品画像、推荐系统等都可以用HBase作为存储媒介,供客户端使用。

因此Spark如何向HBase中写数据就成为很重要的一个环节了。本文将会介绍三种写入的方式,其中一种还在期待中,暂且官网即可...

代码在spark 2.2.0版本亲测

第一种是最简单的使用方式了,就是基于RDD的分区,由于在spark中一个partition总是存储在一个excutor上,因此可以创建一个HBase连接,提交整个partition的内容。

大致的代码是:

rdd.foreachPartition { records =>
    val config = HBaseConfiguration.create
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set("hbase.zookeeper.quorum", "a1,a2,a3")
    val connection = ConnectionFactory.createConnection(config)
    val table = connection.getTable(TableName.valueOf("rec:user_rec"))
    
    
    val list = new java.util.ArrayList[Put]
    for(i <- 0 until 10){
        val put = new Put(Bytes.toBytes(i.toString))
        put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("aaaa"), Bytes.toBytes("1111"))
        list.add(put)
    }
    
    table.put(list)
    
    table.close()
}

这样每次写的代码很多,显得不够友好,如果能跟dataframe保存parquet、csv之类的就好了。下面就看看怎么实现dataframe直接写入hbase吧!

由于这个插件是hortonworks提供的,maven的中央仓库并没有直接可下载的版本。需要用户下载源码自己编译打包,如果有maven私库,可以上传到自己的maven私库里面。具体的步骤可以参考如下:

2.1 下载源码、编译、上传

去官网github下载即可:https://github.com/hortonworks-spark/shc
可以直接按照下面的readme说明来,也可以跟着我的笔记走。

下载完成后,如果有自己的私库,可以修改shc中的distributionManagement。然后点击旁边的maven插件deploy发布工程,如果只想打成jar包,那就直接install就可以了。



2.2 引入

在pom.xml中引入:

<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.2-2.2-s_2.11-SNAPSHOT</version></dependency>

2.3

首先创建应用程序,Application.scala

object Application {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("normal").getOrCreate()
        spark.sparkContext.setLogLevel("warn")
        val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

        val df:DataFrame = spark.createDataFrame(data)
        df.write
          .mode(SaveMode.Overwrite)
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .save()
    }
    def catalog = s"""{
                   |"table":{"namespace":"rec", "name":"user_rec"},
                   |"rowkey":"key",
                   |"columns":{
                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                   |"col1":{"cf":"t", "col":"col1", "type":"boolean"},
                   |"col2":{"cf":"t", "col":"col2", "type":"double"},
                   |"col3":{"cf":"t", "col":"col3", "type":"float"},
                   |"col4":{"cf":"t", "col":"col4", "type":"int"},
                   |"col5":{"cf":"t", "col":"col5", "type":"bigint"},
                   |"col6":{"cf":"t", "col":"col6", "type":"smallint"},
                   |"col7":{"cf":"t", "col":"col7", "type":"string"},
                   |"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
                   |}
                   |}""".stripMargin
}case class HBaseRecord(
                  col0: String,
                  col1: Boolean,
                  col2: Double,
                  col3: Float,
                  col4: Int,
                  col5: Long,
                  col6: Short,
                  col7: String,
                  col8: Byte)

object HBaseRecord
{
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}


然后再resources目录下,添加hbase-site.xml、hdfs-site.xml、core-site.xml等配置文件。主要是获取Hbase中的一些连接地址。

如果有浏览官网习惯的同学,一定会发现,HBase官网的版本已经到了3.0.0-SNAPSHOT,并且早就在2.0版本就增加了一个hbase-spark模块,使用的方法跟上面hortonworks一样,只是format的包名不同而已,猜想就是把hortonworks给拷贝过来了。

另外Hbase-spark 2.0.0-alpha4目前已经公开在maven仓库中了。

http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark

不过,内部的spark版本是1.6.0,太陈旧了!!!!真心等不起了...

期待hbase-spark官方能快点提供正式版吧。

  1. hortonworks-spark/shc github:https://github.com/hortonworks-spark/shc

  2. maven仓库地址: http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark

  3. Hbase spark sql/ dataframe官方文档:https://hbase.apache.org/book.html#_sparksql_dataframes

转载自:https://mp.weixin.qq.com/s/oZ59E01wmU71Zvt01pmYYw


http://www.niftyadmin.cn/n/3537926.html

相关文章

java http返回的xml格式转成json格式

package Yishikeji.Hbase.Admin.Hbase; import java.io.InputStream; import java.net.URL; import java.net.URLConnection;public class HdfsClusterTest {public static void json() throws Exception{//参数url化 // String city java.net.URLEncoder.encode("…

hadoop集群监控总结

1、取自ResourceManager REST API’s指标&#xff1b; https://hadoop.apache.org/docs/r2.8.4/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html http://<rm http address:port>/ws/v1/cluster/metrics http://**.**.***.208:8088/ws/v1/cluster/metrics 集群…

CDH5.15.0安装spark2.3

简介&#xff1a; 在我的CDH5.15.0集群中&#xff0c;默认安装的spark是1.6版本&#xff0c;这里需要将其升级为spark2.x版本。经查阅官方文档&#xff0c;发现spark1.6和2.x是可以并行安装的&#xff0c;也就是说可以不用删除默认的1.6版本&#xff0c;可以直接安装2.x版本&a…

java sqoop api 导mysql数据到hdfs

package com.example.demo.controller;import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.Sqoop; import org.apache.sqoop.tool.SqoopTool; import org.apache.commons.lang3.StringUtils; /*** 创建人 **** 创建时间 2018/12/20*/ public class sqoopte…

centos安装mysql 在线安装

yum install mysql-server -y service mysqld start chkconfig mysqld on chkconfig --list mysqld mysql 进入mysql命令行接口 mysql -u root; use mysql; select user, host, password from user; CREATE USER yishikeji% IDENTIFIED BY yishikeji; GRANT ALL PRIVILEGES ON…

Java几种常用JSON库性能比较

SON不管是在Web开发还是服务器开发中是相当常见的数据传输格式&#xff0c;一般情况我们对于JSON解析构造的性能并不需要过于关心&#xff0c;除非是在性能要求比较高的系统。 目前对于Java开源的JSON类库有很多种&#xff0c;下面我们取4个常用的JSON库进行性能测试对比&…

我司Spark迁移Hive数据到MongoDB生产案例代码

github地址&#xff1a;https://github.com/yanglin502/sparkhivetomg/tree/master/sparkhivetomg 文章转自 若泽大数据&#xff1a;https://open.weixin.qq.com/connect/oauth2/authorize?appidwxc8cfdff818e686b9&redirect_urihttp%3A%2F%2Fkf.qq.com%2Ftouch%2Fsappfa…

代码 | Spark读取mongoDB数据写入Hive普通表和分区表

版本&#xff1a; spark 2.2.0 hive 1.1.0 scala 2.11.8 hadoop-2.6.0-cdh5.7.0 jdk 1.8 MongoDB 3.6.4 一 原始数据及Hive表 MongoDB数据格式 {"_id" : ObjectId("5af65d86222b639e0c2212f3"),"id" : "1","name" …