1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| package com.fenbi.ape.hive.serde.fileformat;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.*;
import java.io.IOException;
/** * Created by xiaoyue26 on 17/9/14. * <p> * 不接受压缩. 要修改为接受压缩的话,需要学LineRecordReader再修改. * 只支持utf-8 */ public class XmlInputFormat extends TextInputFormat {
public static final String START_TAG_KEY = "xmlinput.start"; public static final String END_TAG_KEY = "xmlinput.end";
@Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { //new org.apache.hadoop.mapreduce.lib.input.LineRecordReader(); return new XmlRecordReader((FileSplit) inputSplit, jobConf); }
/** * XMLRecordReader class to read through a given xml document to output xml * blocks as records as specified by the start tag and end tag */ public static class XmlRecordReader implements RecordReader<LongWritable, Text> { private final byte[] startTag; private final byte[] endTag; private final long start; private final long end; private final FSDataInputStream fsin; private final DataOutputBuffer buffer = new DataOutputBuffer();
public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException { //不支持通配符,正则 startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8"); endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");
// open the file and seek to the start of the split start = split.getStart(); end = start + split.getLength(); Path file = split.getPath(); FileSystem fs = file.getFileSystem(jobConf); fsin = fs.open(split.getPath()); //先定位到文件此次的开头 fsin.seek(start); }
// 捞出 偏移量key,文本value @Override public boolean next(LongWritable key, Text value) throws IOException { if (fsin.getPos() < end) { if (readUntilMatch(startTag, false)) { try { //存一下startTag buffer.write(startTag); if (readUntilMatch(endTag, true)) { key.set(fsin.getPos()); value.set(buffer.getData(), 0, buffer.getLength()); return true; } } finally { buffer.reset(); } } } return false; }
@Override public LongWritable createKey() { return new LongWritable(); }
@Override public Text createValue() { return new Text(); }
@Override public long getPos() throws IOException { return fsin.getPos(); }
@Override public synchronized void close() throws IOException { if (fsin != null) { fsin.close(); }
}
@Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start)); }
/** * withinBlock=True : 遇到的byte都存入buffer,包括match数组(找endTag) * withinBlock=False : 不存入buffer(找startTag) * <p> * return True : 匹配上了 * return False : 文件结束都没匹配上 * </p> * split是按splitSize划分的. */ private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { int i = 0; while (true) { int b = fsin.read(); // end of file: if (b == -1) return false; // save to buffer: if (withinBlock) buffer.write(b);
// check if we're matching: if (b == match[i]) { i++; // 匹配完毕: if (i >= match.length) return true; } else { // 回溯,从头再来 i = 0; } // see if we've passed the stop point: /* * 前面检查了有没超过文件尾,这里检查有没有超过split尾 * 情形1: 如果在找startTag(withinBlock=false) * , 而且一个都没匹配上, 而且已经到达split尾, [start,end) * 返回false. * (万里缉凶): * 情形2: 如果在找endTag(withinBlock=True) * ,即使已经到split末尾,也允许跨split的查找. * 尽量返回true. * 情形3: 如果i!=0,即使已经到split末尾,也允许跨split的查找,以保证startTag或者endTag不被分割. * 尽量返回true. * **/
if (!withinBlock && i == 0 && fsin.getPos() >= end) { return false; } } } } }
|