There are some large files that need to be stored in HBase. The idea is to first upload the files to HDFS, read them as <key, value> pairs in the map phase, and then write those key-value pairs into HBase in the reduce phase.
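The mapper below assumes each input line is a rowkey and a value separated by a comma. A made-up sample of what the input file might look like (these names are only for illustration, not from the original post):

10001,zhangsan
10002,lisi
10001,wangwu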
HbaseMapper:
package com.wenbronk.hbase.hbase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HbaseMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is "rowkey,value"; split it and emit the pair as <Text, Text>
        String[] split = value.toString().split(",");
        String k = split[0];
        String v = split[1];
        context.write(new Text(k), new Text(v));
    }
}
HbaseReducer:
package com.wenbronk.hbase.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Extends TableReducer, so the key written to HBase is an ImmutableBytesWritable.
 */
public class HbaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String k = key.toString();
        // Join all values for this key into one comma-separated string
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(",");
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        // Use the key as the rowkey and store the joined values in cf1:name
        Put put = new Put(k.getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), sb.toString().getBytes());
        // Emit the Put so the row is actually written to the HBase table
        context.write(new ImmutableBytesWritable(k.getBytes()), put);
    }
}
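Before running the driver below, the target table t_user with the column family cf1 that the reducer writes to needs to exist. A minimal sketch of creating it with the HBase 1.x client API; the class name CreateUserTable is hypothetical, and the quorum address is taken from the job configuration below:

package com.wenbronk.hbase.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class CreateUserTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("t_user");
            if (!admin.tableExists(name)) {
                // Column family cf1 is what HbaseReducer writes into
                HTableDescriptor descriptor = new HTableDescriptor(name);
                descriptor.addFamily(new HColumnDescriptor("cf1"));
                admin.createTable(descriptor);
            }
        }
    }
}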
Job:
package com.wenbronk.hbase.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class JobTest {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.208.106:8020");
        config.set("yarn.resourcemanager.hostname", "192.168.208.106");
        config.set("mapred.job.tracker", "192.168.208.106:9001");
        // ZooKeeper quorum that HBase connects through
        config.set("hbase.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");

        Job job = Job.getInstance(config, "Hbase");
        job.setJarByClass(JobTest.class);

        FileSystem fileSystem = FileSystem.get(config);
        // Input file on HDFS
        Path inPath = new Path("/usr/test/test.txt");
        FileInputFormat.addInputPath(job, inPath);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Configure the reduce side to write into the HBase table "t_user"
        TableMapReduceUtil.initTableReducerJob("t_user", HbaseReducer.class, job, null, null, null, null, false);

        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("MapReduce job executed successfully");
        }
    }
}
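After the job finishes, each rowkey should hold its comma-joined values under cf1:name. A sketch of reading one row back with the HBase client to check the result; the class name ReadUserRow and the rowkey 10001 are made-up examples based on the assumed input format:

package com.wenbronk.hbase.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class ReadUserRow {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("t_user"))) {
            // 10001 is a hypothetical rowkey from the assumed sample input
            Get get = new Get(Bytes.toBytes("10001"));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"));
            System.out.println("cf1:name = " + Bytes.toString(value));
        }
    }
}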
This series is based on the 尚学堂 video course.