
HBase Learning Path (4): HBase API Operations

1. Setting up the Eclipse environment

How to import the specific jars into the project is described at http://www.cnblog.com/qingyununzong/p/8623309.html

2. Table operations with the HBase API


The utility class below wraps the common table and data operations (list, create, describe, modify, put, get, scan, delete):

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

import com.study.hbase.service.HBaseUtils;

public class HBaseUtilsImpl implements HBaseUtils {

    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "hadoop1:2181,hadoop2:2181,hadoop3:2181";

    private static Connection conn = null;
    private static Admin admin = null;

    public static void main(String[] args) throws Exception {

        getConnection();
        getAdmin();

        HBaseUtilsImpl hbu = new HBaseUtilsImpl();

        //hbu.getAllTables();

        //hbu.descTable("people");

        //String[] infos = {"info","family"};
        //hbu.createTable("people", infos);

        //String[] add = {"cs1","cs2"};
        //String[] remove = {"cf1","cf2"};

        //HColumnDescriptor hc = new HColumnDescriptor("sixsixsix");

        //hbu.modifyTable("stu",hc);
        //hbu.getAllTables();

        hbu.putData("huoying", "rk001", "cs2", "name", "aobama", new Date().getTime());
        hbu.getAllTables();

        conn.close();
    }

    // Get a connection
    public static Connection getConnection() {
        // Create a conf object that manages the HBase configuration
        Configuration conf = HBaseConfiguration.create();
        // Tell the program where to find HBase (the ZooKeeper quorum)
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return conn;
    }

    // Get the admin object
    public static Admin getAdmin() {
        try {
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return admin;
    }

    // List all tables
    @Override
    public void getAllTables() throws Exception {
        // Get the descriptors of all tables
        HTableDescriptor[] listTables = admin.listTables();
        for (HTableDescriptor listTable : listTables) {
            // Convert to the table name
            String tbName = listTable.getNameAsString();
            // Get the column family descriptors
            HColumnDescriptor[] columnFamilies = listTable.getColumnFamilies();
            System.out.println("tableName:" + tbName);
            for (HColumnDescriptor columnFamily : columnFamilies) {
                // Get the column family name
                String columnFamilyName = columnFamily.getNameAsString();
                System.out.print("\t" + "columnFamilyName:" + columnFamilyName);
            }
            System.out.println();
        }
    }

    // Create a table, given the table name and the column family names
    @Override
    public void createTable(String tableName, String[] family) throws Exception {

        TableName name = TableName.valueOf(tableName);
        // Check whether the table already exists
        if (admin.tableExists(name)) {
            System.out.println("table already exists!");
        } else {
            // Table descriptor
            HTableDescriptor htd = new HTableDescriptor(name);
            // Add the column families to the table descriptor
            for (String str : family) {
                HColumnDescriptor hcd = new HColumnDescriptor(str);
                htd.addFamily(hcd);
            }
            // Create the table
            admin.createTable(htd);
            // Check whether the table was created successfully
            if (admin.tableExists(name)) {
                System.out.println("table created successfully");
            } else {
                System.out.println("table creation failed");
            }
        }
    }

    // Create a table, given a prepared descriptor with its column families
    @Override
    public void createTable(HTableDescriptor htds) throws Exception {
        // Get the table name
        String tbName = htds.getNameAsString();

        admin.createTable(htds);
    }

    // Create a table, given the table name and a prepared descriptor
    @Override
    public void createTable(String tableName, HTableDescriptor htds) throws Exception {

        TableName name = TableName.valueOf(tableName);

        if (admin.tableExists(name)) {
            System.out.println("table already exists!");
        } else {
            admin.createTable(htds);
            boolean flag = admin.tableExists(name);
            System.out.println(flag ? "created successfully" : "creation failed");
        }
    }

    // Describe the column families of a table
    @Override
    public void descTable(String tableName) throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Check whether the table exists
        if (admin.tableExists(name)) {
            // Get the table descriptor
            HTableDescriptor tableDescriptor = admin.getTableDescriptor(name);
            // Get the column family descriptors
            HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
            for (HColumnDescriptor columnFamily : columnFamilies) {
                System.out.println(columnFamily);
            }
        } else {
            System.out.println("table does not exist");
        }
    }

    // Check whether a table exists
    @Override
    public boolean existTable(String tableName) throws Exception {
        TableName name = TableName.valueOf(tableName);
        return admin.tableExists(name);
    }

    // Disable a table
    @Override
    public void disableTable(String tableName) throws Exception {

        TableName name = TableName.valueOf(tableName);

        if (admin.tableExists(name)) {
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            } else {
                System.out.println("table is not enabled");
            }
        } else {
            System.out.println("table does not exist");
        }
    }

    // Drop a table
    @Override
    public void dropTable(String tableName) throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Check whether the table exists
        if (admin.tableExists(name)) {
            // Check whether the table is enabled
            boolean tableEnabled = admin.isTableEnabled(name);

            if (tableEnabled) {
                // Disable the table first
                admin.disableTable(name);
            }
            // Delete the table
            admin.deleteTable(name);
            // Check whether the table still exists
            if (admin.tableExists(name)) {
                System.out.println("delete failed");
            } else {
                System.out.println("delete succeeded");
            }
        } else {
            System.out.println("table does not exist");
        }
    }

    // Modify a table (add column families cf1 and cf2)
    @Override
    public void modifyTable(String tableName) throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Check whether the table exists
        if (admin.tableExists(name)) {
            // Check whether the table is enabled
            boolean tableEnabled = admin.isTableEnabled(name);

            if (tableEnabled) {
                // Disable the table first
                admin.disableTable(name);
            }
            // Get the table descriptor by table name
            HTableDescriptor tableDescriptor = admin.getTableDescriptor(name);
            // Create the column family descriptors
            HColumnDescriptor columnFamily1 = new HColumnDescriptor("cf1".getBytes());
            HColumnDescriptor columnFamily2 = new HColumnDescriptor("cf2".getBytes());

            tableDescriptor.addFamily(columnFamily1);
            tableDescriptor.addFamily(columnFamily2);
            // Replace the table's column family set
            admin.modifyTable(name, tableDescriptor);
        } else {
            System.out.println("table does not exist");
        }
    }

    // Modify a table (add and remove column families)
    @Override
    public void modifyTable(String tableName, String[] addColumn, String[] removeColumn) throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Check whether the table exists
        if (admin.tableExists(name)) {
            // Check whether the table is enabled
            boolean tableEnabled = admin.isTableEnabled(name);

            if (tableEnabled) {
                // Disable the table first
                admin.disableTable(name);
            }
            // Get the table descriptor by table name
            HTableDescriptor tableDescriptor = admin.getTableDescriptor(name);
            // Add the new column families
            for (String add : addColumn) {
                HColumnDescriptor addColumnDescriptor = new HColumnDescriptor(add);
                tableDescriptor.addFamily(addColumnDescriptor);
            }
            // Remove the given column families
            for (String remove : removeColumn) {
                HColumnDescriptor removeColumnDescriptor = new HColumnDescriptor(remove);
                tableDescriptor.removeFamily(removeColumnDescriptor.getName());
            }

            admin.modifyTable(name, tableDescriptor);
        } else {
            System.out.println("table does not exist");
        }
    }

    @Override
    public void modifyTable(String tableName, HColumnDescriptor hcds) throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Get the table descriptor by table name
        HTableDescriptor tableDescriptor = admin.getTableDescriptor(name);
        // Get all column families of the table
        HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();

        boolean flag = false;
        // Check whether the given column family already exists in the table
        for (HColumnDescriptor columnFamily : columnFamilies) {
            if (columnFamily.equals(hcds)) {
                flag = true;
            }
        }
        // If it already exists, print a message; otherwise add it
        if (flag) {
            System.out.println("the column family already exists");
        } else {
            tableDescriptor.addFamily(hcds);
            admin.modifyTable(name, tableDescriptor);
        }
    }

    /** Insert data
     * tableName:  table name
     * rowKey:     row key
     * familyName: column family
     * columnName: column qualifier
     * value:      value
     */
    @Override
    public void putData(String tableName, String rowKey, String familyName, String columnName, String value)
            throws Exception {
        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Before inserting, check whether the table exists; create it if it does not
        if (!admin.tableExists(name)) {
            // Create the table descriptor from the table name
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            // Define the column family
            HColumnDescriptor columnFamilyName = new HColumnDescriptor(familyName);
            tableDescriptor.addFamily(columnFamilyName);
            admin.createTable(tableDescriptor);
        }

        Table table = conn.getTable(name);
        Put put = new Put(rowKey.getBytes());

        put.addColumn(familyName.getBytes(), columnName.getBytes(), value.getBytes());
        table.put(put);
    }

    // Insert data with an explicit timestamp
    @Override
    public void putData(String tableName, String rowKey, String familyName, String columnName, String value,
            long timestamp) throws Exception {

        // Convert to the table name
        TableName name = TableName.valueOf(tableName);
        // Before inserting, check whether the table exists; create it if it does not
        if (!admin.tableExists(name)) {
            // Create the table descriptor from the table name
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            // Define the column family
            HColumnDescriptor columnFamilyName = new HColumnDescriptor(familyName);
            tableDescriptor.addFamily(columnFamilyName);
            admin.createTable(tableDescriptor);
        }

        Table table = conn.getTable(name);
        Put put = new Put(rowKey.getBytes());

        //put.addColumn(familyName.getBytes(), columnName.getBytes(), value.getBytes());
        put.addImmutable(familyName.getBytes(), columnName.getBytes(), timestamp, value.getBytes());
        table.put(put);
    }

    // Get data by row key
    @Override
    public Result getResult(String tableName, String rowKey) throws Exception {

        Result result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Get get = new Get(rowKey.getBytes());
            result = table.get(get);
        } else {
            result = null;
        }

        return result;
    }

    // Get data by row key and column family
    @Override
    public Result getResult(String tableName, String rowKey, String familyName) throws Exception {
        Result result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Get get = new Get(rowKey.getBytes());
            get.addFamily(familyName.getBytes());
            result = table.get(get);
        } else {
            result = null;
        }

        return result;
    }

    // Get data by row key, column family and column
    @Override
    public Result getResult(String tableName, String rowKey, String familyName, String columnName) throws Exception {

        Result result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Get get = new Get(rowKey.getBytes());
            get.addColumn(familyName.getBytes(), columnName.getBytes());
            result = table.get(get);
        } else {
            result = null;
        }

        return result;
    }

    // Get a given number of versions of a cell
    @Override
    public Result getResultByVersion(String tableName, String rowKey, String familyName, String columnName,
            int versions) throws Exception {

        Result result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Get get = new Get(rowKey.getBytes());
            get.addColumn(familyName.getBytes(), columnName.getBytes());
            get.setMaxVersions(versions);
            result = table.get(get);
        } else {
            result = null;
        }

        return result;
    }

    // Scan the whole table
    @Override
    public ResultScanner getResultScann(String tableName) throws Exception {

        ResultScanner result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Scan scan = new Scan();
            result = table.getScanner(scan);
        } else {
            result = null;
        }

        return result;
    }

    // Scan the table with a given Scan object
    @Override
    public ResultScanner getResultScann(String tableName, Scan scan) throws Exception {

        ResultScanner result;
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            result = table.getScanner(scan);
        } else {
            result = null;
        }

        return result;
    }

    // Delete data (a whole row)
    @Override
    public void deleteColumn(String tableName, String rowKey) throws Exception {

        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Delete delete = new Delete(rowKey.getBytes());
            table.delete(delete);
        } else {
            System.out.println("table does not exist");
        }
    }

    // Delete data (a whole column family of a row)
    @Override
    public void deleteColumn(String tableName, String rowKey, String falilyName) throws Exception {

        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Delete delete = new Delete(rowKey.getBytes());
            delete.addFamily(falilyName.getBytes());
            table.delete(delete);
        } else {
            System.out.println("table does not exist");
        }
    }

    // Delete data (a single column of a row)
    @Override
    public void deleteColumn(String tableName, String rowKey, String falilyName, String columnName) throws Exception {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            Table table = conn.getTable(name);
            Delete delete = new Delete(rowKey.getBytes());
            delete.addColumn(falilyName.getBytes(), columnName.getBytes());
            table.delete(delete);
        } else {
            System.out.println("table does not exist");
        }
    }

}
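The com.study.hbase.service.HBaseUtils interface itself is not shown in the post; a minimal declaration consistent with the overridden methods above might look like this (inferred from the implementation, not taken from the original):

package com.study.hbase.service;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Hypothetical reconstruction of the interface implemented by HBaseUtilsImpl
public interface HBaseUtils {

    void getAllTables() throws Exception;

    void createTable(String tableName, String[] family) throws Exception;
    void createTable(HTableDescriptor htds) throws Exception;
    void createTable(String tableName, HTableDescriptor htds) throws Exception;

    void descTable(String tableName) throws Exception;
    boolean existTable(String tableName) throws Exception;
    void disableTable(String tableName) throws Exception;
    void dropTable(String tableName) throws Exception;

    void modifyTable(String tableName) throws Exception;
    void modifyTable(String tableName, String[] addColumn, String[] removeColumn) throws Exception;
    void modifyTable(String tableName, HColumnDescriptor hcds) throws Exception;

    void putData(String tableName, String rowKey, String familyName, String columnName, String value) throws Exception;
    void putData(String tableName, String rowKey, String familyName, String columnName, String value, long timestamp) throws Exception;

    Result getResult(String tableName, String rowKey) throws Exception;
    Result getResult(String tableName, String rowKey, String familyName) throws Exception;
    Result getResult(String tableName, String rowKey, String familyName, String columnName) throws Exception;
    Result getResultByVersion(String tableName, String rowKey, String familyName, String columnName, int versions) throws Exception;

    ResultScanner getResultScann(String tableName) throws Exception;
    ResultScanner getResultScann(String tableName, Scan scan) throws Exception;

    void deleteColumn(String tableName, String rowKey) throws Exception;
    void deleteColumn(String tableName, String rowKey, String falilyName) throws Exception;
    void deleteColumn(String tableName, String rowKey, String falilyName, String columnName) throws Exception;
}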
HBase Learning Path (5): MapReduce and HBase

MapReduce reads data from HDFS and writes it into HBase
There is an existing file student.txt in HDFS, in the following format:
95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA

The goal is to write the data in this HDFS file into an HBase table. The MapReduce code is as follows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHDFSDataToHbaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
//        conf.addResource("config/core-site.xml");
//        conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHDFSDataToHbaseMR.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job, null, null, null, null, false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        Path inputPath = new Path("/student/input/");
        Path outputPath = new Path("/student/output/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }

    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Pass each input line through unchanged; the reducer does the parsing
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Input line format: 95015,王君,男,18,MA
     */
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Split the CSV line and use the student id as the row key
            String[] split = key.toString().split(",");
            Put put = new Put(split[0].getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());
            context.write(NullWritable.get(), put);
        }
    }
}
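The reducer writes into the table student with column family info, so that table must exist before the job runs; the post does not show this step, and presumably the table was created beforehand. A minimal sketch to create it through the Admin API, assuming the same ZooKeeper quorum as above (the class name CreateStudentTable is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helper, not part of the original post
public class CreateStudentTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("student");
            if (!admin.tableExists(name)) {
                // Create the table with the single column family the job writes to
                HTableDescriptor htd = new HTableDescriptor(name);
                htd.addFamily(new HColumnDescriptor("info"));
                admin.createTable(htd);
            }
        }
    }
}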
MapReduce reads the data back from HBase, computes the average age, and stores the result in HDFS. The code is as follows:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHbaseDataToHDFS extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
//        conf.addResource("config/core-site.xml");
//        conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHbaseDataToHDFS.class);

        // Only fetch the data the job needs: info:age
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),      // table name
                scan,                      // scan condition
                HbaseToHDFSMapper.class,   // mapper class
                Text.class,                // mapper output key type
                IntWritable.class,         // mapper output value type
                job,                       // job object
                false);

        job.setReducerClass(HbaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path outputPath = new Path("/student/avg/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }

    public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable> {

        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        // key is the HBase row key; value holds all the cells fetched for that row
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if (isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                System.out.println("listCells:\t" + listCells);
                Cell cell = listCells.get(0);
                System.out.println("cells:\t" + cell);
                byte[] cloneValue = CellUtil.cloneValue(cell);
                String ageValue = Bytes.toString(cloneValue);
                outValue.set(Integer.parseInt(ageValue));
                context.write(outKey, outValue);
            }
        }
    }

    public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the ages and divide by the count to get the average
            int count = 0;
            int sum = 0;
            for (IntWritable value : values) {
                count++;
                sum += value.get();
            }
            double avgAge = sum * 1.0 / count;
            outValue.set(avgAge);
            context.write(key, outValue);
        }
    }
}
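To sanity-check the import before running the second job, the student table can be scanned with the client API from part (4). A minimal sketch, assuming the same quorum (the class name ScanStudentTable is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

// Hypothetical helper, not part of the original post
public class ScanStudentTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("student"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                // One row per student: info:name, info:sex, info:age, info:department
                System.out.println(result);
            }
        }
    }
}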

