• 集成
    • 搭建 Hadoop 环境
    • 配置连接环境
    • 编写 MapReduce
    • 示例

    集成

    搭建 Hadoop 环境

    我们支持 hadoop 1.x 和 hadoop 2.x。先安装配置好 Hadoop。

    配置连接环境

    与 MapReduce 对接,需要准备 hadoop-connector.jar 和 sequoiadb.jar,这两个 jar 可以在 SequoiaDB 安装目录下面的 hadoop 目录(默认为/opt/sequoiadb/hadoop)中找到。

    运行

    1. $ hadoop classpath

    查看 hadoop 的 classpath。在 classpath 中选择一个目录,把 hadoop-connector.jar 和 sequoiadb.jar 放在目录里面,重启 hadoop 集群。

    编写 MapReduce

    hadoop-connector.jar 中一些重要的类:

    SequoiadbInputFormat:读取SequoiaDB的数据。

    SequoiadbOutputFormat:向SequoiaDB写入数据。

    BSONWritable:BSONObject 的包装类,实现了 WritableComparable 接口。用于序列化 BSONObject 对象。

    SequoiaDB 和 MapReduce 的配置:

    sequoiadb-hadoop.xml 是配置文件,放在你编写的 MapReduce 工程的源码根目录下面。

    sequoiadb.input.url:指定作为输入源 SequoiaDB 的 URL 路径,格式为:hostname1:port1,hostname2:port2。

    sequoiadb.input.user:指定输入源 SequoiaDB 的用户名,默认为 null。

    sequoiadb.input.passwd:指定输入源 SequoiaDB 用户密码,默认为 null。

    sequoiadb.in.collectionspace:指定作为输入源 SequoiaDB 集合空间。

    sequoiadb.in.collection:指定作为输入源 SequoiaDB 集合。

    sequoiadb.query.json:指定输入源 SequoiaDB 的查询条件,使用 json 结构,默认为 null。

    sequoiadb.selector.json:指定输入源 SequoiaDB 的字段筛选,使用 json 结构,默认为 null。

    sequoiadb.preferedinstance:指定从输入源 SequoiaDB 中获取数据时,连接哪个数据节点,默认为 anyone,可填值:[slave/master/anyone/node(1-7)]。

    sequoiadb.output.url:指定作为输出源 SequoiaDB 的 URL 路径。

    sequoiadb.output.user:指定输出源 SequoiaDB 用户名,默认为 null。

    sequoiadb.output.passwd:指定输出源 SequoiaDB 用户密码,默认为 null。

    sequoiadb.out.collectionspace:指定作为输出源 SequoiaDB 的集合空间。

    sequoiadb.out.collection:指定作为输出源 SequoiaDB 的集合。

    sequoiadb.out.bulknum:指定每次向输出源 SequoiaDB 写入的记录条数,对写入性能进行优化。

    示例

    (1)下面是读取 HDFS 文件,处理后写入到 SequoiaDB 中去:

    1. public class HdfsSequoiadbMR {
    2. static class MobileMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    3. private static final IntWritable ONE=new IntWritable(1);
    4. @Override
    5. protected void map(LongWritable key, Text value, Context context)
    6. throws IOException, InterruptedException {
    7. String valueStr=value.toString();
    8.  
    9. String mobile_prefix=valueStr.split(",")[3].substring(0,3);
    10. context.write(new Text(mobile_prefix), ONE);
    11. }
    12.  
    13. }
    14.  
    15. static class MobileReducer extends Reducer<Text, IntWritable, NullWritable, BSONWritable>{
    16.  
    17. @Override
    18. protected void reduce(Text key, Iterable<IntWritable> values,Context context)
    19. throws IOException, InterruptedException {
    20. Iterator<IntWritable> iterator=values.iterator();
    21. long sum=0;
    22. while(iterator.hasNext()){
    23. sum+=iterator.next().get();
    24. }
    25. BSONObject bson=new BasicBSONObject();
    26. bson.put("prefix", key.toString());
    27. bson.put("count", sum);
    28. context.write(null,new BSONWritable(bson));
    29. }
    30.  
    31. }
    32.  
    33. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    34. if(args.length < 1){
    35. System.out.print("please set input path ");
    36. System.exit(1);
    37. }
    38. Configuration conf=new Configuration();
    39. conf.addResource("sequoiadb-hadoop.xml"); //加载配置文件
    40. Job job=Job.getInstance(conf);
    41. job.setJarByClass(HdfsSequoiadbMR.class);
    42. job.setJobName("HdfsSequoiadbMR");
    43. job.setInputFormatClass(TextInputFormat.class);
    44. job.setOutputFormatClass(SequoiadbOutputFormat.class); //reduce 输出写入到 SequoiaDB 中
    45. TextInputFormat.setInputPaths(job, new Path(args[0]));
    46.  
    47. job.setMapperClass(MobileMapper.class);
    48. job.setReducerClass(MobileReducer.class);
    49.  
    50. job.setMapOutputKeyClass(Text.class);
    51. job.setMapOutputValueClass(IntWritable.class);
    52.  
    53. job.setOutputKeyClass(NullWritable.class);
    54. job.setOutputValueClass(BSONWritable.class);
    55.  
    56. job.waitForCompletion(true);
    57. }
    58. }

    (2) 读取 SequoiaDB 中数据处理后写入到 HDFS 中。

    1. public class SequoiadbHdfsMR {
    2. static class ProvinceMapper extends Mapper<Object, BSONWritable,IntWritable,IntWritable>{
    3. private static final IntWritable ONE=new IntWritable(1);
    4. @Override
    5. protected void map(Object key, BSONWritable value, Context context)
    6. throws IOException, InterruptedException {
    7. BSONObject obj = value.getBson();
    8. int province=(Integer) obj.get("province_code");
    9. context.write(new IntWritable(province), ONE);
    10. }
    11.  
    12. }
    13.  
    14. static class ProvinceReducer extends Reducer<IntWritable,IntWritable,IntWritable,LongWritable>{
    15.  
    16. @Override
    17. protected void reduce(IntWritable key, Iterable<IntWritable> values,
    18. Context context)
    19. throws IOException, InterruptedException {
    20. Iterator<IntWritable> iterator=values.iterator();
    21. long sum=0;
    22. while(iterator.hasNext()){
    23. sum+=iterator.next().get();
    24. }
    25. context.write(key,new LongWritable(sum));
    26. }
    27.  
    28. }
    29.  
    30.  
    31. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    32. if(args.length<1){
    33. System.out.print("please set output path ");
    34. System.exit(1);
    35. }
    36. Configuration conf=new Configuration();
    37. conf.addResource("sequoiadb-hadoop.xml");
    38. Job job=Job.getInstance(conf);
    39. job.setJarByClass(SequoiadbHdfsMR.class);
    40. job.setJobName("SequoiadbHdfsMR");
    41. job.setInputFormatClass(SequoiadbInputFormat.class);
    42. job.setOutputFormatClass(TextOutputFormat.class);
    43.  
    44.  
    45. FileOutputFormat.setOutputPath(job, new Path(args[0]+"/result"));
    46.  
    47. job.setMapperClass(ProvinceMapper.class);
    48. job.setReducerClass(ProvinceReducer.class);
    49.  
    50. job.setMapOutputKeyClass(IntWritable.class);
    51. job.setMapOutputValueClass(IntWritable.class);
    52.  
    53. job.setOutputKeyClass(IntWritable.class);
    54. job.setOutputValueClass(LongWritable.class);
    55.  
    56. job.waitForCompletion(true);
    57. }
    58. }

    配置信息:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <configuration>
    3. <property>
    4. <name>sequoiadb.input.url</name>
    5. <value>localhost:11810</value>
    6. </property>
    7. <property>
    8. <name>sequoiadb.output.url</name>
    9. <value>localhost:11810</value>
    10. </property>
    11. <property>
    12. <name>sequoiadb.in.collectionspace</name>
    13. <value>default</value>
    14. </property>
    15. <property>
    16. <name>sequoiadb.in.collect</name>
    17. <value>student</value>
    18. </property>
    19. <property>
    20. <name>sequoiadb.out.collectionspace</name>
    21. <value>default</value>
    22. </property>
    23. <property>
    24. <name>sequoiadb.out.collect</name>
    25. <value>result</value>
    26. </property>
    27. <property>
    28. <name>sequoiadb.out.bulknum</name>
    29. <value>10</value>
    30. </property>
    31. </configuration>