博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
31-hadoop-hbase-mapreduce操作hbase
阅读量:5777 次
发布时间:2019-06-18

本文共 3121 字,大约阅读时间需要 10 分钟。

有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。

HbaseMapper:

package com.wenbronk.hbase.hbase;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class HbaseMapper extends Mapper
{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); String k = split[0]; String v = split[1]; context.write(new Text(k), new Text(v)); }}

HbaseReducer

package com.wenbronk.hbase.hbase;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.mapreduce.Reducer;import javax.xml.soap.Text;import java.io.IOException; /**  *
继承 TableReducer
, 因此 Hbase中的key是ImmutableBytesWritable */ public class HbaseReducer extends TableReducer
{   @Override   protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
    String k = key.toString();     StringBuilder sb = new StringBuilder();     for (Text value : values) {       sb.append(value.toString()).append(",");     }     if (sb.length() > 0) {       sb.deleteCharAt(sb.length() - 1);     }     // rowkey     Put put = new Put(k.getBytes());     put.addColumn("cf1".getBytes(), "name".getBytes(), sb.toString().getBytes());   } }

job

package com.wenbronk.hbase.hbase;import com.wenbronk.hbase.mapreduce.ReducerClass;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import java.io.IOException;public class JobTest {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        Configuration config = new Configuration();        config.set("fs.defaultFS", "hdfs://192.168.208.106:8020");        config.set("yarn.resourcemanager.hostname", "192.168.208.106");        config.set("mapred.job.tracker", "192.168.208.106:9001");        config.set("ha.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");        Job job = new Job(config, "Hbase");        job.setJarByClass(JobTest.class);        FileSystem fileSystem = FileSystem.get(config);        Path inPath = new Path("/usr/test/test.txt");        job.setInputFormatClass(TextInputFormat.class);        job.setMapperClass(HbaseMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Text.class);        TableMapReduceUtil.initTableReducerJob("t_user", ReducerClass.class, job, null, null, null, null, false);        boolean b = job.waitForCompletion(true);        if (b) {            System.out.println("mapreduce 执行成功");        }    }}

 

 

系列来自尚学堂视频

转载地址:http://fyuyx.baihongyu.com/

你可能感兴趣的文章
重载和重写的区别
查看>>
多进程2
查看>>
封装promise
查看>>
[C#]二维码(QR Code)生成与解析
查看>>
FAT12文件系统 引导扇区结构
查看>>
linux 查看时间
查看>>
Entity Framework 延伸系列目录
查看>>
采用MiniProfiler监控EF与.NET MVC项目(Entity Framework 延伸系列1)
查看>>
《个人-GIT使用方法》
查看>>
296. Best Meeting Point
查看>>
「PKUSC2018」神仙的游戏
查看>>
[NOI2017]蔬菜
查看>>
//conn数据库配置
查看>>
简单理解同步与异步
查看>>
noip 2013 华容道
查看>>
洛谷noip 模拟赛 day1 T1
查看>>
[置顶]java开发之基础篇2
查看>>
Jsp 内置对象
查看>>
数据中心机房设备发热量精确计算方法
查看>>
史上最简单的SpringCloud教程 | 第四篇:断路器(Hystrix)(Finchley版本)
查看>>