Spark任务读取HDFS文件报Filesystem closed异常

栏目: 服务器 · 发布时间: 6年前

内容简介:今天早上过来发现昨天晚上浏览器搜索样本生成任务出问题了。日志显示:前面遇到很多次 Spark 任务执行异常,这是难得的一次异常堆栈这么清晰明了的。就是 Spark 的 executor 的 task 在从 HDFS 上下载插件到本地目录的时候报错了,错误显示是读取 HDFS 文件流时检查发现文件系统被关闭了。看一下我们加载插件的代码:

现象

今天早上过来发现昨天晚上浏览器搜索样本生成任务出问题了。日志显示:

[INFO ] 2018-10-18 22:03:41.443 [launcher-proc-1] spark-sample-generate -       at com.argan.internt.ai.platform.datadriver.sample.plugin.DefaultPluginServiceImpl.loadPlugin(Default
PluginServiceImpl.java:65)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at com.argan.internt.ai.platform.datadriver.sample.function.SampleRddReducerFunction.getFeatureProces
sor(SampleRddReducerFunction.java:72)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at com.argan.internt.ai.platform.datadriver.sample.function.SampleRddReducerFunction.call(SampleRddRe
ducerFunction.java:60)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at com.argan.internt.ai.platform.datadriver.sample.function.SampleRddReducerFunction.call(SampleRddRe
ducerFunction.java:26)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7
.apply$mcV$sp(PairRDDFunctions.scala:1196)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7
.apply(PairRDDFunctions.scala:1195)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7
.apply(PairRDDFunctions.scala:1195)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1282)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFun
ctions.scala:1203)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFun
ctions.scala:1183)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.scheduler.Task.run(Task.scala:89)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at java.lang.Thread.run(Thread.java:745)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate - Caused by: java.io.IOException: Filesystem closed
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:892)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:884)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:957)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at java.io.DataInputStream.read(DataInputStream.java:100)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:93)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:121)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:369)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:292)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2123)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2092)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2068)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       at com.argan.internt.ai.platform.algorithm.commons.util.HdfsClient.copyToLocal(HdfsClient.java:95)
[INFO ] 2018-10-18 22:03:41.444 [launcher-proc-1] spark-sample-generate -       ... 19 more

前面遇到很多次 Spark 任务执行异常,这是难得的一次异常堆栈这么清晰明了的。就是 Spark 的 executor 的 task 在从 HDFS 上下载插件到本地目录的时候报错了,错误显示是读取 HDFS 文件流时检查发现文件系统被关闭了。

分析

看一下我们加载插件的代码:

public class DefaultPluginServiceImpl implements PluginService {

    ...

    @Override
    public void loadPlugin(String pluginName) {

        ...

        HdfsClient.build( hdfsRoot, hdfsUsername, hdfsWorkDir ).copyToLocal( hdfsLoc, destFile ).close();

        ...
    }

    ...

}


public class HdfsClient implements Closeable {

    ...

    private FileSystem fileSystem;

    public static HdfsClient build(String hdfsRoot, String loginName, String workDir) {
        HdfsClient client = new HdfsClient(hdfsRoot, loginName, workDir);
        client.init();
        return client;
    }

    private void init() throws IllegalArgumentException {

        ...

        System.setProperty("HADOOP_USER_NAME", loginName);
        Configuration cfg = new Configuration();
        cfg.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // 避免打jar包时SPI文件出现问题

        String[] rootPaths = hdfsRoot.split(",");

        this.fileSystem = getFileSystem(rootPaths, workDir, cfg);
    }

    /**
     * 根据路径获取FileSystem,因为ip可能会变,因此要尝试不同的连接地址
     */
    private FileSystem getFileSystem(String[] rootPaths, String uploadPath, Configuration cfg) throws RuntimeException {

        FileSystem fileSystem = null;
        Throwable ex = null;
        for (String path : rootPaths) {
            try {
                fileSystem = FileSystem.get(URI.create(path), cfg);
                fileSystem.exists(new Path(uploadPath)); // 检查hdfs服务是否正常
                return fileSystem;
            } catch (Throwable e) {
                ex = e;
                logger.warn("HDFS init FileSystem error:" + path);
            }
        }
        logger.error("HDFS init error:" + hdfsRoot, ex);
        throw new RuntimeException("HDFS init error");
    }

