HBase2.2.x 学习笔记

HBase 配置

hbase-site.xml

<configuration>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.tmp.dir</name>
    <value>./tmp</value>
  </property>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop-1:9000/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop-1</value>
  </property>
</configuration>

Java 客户端 HBase操作

添加依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.3</version>
</dependency>

获取连接

private static Connection getConn() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hadoop.rootdir", "hdfs://hadoop-1:9000/hbase");
    conf.set("hbase.zookeeper.quorum", "hadoop-1");
    return ConnectionFactory.createConnection(conf);
}

添加表

private static void createTable(String tableName, List<String> columnNames) throws IOException {
        Connection conn = getConn();
        Admin admin = conn.getAdmin();
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));

        List<ColumnFamilyDescriptor> families = new ArrayList<>();
        for (String columnName : columnNames) {
            families.add(ColumnFamilyDescriptorBuilder.of(columnName));
        }
        tableDescriptorBuilder.setColumnFamilies(families);
        admin.createTable(tableDescriptorBuilder.build());
        close(conn);
    }

比如我们调用:

createTable("emp", Arrays.asList("id", "name"));

然后使用 desc 'emp'可以看到创建好的表结构

hbase(main):013:0* desc 'emp'
Table emp is ENABLED                                                                                                                                          
emp                                                                                                                                                           
COLUMN FAMILIES DESCRIPTION                                                                                                                                   
{NAME => 'id', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'fals
e', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_
MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}   

{NAME => 'name', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'fa
lse', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', I
N_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'} 

2 row(s)

QUOTAS                                                                                                                                                        
0 row(s)
Took 4.3331 seconds            

删除表

删除表之前需要表的状态是disable状态,所以需要先禁用表

private static void deleteTable(String tableName) throws IOException {
    Connection conn = getConn();
    Admin admin = conn.getAdmin();
    admin.disableTable(TableName.valueOf(tableName));
    admin.deleteTable(TableName.valueOf(tableName));
    close(conn);
}

添加、修改记录(整行、列族、列)

可以addColumn来设置多个值

private static void insert(String tableName, String rowKey, String family, String qualifier, String value) throws IOException {
    Connection conn = getConn();
    Table table = conn.getTable(TableName.valueOf(tableName));
    table.put(new Put(Bytes.toBytes(rowKey)).addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier),
            Bytes.toBytes(value)));
    close(conn);
}

删除记录(整行、列族、列)

如果指定了 family或者 qualifier则删除的是部分字段,否则是删除整行

private static void delete(String tableName, String rowKey, String family, String qualifier) throws IOException {
    Connection conn = getConn();
    Table table = conn.getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(Bytes.toBytes(rowKey));

    if (StringUtils.isNotBlank(qualifier)) {
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    } else if (StringUtils.isNotBlank(family)) {
        delete.addFamily(Bytes.toBytes(family));
    }
    table.delete(delete);
    close(conn);
}

获取单条数据(整行、列族、列)

private static Map<String, String> get(String tableName, String rowKey) throws IOException {
    Connection conn = getConn();
    Table table = conn.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(rowKey));
    Result result = table.get(get);
    Map<String, String> map = new HashMap<>();
    for (Cell listCell : result.listCells()) {
        String family = Bytes.toString(CellUtil.cloneFamily(listCell));
        String columnName = Bytes.toString(CellUtil.cloneQualifier(listCell));
        String value = Bytes.toString(CellUtil.cloneValue(listCell));
        map.put(family + ":" + columnName, value);
    }
    close(conn);
    return map;
}

获取多条数据

public static List<Map<String, String>> scan(String tableName) throws IOException {
    Connection conn = getConn();
    Table table = conn.getTable(TableName.valueOf(tableName));
    ResultScanner scanner = table.getScanner(new Scan());
    List<Map<String, String>> list = new ArrayList<>();
    for (Result result : scanner) {
        Map<String, String> rowToMap = getRowToMap(result);
        list.add(rowToMap);
    }
    close(conn);
    return list;
}
private static Map<String, String> getRowToMap(Result result) {
    Map<String, String> map = new HashMap<>();
    for (Cell listCell : result.listCells()) {
        String family = Bytes.toString(CellUtil.cloneFamily(listCell));
        String columnName = Bytes.toString(CellUtil.cloneQualifier(listCell));
        String value = Bytes.toString(CellUtil.cloneValue(listCell));
        map.put(family + ":" + columnName, value);
    }
    return map;
}

