本篇是对记录一次Sqoop从MySQL导入数据到Hive问题的排查经过的补充。
Sqoop命令通过bin下面的脚本调用,调用如下:
1
| exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
|
org.apache.sqoop.Sqoop
是Sqoop的入口类,在此主要是解析参数及初始化工具类,然后通过org.apache.hadoop.util.ToolRunner
类调用对应的工具完成操作。Sqoop的Import操作对应的是org.apache.sqoop.tool.ImportTool
类。
在ImportTool类的return代码前增加以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| int numMappers = options.getNumMappers();
String hDbName = options.getHCatDatabaseName(); String hTableName = options.getHCatTableName(); String hPartKeys = options.getHCatalogPartitionKeys(); String hPartVals = options.getHCatalogPartitionValues();
if(isStringNotEmpty(hDbName) && isStringNotEmpty(hTableName) && isStringNotEmpty(hPartKeys) && isStringNotEmpty(hPartVals)) { String[] partKeys = hPartKeys.split(","); String[] partVals = hPartVals.split(",");
String partPathStr = ""; if(partKeys.length > 0 && partVals.length == partKeys.length) { for(int i = 0; i < partKeys.length; i++) { partPathStr += partKeys[i] + "=" + partVals[i] + "/"; } }
String targetDir = "/user/hive/warehouse/" + hDbName + ".db/" + hTableName + "/" + partPathStr; targetDir = targetDir.toLowerCase(); LOG.info("---------targetDir=" + targetDir);
try { FileSystem fs = FileSystem.get(options.getConf()); RemoteIterator<LocatedFileStatus> rIter = fs.listFiles(new Path(targetDir), false);
int fileCount = 0; while(rIter.hasNext()) { fileCount++; rIter.next(); }
LOG.info("---------------fileCount=" + fileCount);
if(numMappers != fileCount) { LOG.error("files number in hdfs not equals mapper task number !"); return 2; } } catch (IOException e) { LOG.error("count files number from hdfs error !"); e.printStackTrace(); return 3; } }
|
改动只针对Sqoop集成HCatalog方式导入ORC格式的情况。因为我们的数据仓库中都采用的是这种方式。
优化:当MySQL中记录数特别少时,如少于4条记录,则默认Sqoop的MapTask数量为4但其实际执行时因为原始记录数不够则实际执行的MapTask数量会跟实际的记录数一致,此时split数量跟落地HDFS的文件数量一致。所以,可以根据Sqoop对应MR的实际split数量进行判断文件数量。