HBase的介绍及使用场景;Spark on HBase介绍;以及如何通过Spark的DataFrame访问HBase表。

一.    HBase简介

HBase的介绍在网上随处可见,借用《HBase企业应用实战》中的描述,如下:

“HBase(Hadoop Database)是一个高可靠、高性能、面向列、可伸缩的分布式数据库,利用HBase技术可在廉价PC上搭建起大规模结构化存储集群。HBase参考Google的BigTable建模,使用类似GFS的HDFS作为底层文件存储系统,在其上可以运行MapReduce批量处理数据,使用ZooKeeper作为协同服务组件。

HBase的整个项目使用Java语言实现,它是Apache基金会的Hadoop项目的一部分,既是模仿Google BigTable的开源产品,同时又是Hadoop的衍生产品。而Hadoop作为批量离线计算系统已经得到了业界的普遍认可,并经过了工业上的验证,所以HBase具备“站在巨人肩膀之上”的优势,其发展势头非常迅猛。

HBase还是一种非关系型数据库,即NoSQL数据库。在Eric Brewer的CAP理论中,HBase属于CP类型的系统,其NoSQL的特性非常明显,这些特性也决定了其独特的应用场景。接下来的内容将详细讲解HBase的发展历史、发行版本和特性”

CAP

二.    HBase的选型依据

2.1   使用场景

《HBase企业应用实战》中提到HBase较为适合如下几种使用需求,笔者加入相应自己的理解,描述如下:

具体使用场景如用户画像、实时指标计算、更新计算和增量计算,HBase都比较适合。

那么,哪些场景是不适合HBase的,主要有如下两个场景:

以上两个场景,其实正是Spark+HDFS+Hive的强项。Spark+HDFS+Hive应对离线批处理和复杂统计计算场景,HBase应对准实时的并发查询和简单计算场景,也是实践中比较好的方式。

2.2   性能

网上已有许多不同的NoSql数据库之间的性能比较:

2016-11:Performance Comparison between Five NoSQL Databases

2014-09:性能测试:SequoiaDB vs. MongoDB vs. Cassandra vs. HBase

2014-08:datastax的评测

2014-02:Sergey Sverchkov的评测

评测的结果都差不多,HBase在NoSQL中属于较为中庸的,各种场景都适合,可以作为Spark+Hadoop生态系统的Key-Value数据库选择。当然,选择其他类型的KV数据库,可能可以获得某方面的极致性能,同时也要考虑学习成本和易用性。

2.3   生态系统兼容

HBase基于Hadoop系统构建,安装极为简单。Spark有多种方式操作HBase,接下来会介绍HORTONWORKS开源出来的SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR。在Hive中,我们可以使用HQL语句在HBase表上进行查询、插入操作,同时可以将HBase中的数据导出至Hive进行离线批处理分析,详见HBaseIntegrationLanguageManual ImportExport

图 HBase架构

三.    SHC:Spark on HBase简介

SHC是hortonworks给Spark做的一个HBase的Connector,其优势是可以通过用DataFrame的API直接对HBase进行读写操作,相对Spark原生的API(saveAsHadoopDataset和saveAsNewAPIHadoopDataset)而言,极大增强了易用性,Github上的地址如下shc

详细的介绍可以看SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR,想要一目十行看中文的同学可以移步相对应的译文Spark-on-HBase:通过Spark的DataFrame访问HBase表

四.    SHC+DataFrame便捷使用

