
Conversation

@zhuguangbin

In some scenarios an LzoThriftBlock file may be corrupt. For example, in our company we consume Kafka data and sink it to HDFS in LzoThriftBlock format using a Flink streaming job. If the job crashes, the file being written may be left corrupt, and any MapReduce job reading that corrupt file will fail.

Error logs are as follows:

org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:201)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.parseNextBlock(BinaryBlockReader.java:145)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.setupNewBlockIfNeeded(BinaryBlockReader.java:169)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.readNextProtoBytes(BinaryBlockReader.java:87)
at com.twitter.elephantbird.mapreduce.io.BinaryBlockReader.readNext(BinaryBlockReader.java:74)
at com.twitter.elephantbird.mapreduce.input.LzoBinaryBlockRecordReader.nextKeyValue(LzoBinaryBlockRecordReader.java:138)
at com.twitter.elephantbird.pig.load.LzoBaseLoadFunc.getNextBinaryValue(LzoBaseLoadFunc.java:108)
at com.twitter.elephantbird.pig.load.ThriftPigLoader.getNext(ThriftPigLoader.java:48)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.nextKeyValue(PigRecordReader.java:211)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

The reason is that the last block of the split is incomplete because the writing stream quit abruptly. I think this could be handled more tolerantly: although the last block is corrupt, the preceding blocks of the split are fine. All we need to do is skip the corrupt block.
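To make the idea concrete, here is a rough sketch of the kind of tolerance I have in mind, written against the nextKeyValue/readNext calls in the stack trace above. The config key and the field names are only illustrative, not existing elephant-bird settings:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  try {
    // normal path: decode the next thrift/protobuf record from the current block
    return reader_.readNext(value_);
  } catch (IOException e) {
    // hypothetical setting; defaults to the current fail-fast behaviour
    if (conf_.getBoolean("elephantbird.read.ignore.truncated.block", false)) {
      // the last block of the split was cut off mid-write: keep everything read
      // so far and report end-of-input instead of failing the whole task
      LOG.warn("Truncated block at end of split, skipping remainder", e);
      return false;
    }
    throw e;
  }
}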

@isnotinvain
Contributor

I think we would want this behind some config settings. Maybe something similar to

static class InputErrorTracker implements Closeable {

that allows for setting a max number of tolerated errors.
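Roughly this shape, as a simplified sketch with a hard error cap rather than the threshold logic the existing tracker uses (the config key here is a placeholder, not a real elephant-bird setting, and the class would live inside the record reader like the existing one):

import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

static class InputErrorTracker implements Closeable {
  private static final Log LOG = LogFactory.getLog(InputErrorTracker.class);

  private final long maxAllowedErrors;  // 0 keeps today's fail-fast behaviour
  private long numRecords = 0;
  private long numErrors = 0;

  InputErrorTracker(Configuration conf) {
    // hypothetical key, just to show the shape of the setting
    maxAllowedErrors = conf.getLong("input.block.errors.max", 0);
  }

  void incRecords() {
    numRecords++;
  }

  void incErrors(Throwable cause) throws IOException {
    numErrors++;
    if (numErrors > maxAllowedErrors) {
      throw new IOException("Too many corrupt blocks: " + numErrors
          + " out of " + numRecords + " records", cause);
    }
    LOG.warn("Skipping corrupt block (" + numErrors + " so far)", cause);
  }

  @Override
  public void close() throws IOException {
    // a real tracker might log or report final error counts here
  }
}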

Additionally, I think we'd want to be sure that catching this sort of exception is recoverable. How do we know the rest of the file isn't corrupt? If a proto blob is corrupt, we need to be sure we know where the next good one starts.
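Purely as an illustration of what "finding the next good one" would have to look like, assuming the writer puts a sync marker in front of every block. Whether the current block format gives us something reliable to scan for is exactly the open question; the helper below is hypothetical, not elephant-bird's actual reader API:

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

private static boolean skipToNextMarker(InputStream in, byte[] marker) throws IOException {
  byte[] window = new byte[marker.length];
  int filled = 0;
  while (true) {
    int b = in.read();
    if (b == -1) {
      return false;                       // reached EOF without finding another block
    }
    System.arraycopy(window, 1, window, 0, marker.length - 1);  // slide window left
    window[marker.length - 1] = (byte) b; // append the new byte
    filled = Math.min(filled + 1, marker.length);
    if (filled == marker.length && Arrays.equals(window, marker)) {
      return true;                        // stream is now positioned just past the marker
    }
  }
}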

I would also point out that you may want to write your files to a temporary location and then "commit" them by closing them and then moving them to a final destination. That way if something crashes, the whole file gets thrown out and you can rebuild it from scratch.
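For example, something along these lines on the writer side; the paths are made up, but fs.rename is the standard HDFS call, and an atomic rename after a clean close means readers never see a half-written file:

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitByRename {
  // Write the whole file to a temporary path, then "commit" it with a rename.
  public static void writeAndCommit(FileSystem fs, byte[] payload) throws IOException {
    Path tmp = new Path("/data/_tmp/part-00000.lzo");    // illustrative temp location
    Path dst = new Path("/data/output/part-00000.lzo");  // illustrative final location
    try (OutputStream out = fs.create(tmp, true)) {
      out.write(payload);   // stand-in for the real LzoThriftBlock writer
    }
    // rename is atomic on HDFS: a crash before this line leaves only the temp file,
    // which downstream jobs never read
    if (!fs.rename(tmp, dst)) {
      throw new IOException("Failed to commit " + tmp + " to " + dst);
    }
  }
}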

@CLAassistant

CLAassistant commented Jul 18, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


zhuguangbin does not appear to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.
