MapReduce操作HBase错误一例

运行HBase时常会遇到个错误,我就有这样的经历。

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

检查日志:org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)

如果是这个错误,说明RPC协议不一致所造成的,解决方法:将hbase/lib目录下的hadoop-core的jar文件删除,将hadoop目录下的hadoop-0.20.2-core.jar拷贝到hbase/lib下面,然后重新启动hbase即可。第二种错误是:没有启动hadoop,先启用hadoop,再启用hbase。

在Eclipse开发中,需要加入hadoop所有的jar包以及HBase二个jar包(hbase,zooKooper)。

HBase基础可见帖子:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html

建表,通过HBaseAdmin类中的create静态方法来创建表。

HTable类是操作表,例如,静态方法put可以插入数据,该类初始化时可以传递一个行键,静态方法getScanner()可以获得某一列上的所有数据,返回Result类,Result类中有个静态方法getFamilyMap()可以获得以列名为key,值为value,这刚好与hadoop中map结果是一样的。

  1. package test;  
  2. import java.io.IOException;  
  3. import java.util.Map;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.hbase.HBaseConfiguration;  
  6. import org.apache.hadoop.hbase.HColumnDescriptor;  
  7. import org.apache.hadoop.hbase.HTableDescriptor;  
  8. import org.apache.hadoop.hbase.client.HBaseAdmin;  
  9. import org.apache.hadoop.hbase.client.HTable;  
  10. import org.apache.hadoop.hbase.client.Put;  
  11. import org.apache.hadoop.hbase.client.Result;  
  12.  
  13. public class Htable {  
  14.  
  15.     /**  
  16.      * @param args  
  17.      */ 
  18.     public static void main(String[] args) throws IOException {  
  19.         // TODO Auto-generated method stub  
  20.         Configuration hbaseConf = HBaseConfiguration.create();  
  21.         HBaseAdmin admin = new HBaseAdmin(hbaseConf);  
  22.         HTableDescriptor htableDescriptor = new HTableDescriptor("table" 
  23.                 .getBytes());  //set the name of table  
  24.         htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters  
  25.         admin.createTable(htableDescriptor); //create a table   
  26.         HTable table = new HTable(hbaseConf, "table"); //get instance of table.  
  27.         for (int i = 0; i < 3; i++) {   //for is number of rows  
  28.             Put putRow = new Put(("row" + i).getBytes()); //the ith row  
  29.             putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1" 
  30.                     .getBytes());  //set the name of column and value.  
  31.             putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2" 
  32.                     .getBytes());  
  33.             putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3" 
  34.                     .getBytes());  
  35.             table.put(putRow);  
  36.         }  
  37.         for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters   
  38.             for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result  
  39.                 String column = new String(entry.getKey());  
  40.                 String value = new String(entry.getValue());  
  41.                 System.out.println(column+","+value);  
  42.             }  
  43.         }  
  44.         admin.disableTable("table".getBytes()); //disable the table  
  45.         admin.deleteTable("table".getBytes());  //drop the tbale  
  46.     }  

以上代码不难看懂。

下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。

现在有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。

  1. package test;  
  2. import java.io.IOException;  
  3. import java.util.Map;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.hbase.HBaseConfiguration;  
  6. import org.apache.hadoop.hbase.HColumnDescriptor;  
  7. import org.apache.hadoop.hbase.HTableDescriptor;  
  8. import org.apache.hadoop.hbase.client.HBaseAdmin;  
  9. import org.apache.hadoop.hbase.client.HTable;  
  10. import org.apache.hadoop.hbase.client.Put;  
  11. import org.apache.hadoop.hbase.client.Result;  
  12.  
  13. public class Htable {  
  14.  
  15.     /**  
  16.      * @param args  
  17.      */ 
  18.     public static void main(String[] args) throws IOException {  
  19.         // TODO Auto-generated method stub  
  20.         Configuration hbaseConf = HBaseConfiguration.create();  
  21.         HBaseAdmin admin = new HBaseAdmin(hbaseConf);  
  22.         HTableDescriptor htableDescriptor = new HTableDescriptor("table" 
  23.                 .getBytes());  //set the name of table  
  24.         htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters  
  25.         admin.createTable(htableDescriptor); //create a table   
  26.         HTable table = new HTable(hbaseConf, "table"); //get instance of table.  
  27.         for (int i = 0; i < 3; i++) {   //for is number of rows  
  28.             Put putRow = new Put(("row" + i).getBytes()); //the ith row  
  29.             putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1" 
  30.                     .getBytes());  //set the name of column and value.  
  31.             putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2" 
  32.                     .getBytes());  
  33.             putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3" 
  34.                     .getBytes());  
  35.             table.put(putRow);  
  36.         }  
  37.         for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters   
  38.             for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result  
  39.                 String column = new String(entry.getKey());  
  40.                 String value = new String(entry.getValue());  
  41.                 System.out.println(column+","+value);  
  42.             }  
  43.         }  
  44.         admin.disableTable("table".getBytes()); //disable the table  
  45.         admin.deleteTable("table".getBytes());  //drop the tbale  
  46.     }  

