Google Batch: file().listFiles() fails when reading a directory that is being updated #6106

@VasLem

Bug report

The listFiles file method throws a NoSuchFileException for a file that actually exists: running gsutil ls on it after the pipeline has ended shows it.
This is probably a race condition. listFiles walks a directory that a process's publishing mechanism is still writing to, and the trace shows the walk failing while reading the attributes of an entry inside that directory. The error is:

java.nio.file.NoSuchFileException: gs://BUCKET/MY_DIR/FILE_THAT_EXISTS
        at com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider.readAttributes(CloudStorageFileSystemProvider.java:812)
        at java.base/java.nio.file.Files.readAttributes(Files.java:1851)
        at java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:220)
        at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:277)
        at java.base/java.nio.file.FileTreeWalker.next(FileTreeWalker.java:374)
        at java.base/java.nio.file.Files.walkFileTree(Files.java:2845)
        at java.base/java.nio.file.Files.walkFileTree(Files.java:2882)
        at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
        at nextflow.extension.FilesEx.listFiles(FilesEx.groovy:645)
        at nextflow.extension.FilesEx.listFiles(FilesEx.groovy)
        at jdk.internal.reflect.GeneratedMethodAccessor310.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.codehaus.groovy.runtime.metaclass.ReflectionMetaMethod.invoke(ReflectionMetaMethod.java:59)
        at org.codehaus.groovy.runtime.metaclass.NewInstanceMetaMethod.invoke(NewInstanceMetaMethod.java:57)
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
        at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
        at groovy.runtime.metaclass.NextflowDelegatingMetaClass.invokeMethod(NextflowDelegatingMetaClass.java:64)
        at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
        at Script_d287fad7bcaf2548$_runScript_closure1$_closure2$_closure5.doCall(Script_d287fad7bcaf2548:62)
        at jdk.internal.reflect.GeneratedMethodAccessor322.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:280)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
        at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
        at Script_d287fad7bcaf2548$_runScript_closure1$_closure2$_closure25.doCall(Script_d287fad7bcaf2548:148)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:280)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
        at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
        at nextflow.extension.OperatorImpl$_filter_closure5.doCall(OperatorImpl.groovy:280)
        at jdk.internal.reflect.GeneratedMethodAccessor124.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:280)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
        at groovy.lang.Closure.call(Closure.java:433)
        at groovyx.gpars.dataflow.operator.DataflowOperatorActor.startTask(DataflowOperatorActor.java:120)
        at groovyx.gpars.dataflow.operator.DataflowOperatorActor.onMessage(DataflowOperatorActor.java:108)
        at groovyx.gpars.actor.impl.SDAClosure$1.call(SDAClosure.java:43)
        at groovyx.gpars.actor.AbstractLoopingActor.runEnhancedWithoutRepliesOnMessages(AbstractLoopingActor.java:293)
        at groovyx.gpars.actor.AbstractLoopingActor.access$400(AbstractLoopingActor.java:30)
        at groovyx.gpars.actor.AbstractLoopingActor$1.handleMessage(AbstractLoopingActor.java:93)
        at groovyx.gpars.util.AsyncMessagingCore.run(AsyncMessagingCore.java:132)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
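The trace shows the exception originating in FileTreeWalker.getAttributes inside Files.walkFileTree. Per the JDK documentation, when a walk cannot read an entry's attributes it passes the IOException to the visitor's visitFileFailed method, and SimpleFileVisitor's default visitFileFailed rethrows it. A minimal Java sketch of a listing that skips entries that have vanished (or are not yet visible) instead of failing, assuming that silently dropping such entries is acceptable. TolerantListing and listFilesTolerant are hypothetical names; this is not Nextflow's actual FilesEx.listFiles implementation.

```java
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

// Hypothetical helper (not part of Nextflow): lists the direct children of
// a directory using Files.walkFileTree, the same JDK API seen in the trace.
class TolerantListing {

    static List<Path> listFilesTolerant(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        // maxDepth = 1: only the immediate children of dir are visited;
        // subdirectories at that depth are reported through visitFile
        Files.walkFileTree(dir, EnumSet.noneOf(FileVisitOption.class), 1,
            new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    result.add(file);
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                    // The default implementation rethrows exc. Here a child whose
                    // attributes cannot be found is skipped, but a missing root
                    // directory is still reported as an error.
                    if (exc instanceof NoSuchFileException && !file.equals(dir)) {
                        return FileVisitResult.CONTINUE;
                    }
                    throw exc;
                }
            });
        return result;
    }
}
```

The design choice here is to skip rather than fail: the listing returns whatever entries were readable at that moment, so a file that is still being published may be missing from the result.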

I am not sure whether this can be fixed within Nextflow without overriding the underlying Java operations, but there should be a way to recover from this error, since it should not happen in the first place. For reference, here is a toy example of the code that raises it:

def dir_exists_and_is_not_empty = { dir ->
    // resolve the path once, then check that it exists and has at least one entry
    def d = file(dir)
    d.exists() && d.listFiles().size() > 0
}

my_named_dir_channel = channel.of(['name', "gs://BUCKET/MY_DIR"])
my_named_dir_channel = my_named_dir_channel
    .filter { it ->
        def x = dir_exists_and_is_not_empty("${it[1]}")
        if (x) println "Existing directory, skipping.."
        x
    }
    .map { it -> [it[0], file(it[1]).listFiles()] }
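Because the failure is intermittent and the file is present once the pipeline has finished, another way to recover is to retry the whole listing a few times with a short, growing delay. A minimal Java sketch, assuming the listing is safe to repeat; RetryOnMissing, withRetry, maxAttempts and delayMillis are hypothetical names, not Nextflow or JDK APIs.

```java
import java.nio.file.NoSuchFileException;
import java.util.concurrent.Callable;

// Hypothetical helper: re-runs an action that may fail with
// NoSuchFileException while a directory is still being written.
class RetryOnMissing {

    static <T> T withRetry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        NoSuchFileException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (NoSuchFileException e) {
                last = e;
                if (attempt < maxAttempts) {
                    // linear backoff: wait delayMillis, then 2 * delayMillis, ...
                    Thread.sleep(delayMillis * attempt);
                }
            }
        }
        // every attempt failed; surface the most recent error
        throw last;
    }
}
```

Only NoSuchFileException is retried; any other exception propagates immediately, so genuine errors (e.g. permission problems) are not masked by the retries.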

Environment

  • Nextflow version: 25.04.1
