博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HBase读写的几种方式(二)spark篇
阅读量:4089 次
发布时间:2019-05-25

本文共 9231 字,大约阅读时间需要 30 分钟。

1. HBase读写的方式概况

主要分为:

  1. 纯Java API读写HBase的方式;
  2. Spark读写HBase的方式;
  3. Flink读写HBase的方式;
  4. HBase通过Phoenix读写的方式;

第一种方式是HBase自身提供的比较原始的高效操作方式,而第二、第三则分别是Spark、Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark、Flink中调用。

注意:

这里我们使用HBase2.1.2版本,spark2.4版本,scala-2.12版本,以下代码都是基于该版本开发的。

2. Spark上读写HBase

 Spark上读写HBase主要分为新旧两种API,另外还有批量插入HBase的,通过Phoenix操作HBase的。

2.1 spark读写HBase的新旧API

2.1.1 spark写数据到HBase

使用旧版本saveAsHadoopDataset保存数据到HBase上。

/** * saveAsHadoopDataset */def writeToHBase(): Unit ={  // 屏蔽不必要的日志显示在终端上  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)  /* spark2.0以前的写法  val conf = new SparkConf().setAppName("SparkToHBase").setMaster("local")  val sc = new SparkContext(conf)  */  val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()  val sc = sparkSession.sparkContext  val tableName = "test"  //创建HBase配置  val hbaseConf = HBaseConfiguration.create()  hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201") //设置zookeeper集群,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") //设置zookeeper连接端口,默认2181  hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)  //初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的  val jobConf = new JobConf(hbaseConf)  jobConf.setOutputFormat(classOf[TableOutputFormat])  val dataRDD = sc.makeRDD(Array("12,jack,16", "11,Lucy,15", "15,mike,17", "13,Lily,14"))  val data = dataRDD.map{ item =>      val Array(key, name, age) = item.split(",")      val rowKey = key.reverse      val put = new Put(Bytes.toBytes(rowKey))      /*一个Put对象就是一行记录,在构造方法中指定主键       * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换       * Put.addColumn 方法接收三个参数:列族,列名,数据*/      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))      (new ImmutableBytesWritable(), put)  }  //保存到HBase表  data.saveAsHadoopDataset(jobConf)  sparkSession.stop()}

 使用新版本saveAsNewAPIHadoopDataset保存数据到HBase上

a.txt文件内容为:

100,hello,20101,nice,24102,beautiful,26
/** * saveAsNewAPIHadoopDataset */ def writeToHBaseNewAPI(): Unit ={   // 屏蔽不必要的日志显示在终端上   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)   val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()   val sc = sparkSession.sparkContext   val tableName = "test"   val hbaseConf = HBaseConfiguration.create()   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)   val jobConf = new JobConf(hbaseConf)   //设置job的输出格式   val job = Job.getInstance(jobConf)   job.setOutputKeyClass(classOf[ImmutableBytesWritable])   job.setOutputValueClass(classOf[Result])   job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])   val input = sc.textFile("v2120/a.txt")   val data = input.map{item =>   val Array(key, name, age) = item.split(",")   val rowKey = key.reverse   val put = new Put(Bytes.toBytes(rowKey))   put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))   put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))   (new ImmutableBytesWritable, put)   }   //保存到HBase表   data.saveAsNewAPIHadoopDataset(job.getConfiguration)   sparkSession.stop()}

2.1.2 spark从HBase读取数据

使用newAPIHadoopRDD从hbase中读取数据,可以通过scan过滤数据

/** * scan */ def readFromHBaseWithHBaseNewAPIScan(): Unit ={   //屏蔽不必要的日志显示在终端上   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)   val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local").getOrCreate()   val sc = sparkSession.sparkContext   val tableName = "test"   val hbaseConf = HBaseConfiguration.create()   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)   val scan = new Scan()   scan.addFamily(Bytes.toBytes("cf1"))   val proto = ProtobufUtil.toScan(scan)   val scanToString = new String(Base64.getEncoder.encode(proto.toByteArray))   hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString)   //读取数据并转化成rdd TableInputFormat是org.apache.hadoop.hbase.mapreduce包下的   val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])   val dataRDD = hbaseRDD     .map(x => x._2)     .map{result =>       (result.getRow, result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))     }.map(row => (new String(row._1), new String(row._2), new String(row._3)))     .collect()     .foreach(r => (println("rowKey:"+r._1 + ", name:" + r._2 + ", age:" + r._3)))}

2.2 spark利用BulkLoad往HBase批量插入数据

BulkLoad原理是先利用mapreduce在hdfs上生成相应的HFlie文件,然后再把HFile文件导入到HBase中,以此来达到高效批量插入数据。

/** * 批量插入 多列 */ def insertWithBulkLoadWithMulti(): Unit ={   val sparkSession = SparkSession.builder().appName("insertWithBulkLoad").master("local[4]").getOrCreate()   val sc = sparkSession.sparkContext   val tableName = "test"   val hbaseConf = HBaseConfiguration.create()   hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")   hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")   hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)   val conn = ConnectionFactory.createConnection(hbaseConf)   val admin = conn.getAdmin   val table = conn.getTable(TableName.valueOf(tableName))   val job = Job.getInstance(hbaseConf)   //设置job的输出格式   job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])   job.setMapOutputValueClass(classOf[KeyValue])   job.setOutputFormatClass(classOf[HFileOutputFormat2])   HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(tableName)))   val rdd = sc.textFile("v2120/a.txt")     .map(_.split(","))     .map(x => (DigestUtils.md5Hex(x(0)).substring(0, 3) + x(0), x(1), x(2)))     .sortBy(_._1)     .flatMap(x =>       {         val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]         val kv1: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(x._2 + ""))         val kv2: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(x._3 + ""))         listBuffer.append((new ImmutableBytesWritable, kv2))         listBuffer.append((new ImmutableBytesWritable, kv1))         listBuffer       }     )   //多列的排序,要按照列名字母表大小来      isFileExist("hdfs://node1:9000/test", sc)   rdd.saveAsNewAPIHadoopFile("hdfs://node1:9000/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)   val bulkLoader = new LoadIncrementalHFiles(hbaseConf)   bulkLoader.doBulkLoad(new Path("hdfs://node1:9000/test"), admin, table, conn.getRegionLocator(TableName.valueOf(tableName)))}/** * 判断hdfs上文件是否存在,存在则删除 */def isFileExist(filePath: String, sc: SparkContext): Unit ={  val output = new Path(filePath)  val hdfs = FileSystem.get(new URI(filePath), new Configuration)  if (hdfs.exists(output)){    hdfs.delete(output, true)  }}

2.3 spark利用Phoenix往HBase读写数据

利用Phoenix,就如同msyql等关系型数据库的写法,需要写jdbc

def readFromHBaseWithPhoenix: Unit ={   //屏蔽不必要的日志显示在终端上   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)   val sparkSession = SparkSession.builder().appName("SparkHBaseDataFrame").master("local[4]").getOrCreate()   //表小写,需要加双引号,否则报错   val dbTable = "\"test\""   //spark 读取 phoenix 返回 DataFrame的第一种方式   val rdf = sparkSession.read     .format("jdbc")     .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")     .option("url", "jdbc:phoenix:192.168.187.201:2181")     .option("dbtable", dbTable)     .load()   val rdfList = rdf.collect()   for (i <- rdfList){     println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))   }   rdf.printSchema()   //spark 读取 phoenix 返回 DataFrame的第二种方式   val df = sparkSession.read     .format("org.apache.phoenix.spark")     .options(Map("table" -> dbTable, "zkUrl" -> "192.168.187.201:2181"))     .load()   df.printSchema()   val dfList = df.collect()   for (i <- dfList){      println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))   }   //spark DataFrame 写入 phoenix,需要先建好表   /*df.write     .format("org.apache.phoenix.spark")     .mode(SaveMode.Overwrite)     .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> "jdbc:phoenix:192.168.187.201:2181"))     .save()*/   sparkSession.stop()}

3. 总结

 可以查看纯Java API读写HBase

 可以查看flink读写HBase

github地址:

参考资料:

https://my.oschina.net/uchihamadara/blog/2032481

https://www.cnblogs.com/simple-focus/p/6879971.html

https://www.cnblogs.com/MOBIN/p/5559575.html

https://blog.csdn.net/Suubyy/article/details/80892023

https://www.jianshu.com/p/b09283b14d84

https://www.jianshu.com/p/8e3fdf70dc06

https://www.cnblogs.com/wumingcong/p/6044038.html

https://blog.csdn.net/zhuyu_deng/article/details/43192271

https://www.jianshu.com/p/4c908e419b60

https://blog.csdn.net/Colton_Null/article/details/83387995

 

 

你可能感兴趣的文章
AI 入门怎么学?这份学习指南请收好!
查看>>
编程知识比拼第十四场丨编程考试
查看>>
1000道互联网大厂面试题:ZooKeeper+Dubbo+Spring+MySQL等|(含答案)
查看>>
面试必备技能:大厂面试题+架构视频+BATJ面试攻略+简历模板免费领取!
查看>>
编程知识比拼第十五场丨编程考试
查看>>
MySQL登顶数据库No.1!凭什么?
查看>>
编程知识比拼第十六场丨编程考试
查看>>
想提升Python的实战开发能力,程序员可以这么做?
查看>>
震惊!它竟然霸占技能排行榜前5,到底该不该入局??
查看>>
崩溃!程序员太累了!
查看>>
编程知识比拼第十七场丨编程考试
查看>>
编程知识比拼第十八场丨编程考试
查看>>
Spring Boot秒杀系统太难了?那是你不知道这3个要点!
查看>>
无套路、无噱头,只送惊喜!
查看>>
神操作!一行代码搞定一款游戏?厉害了程序员!
查看>>
Python太火了,这招教你3天入门!
查看>>
BAT架构师技术文档:Redis+Nginx+Dubbo精选+面试题+架构师精选视频(免费领)
查看>>
编程知识比拼第十九场丨编程考试
查看>>
编程知识比拼第二十场丨编程考试
查看>>
编程知识比拼第二十一场丨编程考试
查看>>