Reduce类,主要是将键值传到HBase表中

  1. package test;  
  2.  
  3. import java.io.IOException;  
  4. import org.apache.hadoop.hbase.client.Put;  
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  6. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  7. import org.apache.hadoop.io.Text;  
  8.  
  9. public class ReducerClass extends TableReducer<Text,Text,ImmutableBytesWritable>{  
  10.     public void reduce(Text key,Iterable<Text> values,Context context){  
  11.         String k = key.toString();  
  12.         StringBuffer str=null;  
  13.         for(Text value: values){  
  14.             str.append(value.toString());  
  15.         }  
  16.         String v = new String(str);   
  17.         Put putrow = new Put(k.getBytes());  
  18.         putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());       
  19.     }  

由上面可知ReducerClass继承TableReduce,在hadoop里面ReducerClass继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里面是读出的Key类型是ImmutableBytesWritable。

Map,Reduce,以及Job的配置分离,比较清晰,mahout也是采用这种构架。

  1. package test;  
  2.  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.conf.Configured;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.hbase.HBaseConfiguration;  
  7. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  12. import org.apache.hadoop.util.Tool;  
  13.  
  14. public class Driver extends Configured implements Tool{  
  15.  
  16.     @Override 
  17.     public static void run(String[] arg0) throws Exception {  
  18.         // TODO Auto-generated method stub  
  19.         Configuration conf = HBaseConfiguration.create();  
  20.         conf.set("hbase.zookeeper.quorum.""localhost");    
  21.           
  22.         Job job = new Job(conf,"Hbase");  
  23.         job.setJarByClass(TxtHbase.class);  
  24.           
  25.         Path in = new Path(arg0[0]);  
  26.           
  27.         job.setInputFormatClass(TextInputFormat.class);  
  28.         FileInputFormat.addInputPath(job, in);  
  29.           
  30.         job.setMapperClass(MapperClass.class);  
  31.         job.setMapOutputKeyClass(Text.class);  
  32.         job.setMapOutputValueClass(Text.class);  
  33.           
  34.         TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);  
  35.           
  36.        job.waitForCompletion(true);  
  37.     }  
  38.       

Driver中job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob(“tab1”, THReducer.class, job); 来执行reduce类。

主函数

  1. package test;  
  2.  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.util.ToolRunner;  
  5.  
  6. public class TxtHbase {  
  7.     public static void main(String [] args) throws Exception{        Driver.run(new Configuration(),new THDriver(),args);     } } 

读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了。

  1. package test;  
  2.  
  3. import java.io.IOException;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.hbase.client.Result;  
  6. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  7. import org.apache.hadoop.hbase.mapred.TableMap;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.MapReduceBase;  
  10. import org.apache.hadoop.mapred.OutputCollector;  
  11. import org.apache.hadoop.mapred.Reporter;  
  12.  
  13. public class MapperClass extends MapReduceBase implements 
  14.         TableMap<Text, Text> {  
  15.     static final String NAME = "GetDataFromHbaseTest";  
  16.     private Configuration conf;  
  17.  
  18.     public void map(ImmutableBytesWritable row, Result values,  
  19.             OutputCollector<Text, Text> output, Reporter reporter)  
  20.             throws IOException {  
  21.         StringBuilder sb = new StringBuilder();  
  22.         for (Entry<byte[], byte[]> value : values.getFamilyMap(  
  23.                 "fam1".getBytes()).entrySet()) {  
  24.             String cell = value.getValue().toString();  
  25.             if (cell != null) {  
  26.                 sb.append(new String(value.getKey())).append(new String(cell));  
  27.             }  
  28.         }  
  29.         output.collect(new Text(row.get()), new Text(sb.toString()));  
  30.     } 

要实现这个方法 initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars)。

  1. package test;  
  2.  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.conf.Configured;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.hbase.HBaseConfiguration;  
  7. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  12. import org.apache.hadoop.util.Tool;  
  13.  
  14. public class Driver extends Configured implements Tool{  
  15.  
  16.     @Override 
  17.     public static void run(String[] arg0) throws Exception {  
  18.         // TODO Auto-generated method stub  
  19.         Configuration conf = HBaseConfiguration.create();  
  20.         conf.set("hbase.zookeeper.quorum.""localhost");    
  21.         Job job = new Job(conf,"Hbase");  
  22.         job.setJarByClass(TxtHbase.class);  
  23.         job.setInputFormatClass(TextInputFormat.class);  
  24.         job.setMapOutputKeyClass(Text.class);  
  25.         job.setMapOutputValueClass(Text.class);  
  26.         TableMapReduceUtilinitTableMapperJob("table", args0[0],MapperClass.class, job);         job.waitForCompletion(true); } } 

主函数

  1. package test;  
  2.  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.util.ToolRunner;  
  5.  
  6. public class TxtHbase {  
  7.     public static void main(String [] args) throws Exception{  
  8.  
  9.         Driver.run(new Configuration(),new THDriver(),args);  
  10.  
  11.     }   
  12. }  
  13. -------------------------------------------------------------------------------- 

原文链接:http://www.cnblogs.com/liqizhou/archive/2012/05/17/2504279.html

【编辑推荐】

  1. 数据库迁移之何去何从
  2. SQL Server数据库迁移偏方
  3. SQL Server数据库恢复案例分享
  4. SQL Server数据库最小宕机迁移方案
  5. 给你大型数据库迁移的五大建议  
     

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/308471.html<

(0)
运维的头像运维
上一篇2025-05-27 14:37
下一篇 2025-05-27 14:39

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注