如果查询指定范围内的数据:

new Scan().withStartRow(Bytes.toBytes(start)).withStopRow(Bytes.toBytes(end))

默认只查询最后一个版本的数据,如果需要查询多个版本的数据可以设置

new Scan().withStartRow(Bytes.toBytes(start)).withStopRow(Bytes.toBytes(end)).readVersions(2)

导入数据到HBASE,使用MapReduce统计次数

假设是这些数据:

1_song1_2016-1-11	song1	singer1	man	slow	pc
2_song2_2016-1-11	song2	singer2	woman	slow	ios
3_song3_2016-1-11	song3	singer3	man	quick	andriod
4_song4_2016-1-11	song4	singer4	woman	slow	ios
5_song5_2016-1-11	song5	singer5	man	quick	pc
6_song6_2016-1-11	song6	singer6	woman	quick	ios
7_song7_2016-1-11	song7	singer7	man	quick	andriod
8_song8_2016-1-11	song8	singer8	woman	slow	pc
9_song9_2016-1-11	song9	singer9	woman	slow	ios
10_song4_2016-1-11	song4	singer4	woman	slow	ios
11_song6_2016-1-11	song6	singer6	woman	quick	ios
12_song6_2016-1-11	song6	singer6	woman	quick	ios
13_song3_2016-1-11	song3	singer3	man	quick	andriod
14_song2_2016-1-11	song2	singer2	woman	slow	ios

导入数据

在HDFS中创建一个目录,保存原始文件

hadoop fs -mkdir -p hdfs://hadoop-1:9000/input2/music2

上传文件到hdfs中

hadoop fs -put music1.txt hdfs://hadoop-1:9000/input2/music2

在Hbase sheel 中创建表

create 'music','info'

导入tsv文件到 music表中

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:signer,info:gender,info:ryghme,info:terminal music /input2/music2/music1.txt

计算统计

Hbase Shell 执行创建存放统计信息的表

create 'namelist','details'

编写Map 和 Reduce 类

public class MusicPlayCountMapper extends TableMapper<Text, IntWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
            InterruptedException {
        for (Cell listCell : value.listCells()) {
            context.write(new Text(Bytes.toString(CellUtil.cloneValue(listCell))), new IntWritable(1));
        }
    }
}
public class MusicCountReducer extends TableReducer<Text, IntWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
            InterruptedException {

        int rank = 0;
        for (IntWritable value : values) {
            rank += value.get();
        }
       context.write(key, new Put(Bytes.toBytes(key.toString())).addColumn(Bytes.toBytes("details"),             Bytes.toBytes("rank"), Bytes.toBytes(String.valueOf(rank))).addColumn(Bytes.toBytes("details"), Bytes.toBytes("song"), Bytes.toBytes(key.toString())))
    }
}

编写一个运行job的方法:

public static void mapReduceMusic() throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "top-music");
    job.setMapperClass(MusicPlayCountMapper.class);
    job.setReducerClass(MusicCountReducer.class);
    job.setJarByClass(HbaseDemo.class);
    job.setNumReduceTasks(1);

    TableMapReduceUtil.initTableMapperJob("music", new Scan().addColumn(
            Bytes.toBytes("info"),
            Bytes.toBytes("name")
    ), MusicPlayCountMapper.class, Text.class, IntWritable.class, job);
    TableMapReduceUtil.initTableReducerJob("music_top", MusicCountReducer.class, job);
    job.waitForCompletion(true);
}

运行之前需要做一件事,由于默认运行的时候是在本地运行job,所以需要所有的hbase的依赖jar,所以需要把hbase下的lib中的jar都加入到classpath中。

运行此方法后,可以通过 之前写的 scan 来查询 music_top表中的数据:

[{details:rank=1, details:song=song1}, {details:rank=2, details:song=song2}, {details:rank=2, details:song=song3}, {details:rank=2, details:song=song4}, {details:rank=1, details:song=song5}, {details:rank=3, details:song=song6}, {details:rank=1, details:song=song7}, {details:rank=1, details:song=song8}, {details:rank=1, details:song=song9}]

Leave a Comment

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据