Runtime environment: Spark + HDFS + HBase + Yarn
For setting up the (HDFS + Yarn) cluster, Spark on Yarn, and the HBase cluster, see the references. The HBase table used here has table name "table", column family "fam", and column qualifier "col".
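The target table has to exist before the job runs. A minimal one-off setup sketch (not part of the original post), assuming the HBase 0.98 client API that matches the CDH 5.3.2 jars referenced in step 3 and the same ZooKeeper quorum as the job:

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

// One-off setup: create table "table" with column family "fam" if it is missing.
object CreateTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.23.27.45,172.23.27.46,172.23.27.47")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val admin = new HBaseAdmin(conf)
    if (!admin.tableExists("table")) {
      val desc = new HTableDescriptor(TableName.valueOf("table"))
      desc.addFamily(new HColumnDescriptor("fam"))
      admin.createTable(desc)
    }
    admin.close()
  }
}
```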
Step 1: The code
The inputHbase object:

```scala
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Chensy on 15-8-10.
 */
object inputHbase {
  /**
   * hbase table: table  col-family: fam  col: col
   */
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val readFile = sc.textFile(args(0)).map(x => x.split(","))
    val tableName = "table"

    readFile.foreachPartition { x =>
      val myConf = HBaseConfiguration.create()
      myConf.set("hbase.zookeeper.quorum", "172.23.27.45,172.23.27.46,172.23.27.47")
      myConf.set("hbase.zookeeper.property.clientPort", "2181")
      myConf.set("hbase.defaults.for.version.skip", "true")
      myConf.set("hbase.master", "172.23.27.39:60000")
      myConf.set("hbase.cluster.distributed", "true")
      myConf.set("hbase.rootdir", "hdfs://cdh5-test/hbase")
      val myTable = new HTable(myConf, TableName.valueOf(tableName))
      // Turn off auto-flush. If it stays on, every single Put is committed as its own RPC,
      // which is the main reason bulk imports are slow.
      myTable.setAutoFlush(false, false)
      // Client write buffer size: once the buffered Puts exceed this value, HBase flushes
      // automatically. Tune as needed; 5 MB is usually enough for large imports.
      myTable.setWriteBufferSize(5 * 1024 * 1024)
      x.foreach { y =>
        val p = new Put(Bytes.toBytes("row" + y(0)))
        p.add("fam".getBytes, "col".getBytes, Bytes.toBytes("value" + y(0)))
        myTable.put(p)
      }
      // Flush at the end of every partition. Without this, Puts still sitting in a buffer
      // smaller than the threshold above are never committed and that data is lost.
      myTable.flushCommits()
    }
    System.exit(0)
  }
}
```
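For an input line such as "7,foo", the job writes rowkey "row7" with value "value7" into fam:col. As a spot check after the job finishes, a small read-back sketch (not in the original post; a hypothetical CheckRow object using the same 0.98 client API):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Fetch one row written by inputHbase and print the stored fam:col value.
object CheckRow {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.23.27.45,172.23.27.46,172.23.27.47")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val table = new HTable(conf, TableName.valueOf("table"))
    val result = table.get(new Get(Bytes.toBytes("row" + args(0))))
    val value = result.getValue(Bytes.toBytes("fam"), Bytes.toBytes("col"))
    println(if (value == null) "row not found" else Bytes.toString(value))
    table.close()
  }
}
```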
Step 2: Package the jar and upload it to HDFS
Packaging in IDEA is not covered here; the result is inputHbase.jar. Upload it to HDFS:

```
hadoop fs -put inputHbase.jar /xxx/spark/streaming
```
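If you prefer building from the command line instead of IDEA, a rough build.sbt sketch; the HBase versions match the CDH 5.3.2 jars referenced in step 3, while the Scala and spark-core versions are assumptions:

```scala
// Hypothetical build.sbt (the post itself packages with IDEA).
name := "inputHbase"
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"     % "1.3.0" % "provided", // version is an assumption
  "org.apache.hbase"  % "hbase-client"   % "0.98.6-cdh5.3.2",
  "org.apache.hbase"  % "hbase-server"   % "0.98.6-cdh5.3.2",
  "org.apache.hbase"  % "hbase-protocol" % "0.98.6-cdh5.3.2"
)

// CDH artifacts are not on Maven Central; pull them from the Cloudera repository.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
```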
Step 3: Add the required jars
Create a shared library directory and keep all the jars the job depends on in one place, so they are easy to add to the --jars list.

Step 4: Write the submit script submit-yarn-inputHbase.sh
```
[root@JXQ-23-27-38 streaming]# vim submit-yarn-inputHbase.sh

cd $SPARK_HOME
#pwd
./bin/spark-submit --name inputHbase \
  --class com.wylog.hbase.inputHbase \
  --master yarn-cluster \
  --num-executors 8 \
  --executor-memory 4g \
  --executor-cores 8 \
  --driver-memory 4g \
  --driver-cores 4 \
  --jars /root/spark/streaming/public_lib/hbase-client-0.98.6-cdh5.3.2.jar,/root/spark/streaming/public_lib/hbase-server-0.98.6-cdh5.3.2.jar,/root/spark/streaming/public_lib/hbase-protocol-0.98.6-cdh5.3.2.jar,/root/spark/streaming/public_lib/htrace-core-2.04.jar \
  hdfs://cdh5-test/xxx/spark/streaming/inputHbase.jar \
  hdfs://cdh5-test/data/notify-server/172.17.88.88/notify-server2_detail.log.*
```
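Note that with --master yarn-cluster the driver runs inside the Yarn cluster, so the application jar is given as the HDFS path uploaded in step 2, while the HBase jars in --jars come from the local public_lib directory on the submitting host. The final HDFS glob is passed through to the job as args(0) and read by sc.textFile.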
References: