Spark bulkload: Added a key not lexically larger than previous

Testing with just the first few data lines of the file works fine, but once the data volume grows the load fails with:
 Added a key not lexically larger than previous. Current cell = 22169081451/data:ALT/1559803587445/Put/vlen=2/seqid=0, lastCell = 22169081451/data:REF/1559803587445/Put/vlen=1/seqid=0
 
My core code is:

SparkConf sparkConf = new SparkConf().setAppName("Spark2HBasebulk")
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "sparkStore.MyKryoRegister");

JavaSparkContext sc = new JavaSparkContext(sparkConf);

Configuration hbaseConf = HBaseConfiguration.create();
// raise the per-region/per-family HFile limit for this bulk load
hbaseConf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);
// hbaseConf.setInt("hbase.bulkload.retries.number", 0);
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
Connection conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
TableName tableName = TableName.valueOf(tablename);

Job job = Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat2.class);

HRegionLocator regionLocator = new HRegionLocator(tableName, (ClusterConnection) conn);
Table realTable = ((ClusterConnection) conn).getTable(tableName);
// configures compression, block size and the region boundaries for the HFiles
HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);

final Broadcast<String> datas = sc.broadcast(datasb.toString());
System.out.println(datasb.toString());
final Broadcast<String> infos = sc.broadcast(infosb.toString());
System.out.println(infosb.toString());
final Broadcast<String> samples = sc.broadcast(samsb.toString());
 
JavaRDD<String> javaRDD = textfile.filter(new Function<String, Boolean>() {
    private static final long serialVersionUID = 3911747218220824678L;

    public Boolean call(String line) throws Exception {
        // drop comment/header lines
        return !line.startsWith("#");
    }
});
JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = javaRDD
        .mapToPair(new PairFunction<String, ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>() {
            private static final long serialVersionUID = 2063250436760623971L;

            public Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>> call(String line) throws Exception {
                List<Tuple2<ImmutableBytesWritable, KeyValue>> tps = new ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>>();

                // column name -> position lookups from the broadcast headers
                ArrayList<Tuple2<String, Integer>> dataList = new ArrayList<Tuple2<String, Integer>>();
                String[] datacol = datas.value().split(",");
                for (int i = 0; i < datacol.length; i++) {
                    dataList.add(new Tuple2<String, Integer>(datacol[i], i));
                }
                ArrayList<Tuple2<String, Integer>> infoList = new ArrayList<Tuple2<String, Integer>>();
                String[] infocol = infos.value().split(",");
                for (int i = 0; i < infocol.length; i++) {
                    infoList.add(new Tuple2<String, Integer>(infocol[i], i));
                }
                ArrayList<Tuple2<String, Integer>> sampleList = new ArrayList<Tuple2<String, Integer>>();
                String[] samcol = samples.value().split(",");
                for (int i = 0; i < samcol.length; i++) {
                    sampleList.add(new Tuple2<String, Integer>(samcol[i], i));
                }

                // build the rowkey
                String[] row = line.split("\t");
                String rowkey = row[0] + row[1] + docId;
                ImmutableBytesWritable writable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));

                // extract the values
                Map<String, String> infomap = new HashMap<String, String>();
                // the column qualifiers must be emitted in lexical order
                Collections.sort(dataList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });
                Collections.sort(infoList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });
                Collections.sort(sampleList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });

                String[] info = row[7].split(";");
                for (String detail : info) {
                    String[] p = detail.split("=");
                    if (p != null && p.length >= 2) {
                        infomap.put(p[0], p[1]);
                    }
                }
                System.out.println(dataList.toString());
                for (int i = 0; i < dataList.size(); i++) {
                    String value = row[dataList.get(i)._2];
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[0]),
                                Bytes.toBytes(dataList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }
                for (int i = 0; i < infoList.size(); i++) {
                    String value = infomap.get(infoList.get(i)._1);
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[1]),
                                Bytes.toBytes(infoList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }
                for (int i = 0; i < sampleList.size(); i++) {
                    String value = row[sampleList.get(i)._2 + 8];
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[2]),
                                Bytes.toBytes(sampleList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }

                return new Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>(writable, tps);
            }
        })
        // the records must be sorted by rowkey here; this is slow, and I
        // haven't found a more efficient alternative yet
        .sortByKey()
        .flatMapToPair(new PairFlatMapFunction<Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>,
                ImmutableBytesWritable, KeyValue>() {
            private static final long serialVersionUID = 1254388259270824538L;

            public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Tuple2<ImmutableBytesWritable,
                    List<Tuple2<ImmutableBytesWritable, KeyValue>>> tuple2s) throws Exception {
                return tuple2s._2().iterator();
            }
        });
 
// write the HFiles to a temporary directory on HDFS, then load them
javaPairRDD.saveAsNewAPIHadoopFile(temp, ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);
loader.doBulkLoad(new Path(temp), admin, realTable, regionLocator);
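
As for the efficiency remark in the comment above: one commonly used alternative to the global sortByKey is repartitionAndSortWithinPartitions with a partitioner derived from the table's region start keys, so partitioning and sorting happen in one shuffle and every task writes HFiles that fall inside a single region. A rough sketch only, assuming the same HBase 1.x-era APIs; RegionStartKeyPartitioner is a hypothetical name, not from this code:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;

public class RegionStartKeyPartitioner extends Partitioner {
    private static final long serialVersionUID = 1L;
    private final byte[][] startKeys; // e.g. from regionLocator.getStartKeys()

    public RegionStartKeyPartitioner(byte[][] startKeys) {
        this.startKeys = startKeys;
    }

    @Override
    public int numPartitions() {
        return startKeys.length;
    }

    @Override
    public int getPartition(Object key) {
        byte[] row = ((ImmutableBytesWritable) key).copyBytes();
        // scan from the last region backwards; the first start key <= row wins
        for (int i = startKeys.length - 1; i > 0; i--) {
            if (Bytes.compareTo(row, startKeys[i]) >= 0) {
                return i;
            }
        }
        return 0; // the first region's start key is empty
    }
}

It would replace the sortByKey() stage, e.g. repartitionAndSortWithinPartitions(new RegionStartKeyPartitioner(regionLocator.getStartKeys())); within each partition the keys are sorted by ImmutableBytesWritable's natural byte ordering.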
 
Any pointers from the experts here would be much appreciated.

sushu:

Problem solved. When the data volume is large, the same row can appear multiple times in the input, so the duplicated tuples have to be removed with reduceByKey.
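
To see why a duplicated row trips the writer: within an HFile, cells must be strictly increasing by (row, family, qualifier). A single occurrence of a row emits its qualifiers in sorted order, but a second occurrence of the same row restarts at data:ALT after data:REF has already been written. A minimal sketch of the failing comparison, with the cells reconstructed from the error message above (the values "G"/"GA" are made up to match vlen=1/vlen=2; KeyValue.COMPARATOR is the HBase 1.x comparator matching the APIs used here):

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class OrderCheck {
    public static void main(String[] args) {
        KeyValue lastCell = new KeyValue(Bytes.toBytes("22169081451"),
                Bytes.toBytes("data"), Bytes.toBytes("REF"),
                1559803587445L, Bytes.toBytes("G"));
        KeyValue currentCell = new KeyValue(Bytes.toBytes("22169081451"),
                Bytes.toBytes("data"), Bytes.toBytes("ALT"),
                1559803587445L, Bytes.toBytes("GA"));

        // Prints a negative number: currentCell sorts BEFORE lastCell, i.e.
        // it is not lexically larger than the previous cell, which is the
        // condition the HFile writer rejects with the exception above.
        System.out.println(KeyValue.COMPARATOR.compare(currentCell, lastCell));
    }
}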
That is, change the code to:

JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = javaRDD
        .mapToPair(new PairFunction<String, ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>() {
            private static final long serialVersionUID = 2063250436760623971L;

            public Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>> call(String line) throws Exception {
                List<Tuple2<ImmutableBytesWritable, KeyValue>> tps = new ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>>();

                // column name -> position lookups from the broadcast headers
                ArrayList<Tuple2<String, Integer>> dataList = new ArrayList<Tuple2<String, Integer>>();
                String[] datacol = datas.value().split(",");
                for (int i = 0; i < datacol.length; i++) {
                    dataList.add(new Tuple2<String, Integer>(datacol[i], i));
                }
                ArrayList<Tuple2<String, Integer>> infoList = new ArrayList<Tuple2<String, Integer>>();
                String[] infocol = infos.value().split(",");
                for (int i = 0; i < infocol.length; i++) {
                    infoList.add(new Tuple2<String, Integer>(infocol[i], i));
                }
                ArrayList<Tuple2<String, Integer>> sampleList = new ArrayList<Tuple2<String, Integer>>();
                String[] samcol = samples.value().split(",");
                for (int i = 0; i < samcol.length; i++) {
                    sampleList.add(new Tuple2<String, Integer>(samcol[i], i));
                }

                // build the rowkey
                String[] row = line.split("\t");
                String rowkey = row[0] + row[1] + docId;
                ImmutableBytesWritable writable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));

                // extract the values
                Map<String, String> infomap = new HashMap<String, String>();
                // the column qualifiers must be sorted lexically, otherwise the load fails
                Collections.sort(dataList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });
                Collections.sort(infoList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });
                Collections.sort(sampleList, new Comparator<Tuple2<String, Integer>>() {
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o1._1.compareTo(o2._1);
                    }
                });

                String[] info = row[7].split(";");
                for (String detail : info) {
                    String[] p = detail.split("=");
                    if (p != null && p.length >= 2) {
                        infomap.put(p[0], p[1]);
                    }
                }
                for (int i = 0; i < dataList.size(); i++) {
                    String value = row[dataList.get(i)._2];
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[0]),
                                Bytes.toBytes(dataList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }
                for (int i = 0; i < infoList.size(); i++) {
                    String value = infomap.get(infoList.get(i)._1);
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[1]),
                                Bytes.toBytes(infoList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }
                for (int i = 0; i < sampleList.size(); i++) {
                    String value = row[sampleList.get(i)._2 + 8];
                    if (value != null && !value.equals(".")) {
                        KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
                                Bytes.toBytes(COLFAMILIES[2]),
                                Bytes.toBytes(sampleList.get(i)._1), Bytes.toBytes(value));
                        tps.add(new Tuple2<ImmutableBytesWritable, KeyValue>(writable, kv));
                    }
                }

                return new Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>(writable, tps);
            }
        })
        // collapse duplicated rows: when two cell lists share a rowkey, keep the first one
        .reduceByKey(new Function2<List<Tuple2<ImmutableBytesWritable, KeyValue>>, List<Tuple2<ImmutableBytesWritable, KeyValue>>,
                List<Tuple2<ImmutableBytesWritable, KeyValue>>>() {
            private static final long serialVersionUID = -134763418591688822L;

            public List<Tuple2<ImmutableBytesWritable, KeyValue>> call(
                    List<Tuple2<ImmutableBytesWritable, KeyValue>> kv1,
                    List<Tuple2<ImmutableBytesWritable, KeyValue>> kv2) throws Exception {
                return kv1;
            }
        })
        // the records must still be sorted by rowkey; this is slow, and I
        // haven't found a more efficient alternative yet
        .sortByKey()
        .flatMapToPair(new PairFlatMapFunction<Tuple2<ImmutableBytesWritable, List<Tuple2<ImmutableBytesWritable, KeyValue>>>,
                ImmutableBytesWritable, KeyValue>() {
            private static final long serialVersionUID = 1254388259270824538L;

            public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Tuple2<ImmutableBytesWritable,
                    List<Tuple2<ImmutableBytesWritable, KeyValue>>> tuple2s) throws Exception {
                return tuple2s._2().iterator();
            }
        });
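
One caveat about this fix: returning kv1 keeps only one occurrence's cell list, so any column that appears only in the other duplicate is silently dropped. If duplicates can carry different columns, a merging reducer is safer. A hypothetical sketch, a drop-in replacement for the Function2 above (it additionally needs java.util.TreeMap and org.apache.hadoop.hbase.CellUtil on the import list):

new Function2<List<Tuple2<ImmutableBytesWritable, KeyValue>>, List<Tuple2<ImmutableBytesWritable, KeyValue>>,
        List<Tuple2<ImmutableBytesWritable, KeyValue>>>() {
    private static final long serialVersionUID = 1L;

    public List<Tuple2<ImmutableBytesWritable, KeyValue>> call(
            List<Tuple2<ImmutableBytesWritable, KeyValue>> kv1,
            List<Tuple2<ImmutableBytesWritable, KeyValue>> kv2) throws Exception {
        // TreeMap keeps the merged cells sorted by family:qualifier, the
        // order the HFile writer expects within a row
        Map<String, Tuple2<ImmutableBytesWritable, KeyValue>> merged =
                new TreeMap<String, Tuple2<ImmutableBytesWritable, KeyValue>>();
        for (Tuple2<ImmutableBytesWritable, KeyValue> t : kv2) {
            merged.put(cellKey(t._2), t);
        }
        for (Tuple2<ImmutableBytesWritable, KeyValue> t : kv1) {
            // kv1 overwrites kv2 on collisions, matching the original
            // behavior of preferring kv1
            merged.put(cellKey(t._2), t);
        }
        return new ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>>(merged.values());
    }

    // family:qualifier uniquely identifies a cell within one row here
    private String cellKey(KeyValue kv) {
        return Bytes.toString(CellUtil.cloneFamily(kv)) + ":"
                + Bytes.toString(CellUtil.cloneQualifier(kv));
    }
}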
 
