欢迎投稿

今日深度:

spark将数据写入hbase以及从hbase读取数据,sparkhbase

spark将数据写入hbase以及从hbase读取数据,sparkhbase


1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase

2、spark从hbase中读取数据并转化为RDD

操作方式为在eclipse本地运行spark连接到远程的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

将RDD写入hbase

注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused?

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

[plain]?view plain?copy
  1. package?com.test??
  2. ??
  3. import?org.apache.hadoop.hbase.HBaseConfiguration??
  4. import?org.apache.hadoop.hbase.client.Put??
  5. import?org.apache.hadoop.hbase.io.ImmutableBytesWritable??
  6. import?org.apache.hadoop.hbase.mapred.TableOutputFormat??
  7. import?org.apache.hadoop.hbase.util.Bytes??
  8. import?org.apache.hadoop.mapred.JobConf??
  9. import?org.apache.spark.SparkConf??
  10. import?org.apache.spark.SparkContext??
  11. import?org.apache.spark.rdd.RDD.rddToPairRDDFunctions??
  12. ??
  13. object?TestHBase?{??
  14. ??
  15. ??def?main(args:?Array[String]):?Unit?=?{??
  16. ????val?sparkConf?=?new?SparkConf().setAppName("HBaseTest").setMaster("local")??
  17. ????val?sc?=?new?SparkContext(sparkConf)??
  18. ??
  19. ????val?conf?=?HBaseConfiguration.create()??
  20. ????//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置??
  21. ????conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")??
  22. ????//设置zookeeper连接端口,默认2181??
  23. ????conf.set("hbase.zookeeper.property.clientPort",?"2181")??
  24. ??
  25. ????val?tablename?=?"account"??
  26. ??????
  27. ????//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!??
  28. ????val?jobConf?=?new?JobConf(conf)??
  29. ????jobConf.setOutputFormat(classOf[TableOutputFormat])??
  30. ????jobConf.set(TableOutputFormat.OUTPUT_TABLE,?tablename)??
  31. ??????
  32. ????val?indataRDD?=?sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))??
  33. ??
  34. ??
  35. ????val?rdd?=?indataRDD.map(_.split(',')).map{arr=>{??
  36. ??????/*一个Put对象就是一行记录,在构造方法中指定主键??
  37. ???????*?所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换??
  38. ???????*?Put.add方法接收三个参数:列族,列名,数据??
  39. ???????*/??
  40. ??????val?put?=?new?Put(Bytes.toBytes(arr(0).toInt))??
  41. ??????put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))??
  42. ??????put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))??
  43. ??????//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset??
  44. ??????(new?ImmutableBytesWritable,?put)???
  45. ????}}??
  46. ??????
  47. ????rdd.saveAsHadoopDataset(jobConf)??
  48. ??????
  49. ????sc.stop()??
  50. ??}??
  51. ??
  52. }??

使用saveAsNewAPIHadoopDataset写入数据


[plain]?view plain?copy
  1. package?com.test??
  2. ??
  3. import?org.apache.hadoop.hbase.HBaseConfiguration??
  4. import?org.apache.hadoop.hbase.mapreduce.TableOutputFormat??
  5. import?org.apache.spark._??
  6. import?org.apache.hadoop.mapreduce.Job??
  7. import?org.apache.hadoop.hbase.io.ImmutableBytesWritable??
  8. import?org.apache.hadoop.hbase.client.Result??
  9. import?org.apache.hadoop.hbase.client.Put??
  10. import?org.apache.hadoop.hbase.util.Bytes??
  11. ??
  12. object?TestHBase3?{??
  13. ??
  14. ??def?main(args:?Array[String]):?Unit?=?{??
  15. ????val?sparkConf?=?new?SparkConf().setAppName("HBaseTest").setMaster("local")??
  16. ????val?sc?=?new?SparkContext(sparkConf)??
  17. ??????
  18. ????val?tablename?=?"account"??
  19. ??????
  20. ????sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")??
  21. ????sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort",?"2181")??
  22. ????sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,?tablename)??
  23. ??????
  24. ????val?job?=?new?Job(sc.hadoopConfiguration)??
  25. ????job.setOutputKeyClass(classOf[ImmutableBytesWritable])??
  26. ????job.setOutputValueClass(classOf[Result])????
  27. ????job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])????
  28. ??
  29. ????val?indataRDD?=?sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))??
  30. ????val?rdd?=?indataRDD.map(_.split(',')).map{arr=>{??
  31. ??????val?put?=?new?Put(Bytes.toBytes(arr(0)))??
  32. ??????put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))??
  33. ??????put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))??
  34. ??????(new?ImmutableBytesWritable,?put)???
  35. ????}}??
  36. ??????
  37. ????rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())??
  38. ??}??
  39. ??
  40. }??