    public void close() {
        if (null != fileSystem) {
            try {
                fileSystem.close();
            } catch (IOException e) {
                logger.error("hdfs close exception", e);
            }
        }
    }
}

可以看到每个HdfsClient实例相当于一个FileSystem实例。让我们分析一下代码:

HdfsClient.build( hdfsRoot, hdfsUsername, hdfsWorkDir ).copyToLocal( hdfsLoc, destFile ).close();

这里每次都会 new 一个HdfsClient实例也就是得到一个FileSystem,然后去把插件 copyToLocal,最后 close 掉。这里每次 copyToLocal 都 close 掉 FileSystem 在多线程环境下确实会存在问题。但是这里 HdfsClient 并不是单例,难道它的实例变量 fileSystem 是共享的?fileSystem的获取是通过 fileSystem = FileSystem.get(URI.create(path), cfg); 得到的,跟进去FileSystem源码看一下就知道了:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {

  /** FileSystem cache */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }
        
        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
  }

  /** FileSystem.Cache.Key */
    static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique;   // an artificial way to make a key unique

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
        this.unique = unique;
        
        this.ugi = UserGroupInformation.getCurrentUser();
      }

      @Override
      public int hashCode() {
        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
      }

      static boolean isEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));        
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)
                 && isEqual(this.authority, that.authority)
                 && isEqual(this.ugi, that.ugi)
                 && (this.unique == that.unique);
        }
        return false;        
      }

      @Override
      public String toString() {
        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
      }
    }
  }
}

果然是源码之前了无秘密, FileSystem.get 静态方法确实不是每次都返回一个新的 FileSystem 实例,如果打开了缓存的话,就会直接从静态缓存对象中返回已经创建的实例。而缓存默认是打开的。缓存的方式就是常见的懒加载模式(存在就返回,不存在就创建并放在 cache 中)。值得注意的是缓存的 key 其实跟 conf 其实没有什么关系,主要是 uri (rootPath)。

晓峰他们昨天晚上没有搞定是因为之前在跑应用市场搜索广告的样本训练任务的时候是没有问题的。同样的代码,只是这个下载浏览器搜索广告的插件出问题了,所以方向就被带歪了。但是实际上,对比一下两者样本训练任务的参数,发现两者配置的 spark.executor.cores 参数值并不一样,前者是配置了 1个而后者是配置了4个,这就是解释了前者为什么一直没有这个问题。

解决方案

知道了原因解决方案就容易了。一种解决方案是关闭缓存:

conf.setBoolean("fs.hdfs.impl.disable.cache", true);

网上也有人建议说不要 close 掉文件系统,让 Hadoop 自己做资源回收。

  1. Do we need to call fs.close() explicitly while working with HDFS using java API?
  2. Don’t call filesystem.close in Hadoop

最终我们采取了第二种方案。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C算法(第二卷:图算法)(第3版)

C算法(第二卷:图算法)(第3版)

塞德威克(Sedgewick Robert) / 周良忠 / 第1版 (2004年1月1日) / 2004-4 / 38.0

《C算法(第2卷)(图算法)(第3版)(中文版)》所讨论的图算法,都是实际中解决图问题的最重要的已知方法。《C算法(第2卷)(图算法)(第3版)(中文版)》的主要宗旨是让越来越多需要了解这些算法的人的能够掌握这些方法及基本原理。书中根据基本原理从基本住处开始循序渐进地讲解,然后再介绍一些经典方法,最后介绍仍在进行研究和发展的现代技术。精心挑选的实例、详尽的图示以及完整的实现代码与正文中的算法和应用......一起来看看 《C算法(第二卷:图算法)(第3版)》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试