From 3a86c8bbd1889bb2f994528ed43acb81bac7229c Mon Sep 17 00:00:00 2001 From: jemunro Date: Fri, 30 Sep 2022 12:42:16 +1000 Subject: [PATCH 1/6] draft withOp --- .../nextflow/extension/OperatorImpl.groovy | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index c1602a9569..6df669a04d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -1493,4 +1493,20 @@ class OperatorImpl { log.warn "Operator `fork` has been renamed to `multiMap`" multiMap(source, action) } + + /** + * Implement a `withOp` operator e.g. + *
+     *     someChannel | withOp { someProcess(it, someChannel) }
+           someChannel | withOp { someProcess(someChannel, it) }
+           someChannel | withOp { someChannel.someOperator(it) }
+     * 
+ * + * @param source The channel instance to be bound in the context + * @param closure A closure defining the operation/process to call + */ + + Object withOp(final DataflowReadChannel source, Closure closure) { + source.with(closure) + } } From bd9c4d87dcad6060c19701037e6d9460999aeafc Mon Sep 17 00:00:00 2001 From: jemunro Date: Wed, 5 Oct 2022 16:41:46 +1100 Subject: [PATCH 2/6] implement ExecOp --- .../nextflow/extension/OperatorImpl.groovy | 27 +++++++++++-------- .../groovy/nextflow/script/ChannelOut.groovy | 4 +++ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 6df669a04d..fedc8a6182 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -40,6 +40,7 @@ import nextflow.Session import nextflow.script.ChannelOut import nextflow.script.TokenBranchDef import nextflow.script.TokenMultiMapDef +import nextflow.script.ChainableDef import nextflow.splitter.FastaSplitter import nextflow.splitter.FastqSplitter import nextflow.splitter.TextSplitter @@ -1495,18 +1496,22 @@ class OperatorImpl { } /** - * Implement a `withOp` operator e.g. - *
-     *     someChannel | withOp { someProcess(it, someChannel) }
-           someChannel | withOp { someProcess(someChannel, it) }
-           someChannel | withOp { someChannel.someOperator(it) }
-     * 
- * - * @param source The channel instance to be bound in the context - * @param closure A closure defining the operation/process to call + * Implement `exec` operator e.g. */ - Object withOp(final DataflowReadChannel source, Closure closure) { - source.with(closure) + Object exec(DataflowReadChannel source, ChainableDef chainableDef, Object... args) { + new ExecOp(source, chainableDef, args).apply() + } + + Object exec(DataflowReadChannel source, ChainableDef chainableDef, Closure closure) { + new ExecOp(source, chainableDef, closure).apply() + } + + Object exec(DataflowReadChannel source, OpCall opCall, Object... args) { + new ExecOp(source, opCall, args).apply() + } + + Object exec(DataflowReadChannel source, OpCall opCall, Closure closure) { + new ExecOp(source, opCall, closure).apply() } } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy index 701bac3243..98fadc8366 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy @@ -90,7 +90,10 @@ class ChannelOut implements List { */ static List spread(Object[] args) { final result = new ArrayList(args.size()*2) + println "spread.result: $result" + println "spread.args: $args" for( int i=0; i { result.add(args[i]) } } + println "spread.result: $result" return result } From ed9b268d8ba6713675b8d3cf8e6301021840f7b3 Mon Sep 17 00:00:00 2001 From: jemunro Date: Thu, 6 Oct 2022 23:54:58 +1100 Subject: [PATCH 3/6] add exec and eval --- .../groovy/nextflow/extension/OpCall.groovy | 18 ++++++++++++- .../nextflow/extension/OperatorImpl.groovy | 25 ++++++++++--------- .../groovy/nextflow/script/ChannelOut.groovy | 4 --- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy index 8b7f9cf41c..f85d4c4189 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy @@ -27,6 +27,10 @@ class OpCall implements Callable { final static private List SPECIAL_NAMES = ["choice","merge","separate"] final static private String SET_OP_hack = 'set' + + final static private String EVAL_OP_hack = 'eval' + + final static private String EXEC_OP_hack = 'exec' static ThreadLocal current = new ThreadLocal<>() @@ -78,8 +82,20 @@ class OpCall implements Callable { return this } - if( args.size() ) + if( methodName == EVAL_OP_hack ) { + source = left[0] as DataflowWriteChannel + args = ([left] as Object[]) + args + return this + } + + if( args.size() ) { + if( methodName == EXEC_OP_hack ) { + source = left[0] as DataflowWriteChannel + args = (left[1..-1] as Object[]) + args + return this + } throw new ScriptRuntimeException("Multi-channel output cannot be applied to operator ${methodName} for which argument is already provided") + } source = left[0] as DataflowWriteChannel args = left[1..-1] as Object[] diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index fedc8a6182..ef037b0a73 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -40,7 +40,7 @@ import nextflow.Session import nextflow.script.ChannelOut import nextflow.script.TokenBranchDef import nextflow.script.TokenMultiMapDef -import nextflow.script.ChainableDef +import nextflow.script.ProcessDef import nextflow.splitter.FastaSplitter import nextflow.splitter.FastqSplitter import nextflow.splitter.TextSplitter @@ -1496,22 +1496,23 @@ class OperatorImpl { } /** - * Implement `exec` operator e.g. + * Implement `exec` operator */ - - Object exec(DataflowReadChannel source, ChainableDef chainableDef, Object... args) { - new ExecOp(source, chainableDef, args).apply() + Object exec(DataflowReadChannel source, ProcessDef processDef, Object... args) { + new ExecOp(source, processDef, args).apply() } - Object exec(DataflowReadChannel source, ChainableDef chainableDef, Closure closure) { - new ExecOp(source, chainableDef, closure).apply() + /* + * Implement `eval` operator + */ + Object eval(DataflowReadChannel source, Closure closure) { + def result = new EvalOp(source, closure).apply() + return result } - Object exec(DataflowReadChannel source, OpCall opCall, Object... args) { - new ExecOp(source, opCall, args).apply() + Object eval(DataflowReadChannel source, ChannelOut channelOut, Closure closure) { + def result = new EvalOp(channelOut, closure).apply() + return result } - Object exec(DataflowReadChannel source, OpCall opCall, Closure closure) { - new ExecOp(source, opCall, closure).apply() - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy index 98fadc8366..701bac3243 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ChannelOut.groovy @@ -90,10 +90,7 @@ class ChannelOut implements List { */ static List spread(Object[] args) { final result = new ArrayList(args.size()*2) - println "spread.result: $result" - println "spread.args: $args" for( int i=0; i { result.add(args[i]) } } - println "spread.result: $result" return result } From 3e81cff43067a96e2bea5782cfbd85db1bdf46f8 Mon Sep 17 00:00:00 2001 From: jemunro Date: Thu, 6 Oct 2022 23:56:22 +1100 Subject: [PATCH 4/6] add EvalOp and ExecOp --- .../groovy/nextflow/extension/EvalOp.groovy | 62 ++++++++++++++++++ .../groovy/nextflow/extension/ExecOp.groovy | 64 +++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy new file mode 100644 index 0000000000..74878248e2 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy @@ -0,0 +1,62 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.transform.CompileStatic +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.script.ChannelOut + +/** + * Implements the {@link OperatorImpl#eval} operator + * + * @author Jacob Munro + */ +@CompileStatic +class EvalOp { + + private Object source + + private Closure closure + + EvalOp( ChannelOut source, Closure closure ) { + assert source != null + assert closure != null + + this.source = source + this.closure = closure + } + + EvalOp( DataflowReadChannel source, Closure closure ) { + assert source != null + assert closure != null + + this.source = source as DataflowWriteChannel + this.closure = closure + } + + Object apply() { + + final copy = (Closure)closure.clone() + copy.setResolveStrategy(Closure.DELEGATE_FIRST) + def result = source.with(copy) + + return result + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy new file mode 100644 index 0000000000..69cadc7736 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy @@ -0,0 +1,64 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.transform.CompileStatic +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.script.ProcessDef +import nextflow.script.ChannelOut + +/** + * Implements the {@link OperatorImpl#exec} operator + * + * @author Jacob Munro + */ +@CompileStatic +class ExecOp { + + private ProcessDef processDef + + private DataflowReadChannel source + + private Object[] args + + ExecOp( DataflowReadChannel source, ProcessDef processDef, Object[] args ) { + assert processDef != null + assert source != null + assert args + + this.source = source + this.processDef = processDef + this.args = args + } + + Object apply() { + + // return processDef.invoke_o(resolveInputs()) + def result = processDef.run(resolveInputs()) + + return result + } + + private Object[] resolveInputs() { + + Object[] resultArray = ([source] + (args as List)).toArray() + + return ChannelOut.spread(resultArray).toArray() + } +} From 6dce1afc92694cf06f9e798694b052e1c54b7cbd Mon Sep 17 00:00:00 2001 From: jemunro Date: Fri, 7 Oct 2022 10:48:00 +1100 Subject: [PATCH 5/6] update --- .../main/groovy/nextflow/extension/EvalOp.groovy | 2 +- .../main/groovy/nextflow/extension/ExecOp.groovy | 8 +++----- .../groovy/nextflow/extension/OperatorImpl.groovy | 15 ++++++++++----- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy index 74878248e2..09e1f2500f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy @@ -31,7 +31,7 @@ import nextflow.script.ChannelOut class EvalOp { private Object source - + private Closure closure EvalOp( ChannelOut source, Closure closure ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy index 69cadc7736..27b79f6fad 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy @@ -19,7 +19,6 @@ package nextflow.extension import groovy.transform.CompileStatic import groovyx.gpars.dataflow.DataflowReadChannel -import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.script.ProcessDef import nextflow.script.ChannelOut @@ -34,7 +33,7 @@ class ExecOp { private ProcessDef processDef private DataflowReadChannel source - + private Object[] args ExecOp( DataflowReadChannel source, ProcessDef processDef, Object[] args ) { @@ -48,10 +47,9 @@ class ExecOp { } Object apply() { - - // return processDef.invoke_o(resolveInputs()) + def result = processDef.run(resolveInputs()) - + return result } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index ef037b0a73..c2cddc35aa 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -1499,20 +1499,25 @@ class OperatorImpl { * Implement `exec` operator */ Object exec(DataflowReadChannel source, ProcessDef processDef, Object... args) { - new ExecOp(source, processDef, args).apply() + + return new ExecOp(source, processDef, args).apply() } /* * Implement `eval` operator */ Object eval(DataflowReadChannel source, Closure closure) { - def result = new EvalOp(source, closure).apply() - return result + // No need to set DAG Node + OpCall.current.get().ignoreDagNode = true + + return new EvalOp(source, closure).apply() } Object eval(DataflowReadChannel source, ChannelOut channelOut, Closure closure) { - def result = new EvalOp(channelOut, closure).apply() - return result + // No need to set DAG Node + OpCall.current.get().ignoreDagNode = true + + return new EvalOp(channelOut, closure).apply() } } From 5303070da864e81d2b81c053923f0c04d81915e0 Mon Sep 17 00:00:00 2001 From: jemunro Date: Mon, 7 Nov 2022 17:39:02 +1100 Subject: [PATCH 6/6] Update ExecOp --- .../src/main/groovy/nextflow/extension/ExecOp.groovy | 1 - .../src/main/groovy/nextflow/extension/OperatorImpl.groovy | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy index 27b79f6fad..a5464aa4b6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy @@ -39,7 +39,6 @@ class ExecOp { ExecOp( DataflowReadChannel source, ProcessDef processDef, Object[] args ) { assert processDef != null assert source != null - assert args this.source = source this.processDef = processDef diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index c2cddc35aa..c03f475eac 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -1499,6 +1499,8 @@ class OperatorImpl { * Implement `exec` operator */ Object exec(DataflowReadChannel source, ProcessDef processDef, Object... args) { + // don't set DAG Node + OpCall.current.get().ignoreDagNode = true return new ExecOp(source, processDef, args).apply() } @@ -1507,14 +1509,14 @@ class OperatorImpl { * Implement `eval` operator */ Object eval(DataflowReadChannel source, Closure closure) { - // No need to set DAG Node + // don't set DAG Node OpCall.current.get().ignoreDagNode = true return new EvalOp(source, closure).apply() } Object eval(DataflowReadChannel source, ChannelOut channelOut, Closure closure) { - // No need to set DAG Node + // don't set DAG Node OpCall.current.get().ignoreDagNode = true return new EvalOp(channelOut, closure).apply()