从hbase读取数据转化成RDD

本例基于官方提供的例子

[plain]?view plain?copy
  1. package?com.test??
  2. ??
  3. import?org.apache.hadoop.hbase.{HBaseConfiguration,?HTableDescriptor,?TableName}??
  4. import?org.apache.hadoop.hbase.client.HBaseAdmin??
  5. import?org.apache.hadoop.hbase.mapreduce.TableInputFormat??
  6. import?org.apache.spark._??
  7. import?org.apache.hadoop.hbase.client.HTable??
  8. import?org.apache.hadoop.hbase.client.Put??
  9. import?org.apache.hadoop.hbase.util.Bytes??
  10. import?org.apache.hadoop.hbase.io.ImmutableBytesWritable??
  11. import?org.apache.hadoop.hbase.mapreduce.TableOutputFormat??
  12. import?org.apache.hadoop.mapred.JobConf??
  13. import?org.apache.hadoop.io._??
  14. ??
  15. object?TestHBase2?{??
  16. ??
  17. ??def?main(args:?Array[String]):?Unit?=?{??
  18. ????val?sparkConf?=?new?SparkConf().setAppName("HBaseTest").setMaster("local")??
  19. ????val?sc?=?new?SparkContext(sparkConf)??
  20. ??????
  21. ????val?tablename?=?"account"??
  22. ????val?conf?=?HBaseConfiguration.create()??
  23. ????//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置??
  24. ????conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")??
  25. ????//设置zookeeper连接端口,默认2181??
  26. ????conf.set("hbase.zookeeper.property.clientPort",?"2181")??
  27. ????conf.set(TableInputFormat.INPUT_TABLE,?tablename)??
  28. ??
  29. ????//?如果表不存在则创建表??
  30. ????val?admin?=?new?HBaseAdmin(conf)??
  31. ????if?(!admin.isTableAvailable(tablename))?{??
  32. ??????val?tableDesc?=?new?HTableDescriptor(TableName.valueOf(tablename))??
  33. ??????admin.createTable(tableDesc)??
  34. ????}??
  35. ??
  36. ????//读取数据并转化成rdd??
  37. ????val?hBaseRDD?=?sc.newAPIHadoopRDD(conf,?classOf[TableInputFormat],??
  38. ??????classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],??
  39. ??????classOf[org.apache.hadoop.hbase.client.Result])??
  40. ??
  41. ????val?count?=?hBaseRDD.count()??
  42. ????println(count)??
  43. ????hBaseRDD.foreach{case?(_,result)?=>{??
  44. ??????//获取行键??
  45. ??????val?key?=?Bytes.toString(result.getRow)??
  46. ??????//通过列族和列名获取列??
  47. ??????val?name?=?Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))??
  48. ??????val?age?=?Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))??
  49. ??????println("Row?key:"+key+"?Name:"+name+"?Age:"+age)??
  50. ????}}??
  51. ??
  52. ????sc.stop()??
  53. ????admin.close()??
  54. ??}??
  55. }??
www.htsjk.Com true http://www.htsjk.com/hbase/36040.html NewsArticle spark将数据写入hbase以及从hbase读取数据,sparkhbase 1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase 2、spark从hbase中读取数据并转化为RDD 操作方式为在eclipse本地运行spar...
相关文章
    暂无相关文章
评论暂时关闭