使用SHC+DataFrame非常简单,基本只需要关注HBaseTableCatalog,即定义出DataFrame的每一列应该输出到HBase中的表名、RowKey,列族、列名、类型以及命名空间与编码格式。该格式可以从JSON数据读入,具体格式如下,详细样例请移步官方基础样例

  1. def catalog = s”””{
  2. |”table”:{“namespace”:”default”, “name”:”shcExampleTable”, “tableCoder”:”PrimitiveType”},
  3. |”rowkey”:”key1:key2″,
  4. |”columns”:{
  5. |”col00″:{“cf”:”rowkey”, “col”:”key1″, “type”:”string”, “length”:”6″},
  6. |”col01″:{“cf”:”rowkey”, “col”:”key2″, “type”:”int”},
  7. |”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
  8. |”col2″:{“cf”:”cf2″, “col”:”col2″, “type”:”double”},
  9. |”col3″:{“cf”:”cf3″, “col”:”col3″, “type”:”float”},
  10. |”col4″:{“cf”:”cf4″, “col”:”col4″, “type”:”int”},
  11. |”col5″:{“cf”:”cf5″, “col”:”col5″, “type”:”bigint”},
  12. |”col6″:{“cf”:”cf6″, “col”:”col6″, “type”:”smallint”},
  13. |”col7″:{“cf”:”cf7″, “col”:”col7″, “type”:”string”},
  14. |”col8″:{“cf”:”cf8″, “col”:”col8″, “type”:”tinyint”}
  15. |}
  16. |}”””.stripMargin

 

HBase中跨列族访问是非常低效的,所以在实际应用的过程中,往往需要进行存储的DataFrame中列都是一个列族的。同时有些时候DataFrame会产生不固定的列,频繁手动更改HBaseTableCatalog的Schema也是一个无趣的活。DataFrame的优势在这个时候就体现出来了,我们可以根据DataFrame的Schema自动化生成HBase的映射关系以及列值类型,其余的自己指定就好。代码如下:

  1. importapache.spark.sql.execution.datasources.HBase._
  2. importapache.spark.sql._
  3. importapache.spark.sql.types._
  4. importapache.spark.sql.functions._
  5. importapache.spark.{SparkConf, SparkContext}
  6. importalibaba.fastjson.{JSON, JSONArray, JSONObject}
  7. importcollection.JavaConversions._
  8. importcollection.JavaConverters._
  9. /**
  10. * 保存DataFrame到HBase
  11. * @param df_2_be_save 需要保存的DataFrame
  12. * @param tablename_hb 需要存入的表名,需要提前建好
  13. * @param namespace_hb default,根据需要更改,一般默认就好
  14. * @param tableCoder_hb PrimitiveType,根据需要更改,一般默认就好
  15. * @param rowkey_hb 主键名字
  16. * @param columFamily_hb 列族名,在该函数中,DataFrame的所有列都只能插入到同一个列族中,列族需要预先定义好
  17. * @param newtablecount
  18. */
  19. def DataFrame_Save_To_HBase(df_2_be_save:DataFrame)(tablename_hb:String, namespace_hb:String = “default”,tableCoder_hb:String = “PrimitiveType”,rowkey_hb:String,columFamily_hb:String,newtablecount:Int = 5): Unit ={
  20. val column_json = df_2_be_save.schema.map{
  21. stf =>{
  22. if(stf.name==rowkey_hb) {
  23. (stf.name -> JSON.toJSON(Map(“cf”-> “rowkey”,
  24. “col”-> rowkey_hb,
  25. “type”-> stf.dataType.simpleString).asJava))
  26. }else{
  27. (stf.name -> JSON.toJSON(Map(“cf”-> columFamily_hb,
  28. “col”-> stf.name,
  29. “type”-> stf.dataType.simpleString).asJava))
  30. }
  31. }
  32. }.toMap.asJava
  33. val df_HBase_schema = JSON.toJSON(Map(“table”->JSON.toJSON(Map(“name”->tablename_hb,
  34. “namespace”->namespace_hb,”tableCoder”->tableCoder_hb).asJava),
  35. “rowkey”->rowkey_hb,
  36. “columns”->column_json).asJava)
  37. //    println(df_HBase_schema)
  38. options(
  39. Map(HBaseTableCatalog.tableCatalog -> df_HBase_schema.toString, HBaseTableCatalog.newTable -> newtablecount.toString)).
  40. format(“org.apache.spark.sql.execution.datasources.HBase”).
  41. save()
  42. }

至此,诸位读者应该可以拿着SHC和Spark愉快的玩耍了。周末愉快。

点击查看更多spark内容!

 

源链接

Hacking more

...