编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。
成都创新互联公司是专业的清河网站建设公司,清河接单;提供成都网站设计、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行清河网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
原始表test1数据如下:

每个row key都有两个版本的数据,这里只显示了row key为1的数据
在hbase shell 中创建数据表:
create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、无列导出设置的数据
create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、有列导出设置的数据
create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、有列导入设置、无列导出设置的数据
create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、无列导出设置的数据
create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、有列导出设置的数据
create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、无列导出设置的数据
create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、有列导出设置的数据main函数入口:
package GeneralHBaseToHBase;
import org.apache.hadoop.util.ToolRunner;
public class DriverTest {
public static void main(String[] args) throws Exception {
// 无版本设置、无列导入设置,无列导出设置
String[] myArgs1= new String[]{
"test1", // 输入表
"test2", // 输出表
"0", // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表
"-1", // 列导入设置,如果为-1 ,则没有设置列导入
"-1" // 列导出设置,如果为-1,则没有设置列导出
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs1);
// 无版本设置、有列导入设置,无列导出设置
String[] myArgs2= new String[]{
"test1",
"test3",
"0",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs2);
// 无版本设置,无列导入设置,有列导出设置
String[] myArgs3= new String[]{
"test1",
"test4",
"0",
"-1",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs3);
// 有版本设置,无列导入设置,无列导出设置
String[] myArgs4= new String[]{
"test1",
"test5",
"2",
"-1",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs4);
// 有版本设置、有列导入设置,无列导出设置
String[] myArgs5= new String[]{
"test1",
"test6",
"2",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"-1"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs5);
// 有版本设置、无列导入设置,有列导出设置
String[] myArgs6= new String[]{
"test1",
"test7",
"2",
"-1",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs6);
// 有版本设置、有列导入设置,有列导出设置
String[] myArgs7= new String[]{
"test1",
"test8",
"2",
"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
"cf1:c1,cf1:c10,cf1:c14"
};
ToolRunner.run(HBaseDriver.getConfiguration(),
new HBaseDriver(),
myArgs7);
}
}driver:
package GeneralHBaseToHBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import util.JarUtil;
public class HBaseDriver extends Configured implements Tool{
public static String FROMTABLE=""; //导入表
public static String TOTABLE=""; //导出表
public static String SETVERSION=""; //是否设置版本
// args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}
@Override
public int run(String[] args) throws Exception {
if(args.length!=5){
System.err.println("Usage:\n demo.job.HBaseDriver "
+ " mapper:
package GeneralHBaseToHBase; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.NavigableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseToHBaseMapper extends TableMapper{ Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class); private static int versionNum = 0; private static String[] columnFromTable = null; private static String[] columnToTable = null; private static String column1 = null; private static String column2 = null; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); versionNum = Integer.parseInt(conf.get("SETVERSION", "0")); column1 = conf.get("COLUMNFROMTABLE",null); if(!(column1 == null)){ columnFromTable = column1.split(","); } column2 = conf.get("COLUMNTOTABLE",null); if(!(column2 == null)){ columnToTable = column2.split(","); } } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { context.write(key, resultToPut(key,value)); } /*** * 把key,value转换为Put * @param key * @param value * @return * @throws IOException */ private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException { HashMap fTableMap = new HashMap<>(); HashMap tTableMap = new HashMap<>(); Put put = new Put(key.get()); if(! (columnFromTable == null || columnFromTable.length == 0)){ fTableMap = getFamilyAndColumn(columnFromTable); } if(! (columnToTable == null || columnToTable.length == 0)){ tTableMap = getFamilyAndColumn(columnToTable); } if(versionNum==0){ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ for (Cell kv : value.rawCells()) { put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出 } return put; } else{ return getPut(put, value, tTableMap); // 无版本、无列导入、有列导出 } } else { if(tTableMap.size() == 0){ return getPut(put, value, fTableMap);// 无版本、有列导入、无列导出 } else { return getPut(put, value, tTableMap);// 无版本、有列导入、有列导出 } } } else{ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ return getPut1(put, value); // 有版本,无列导入,无列导出 }else{ return getPut2(put, value, tTableMap); //有版本,无列导入,有列导出 } }else{ if(tTableMap.size() == 0){ return getPut2(put,value,fTableMap);// 有版本,有列导入,无列导出 }else{ return getPut2(put,value,tTableMap); // 有版本,有列导入,有列导出 } } } } /*** * 无版本设置的情况下,对于有列导入或者列导出 * @param put * @param value * @param tableMap * @return * @throws IOException */ private Put getPut(Put put,Result value,HashMap tableMap) throws IOException{ for(Cell kv : value.rawCells()){ byte[] family = kv.getFamily(); if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); ArrayList columnBy = toByte(columnStr); if(columnBy.contains(new String(kv.getQualifier()))){ put.add(kv); //没有设置版本,没有设置列导入,有设置列导出 } } } return put; } /*** * (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出) * @param put * @param value * @param tTableMap * @return */ private Put getPut2(Put put,Result value,HashMap tableMap){ NavigableMap >> map=value.getMap(); for(byte[] family:map.keySet()){ if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr); ArrayList columnBy = toByte(columnStr); NavigableMap > familyMap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familyMap.keySet()){ //根据列名循坏 log.info("!!!!!!!!!!!"+new String(column)); if(columnBy.contains(new String(column))){ NavigableMap valuesMap = familyMap.get(column); for(Entry s:valuesMap.entrySet()){//获取列对应的不同版本数据,默认最新的一个 System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue())); put.addColumn(family, column, s.getKey(),s.getValue()); } } } } } return put; } /*** * 有版本、无列导入、无列导出 * @param put * @param value * @return */ private Put getPut1(Put put,Result value){ NavigableMap >> map=value.getMap(); for(byte[] family:map.keySet()){ NavigableMap > familyMap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familyMap.keySet()){ //根据列名循坏 NavigableMap valuesMap = familyMap.get(column); for(Entry s:valuesMap.entrySet()){ //获取列对应的不同版本数据,默认最新的一个 put.addColumn(family, column, s.getKey(),s.getValue()); } } } return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名与列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static HashMap getFamilyAndColumn(String[] str){ HashMap map = new HashMap<>(); HashSet set = new HashSet<>(); for(String s : str){ set.add(s.split(":")[0]); } Object[] ob = set.toArray(); for(int i=0; i toByte(String s){ ArrayList b = new ArrayList<>(); String[] sarr = s.split(","); for(int i=0;i
程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:
test2:(无版本、无列导入设置、无列导出设置)

test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test5(有版本、无列导入设置、无列导出设置)

test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持创新互联。