MapNode
If you want to iterate over a list of inputs but then need to feed all of the iterated outputs as one input (a list) to the next node, you need to use a MapNode. A MapNode is quite similar to a normal Node, but it can take a list of inputs and operate over each input separately, ultimately returning a list of outputs.
Imagine that you have a list of items (let's say files) and you want to execute the same node on each of them (for example, smoothing or masking). Some nodes accept multiple files and do exactly the same thing to each of them, but some don't (they expect only one file). MapNode can solve this problem. Imagine you have the following workflow:
Node A outputs a list of files, but node B accepts only one file. Additionally, C expects a list of files. What you would like is to run B for every file in the output of A, collect the results as a list, and feed that list to C. Something like this:
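from nipype import Node, MapNode, Workflow
# A, B and C are placeholders for real interfaces:
# A outputs a list ('out_files'), B takes a single file ('in_file'),
# and C takes a list of files ('in_files')
a = Node(interface=A(), name="a")
b = MapNode(interface=B(), name="b", iterfield=['in_file'])
c = Node(interface=C(), name="c")
my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a, b, [('out_files', 'in_file')]),
                     (b, c, [('out_file', 'in_files')])
                     ])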
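Conceptually, the MapNode b behaves like a list comprehension sitting between a and c. The plain-Python sketch below illustrates only that dataflow; node_a, node_b and node_c are hypothetical stand-ins for the interfaces A, B and C (with made-up file names), and this is not how nipype implements MapNode internally.

```python
def node_a():
    # Hypothetical stand-in for A: produces a list of file names
    return ["run1.nii", "run2.nii", "run3.nii"]


def node_b(in_file):
    # Hypothetical stand-in for B: accepts exactly one file
    return in_file.replace(".nii", "_smooth.nii")


def node_c(in_files):
    # Hypothetical stand-in for C: expects the whole list at once
    return len(in_files)


# What the MapNode does: run B once per element of A's output ...
b_outputs = [node_b(f) for f in node_a()]
# ... and hand the collected list to C as a single input
c_output = node_c(b_outputs)

print(b_outputs)
print(c_output)
```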
Let’s demonstrate this with a simple function interface:
from nipype import Function
def square_func(x):
    return x ** 2
square = Function(["x"], ["f_x"], square_func)
We see that this function just takes a numeric input and returns its squared value.
square.run(x=2).outputs.f_x
4
What if we wanted to square a list of numbers? We could set an iterable and split up the workflow into multiple sub-workflows. But say we were making a simple workflow that squared a list of numbers and then summed them. The sum node would expect a list, but using an iterable would create a bunch of sum nodes, each receiving only one number from the list. The solution here is to use a MapNode.
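The difference can be mimicked in plain Python (an analogy only, not nipype code): with an iterable, every branch ends up with its own sum over a single value, whereas a MapNode gathers all squared values into one list that feeds a single sum.

```python
def square_func(x):
    return x ** 2


numbers = [0, 1, 2, 3]

# Iterable-style: the workflow is split into one branch per number,
# so each downstream "sum" only ever sees a single squared value.
iterable_sums = [sum([square_func(x)]) for x in numbers]

# MapNode-style: square every number, collect the results in one list,
# and pass that list to a single sum.
squares = [square_func(x) for x in numbers]
mapnode_sum = sum(squares)

print(iterable_sums)  # [0, 1, 4, 9]: four separate "sums"
print(mapnode_sum)    # 14: one sum over all squares
```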
iterfield
The MapNode constructor has a field called iterfield, which tells it which inputs should expect a list.
from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])
square_node.inputs.x = [0, 1, 2, 3]
res = square_node.run()
211017-17:58:52,265 nipype.workflow INFO:
[Node] Setting-up "square" in "/tmp/tmp52eg73s9/square".
211017-17:58:52,276 nipype.workflow INFO:
[Node] Setting-up "_square0" in "/tmp/tmp52eg73s9/square/mapflow/_square0".
211017-17:58:52,280 nipype.workflow INFO:
[Node] Running "_square0" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,305 nipype.workflow INFO:
[Node] Finished "_square0".
211017-17:58:52,309 nipype.workflow INFO:
[Node] Setting-up "_square1" in "/tmp/tmp52eg73s9/square/mapflow/_square1".
211017-17:58:52,324 nipype.workflow INFO:
[Node] Running "_square1" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,331 nipype.workflow INFO:
[Node] Finished "_square1".
211017-17:58:52,336 nipype.workflow INFO:
[Node] Setting-up "_square2" in "/tmp/tmp52eg73s9/square/mapflow/_square2".
211017-17:58:52,344 nipype.workflow INFO:
[Node] Running "_square2" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,351 nipype.workflow INFO:
[Node] Finished "_square2".
211017-17:58:52,354 nipype.workflow INFO:
[Node] Setting-up "_square3" in "/tmp/tmp52eg73s9/square/mapflow/_square3".
211017-17:58:52,361 nipype.workflow INFO:
[Node] Running "_square3" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,370 nipype.workflow INFO:
[Node] Finished "_square3".
211017-17:58:52,374 nipype.workflow INFO:
[Node] Finished "square".
res.outputs.f_x
[0, 1, 4, 9]
Because iterfield can take a list of names, you can operate over multiple sets of data, as long as the lists have the same length. The values in the lists are paired element by element; MapNode does not compute a combinatoric product of the lists.
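The pairing rule can be sketched in plain Python with zip. Here map_paired is a hypothetical helper written only for this illustration (nipype's own handling of mismatched lengths may differ), and itertools.product is shown purely to contrast the combinatoric product that MapNode does not compute.

```python
from itertools import product


def power_func(x, y):
    return x ** y


def map_paired(func, **iter_inputs):
    """Hypothetical helper mimicking the described iterfield semantics:
    pair the i-th elements of every list and call func once per pair."""
    lengths = {len(values) for values in iter_inputs.values()}
    if len(lengths) > 1:
        raise ValueError("all iterfield lists must have the same length")
    names = list(iter_inputs)
    return [func(**dict(zip(names, values)))
            for values in zip(*iter_inputs.values())]


xs = [0, 1, 2, 3]
ys = [0, 1, 2, 3]

paired = map_paired(power_func, x=xs, y=ys)
# For contrast only: every (x, y) combination, which MapNode does NOT do
all_combinations = [power_func(x, y) for x, y in product(xs, ys)]

print(paired)                 # [1, 1, 4, 27]
print(len(all_combinations))  # 16
```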
def power_func(x, y):
    return x ** y
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = [0, 1, 2, 3]
res = power_node.run()
211017-17:58:52,426 nipype.workflow INFO:
[Node] Setting-up "power" in "/tmp/tmpk4mrv4ue/power".
211017-17:58:52,437 nipype.workflow INFO:
[Node] Setting-up "_power0" in "/tmp/tmpk4mrv4ue/power/mapflow/_power0".
211017-17:58:52,444 nipype.workflow INFO:
[Node] Running "_power0" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,452 nipype.workflow INFO:
[Node] Finished "_power0".
211017-17:58:52,454 nipype.workflow INFO:
[Node] Setting-up "_power1" in "/tmp/tmpk4mrv4ue/power/mapflow/_power1".
211017-17:58:52,463 nipype.workflow INFO:
[Node] Running "_power1" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,470 nipype.workflow INFO:
[Node] Finished "_power1".
211017-17:58:52,473 nipype.workflow INFO:
[Node] Setting-up "_power2" in "/tmp/tmpk4mrv4ue/power/mapflow/_power2".
211017-17:58:52,477 nipype.workflow INFO:
[Node] Running "_power2" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,484 nipype.workflow INFO:
[Node] Finished "_power2".
211017-17:58:52,487 nipype.workflow INFO:
[Node] Setting-up "_power3" in "/tmp/tmpk4mrv4ue/power/mapflow/_power3".
211017-17:58:52,495 nipype.workflow INFO:
[Node] Running "_power3" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,505 nipype.workflow INFO:
[Node] Finished "_power3".
211017-17:58:52,510 nipype.workflow INFO:
[Node] Finished "power".
print(res.outputs.f_xy)
[1, 1, 4, 27]
But not every input needs to be an iterfield.
power_node = MapNode(power, name="power", iterfield=["x"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = 3
res = power_node.run()
211017-17:58:52,536 nipype.workflow INFO:
[Node] Setting-up "power" in "/tmp/tmpks_3y7g6/power".
211017-17:58:52,547 nipype.workflow INFO:
[Node] Setting-up "_power0" in "/tmp/tmpks_3y7g6/power/mapflow/_power0".
211017-17:58:52,550 nipype.workflow INFO:
[Node] Running "_power0" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,557 nipype.workflow INFO:
[Node] Finished "_power0".
211017-17:58:52,561 nipype.workflow INFO:
[Node] Setting-up "_power1" in "/tmp/tmpks_3y7g6/power/mapflow/_power1".
211017-17:58:52,567 nipype.workflow INFO:
[Node] Running "_power1" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,577 nipype.workflow INFO:
[Node] Finished "_power1".
211017-17:58:52,580 nipype.workflow INFO:
[Node] Setting-up "_power2" in "/tmp/tmpks_3y7g6/power/mapflow/_power2".
211017-17:58:52,585 nipype.workflow INFO:
[Node] Running "_power2" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,595 nipype.workflow INFO:
[Node] Finished "_power2".
211017-17:58:52,599 nipype.workflow INFO:
[Node] Setting-up "_power3" in "/tmp/tmpks_3y7g6/power/mapflow/_power3".
211017-17:58:52,604 nipype.workflow INFO:
[Node] Running "_power3" ("nipype.interfaces.utility.wrappers.Function")
211017-17:58:52,614 nipype.workflow INFO:
[Node] Finished "_power3".
211017-17:58:52,619 nipype.workflow INFO:
[Node] Finished "power".
print(res.outputs.f_xy)
[0, 1, 8, 27]
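In plain-Python terms (an analogy, not nipype code), an input that is not listed in iterfield is passed unchanged to every sub-run, which resembles fixing it with functools.partial before mapping over the iterfield:

```python
from functools import partial


def power_func(x, y):
    return x ** y


# y is not an iterfield: the same value is shared by every sub-run,
# much like partially applying the function before mapping it over x.
cube = partial(power_func, y=3)
results = [cube(x) for x in [0, 1, 2, 3]]

print(results)  # [0, 1, 8, 27]
```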
As with iterables, the individual executions underlying a MapNode can run in parallel. Hopefully you can see how these tools allow you to write flexible, reusable workflows that help you process large amounts of data efficiently and reproducibly.
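Because each element is processed independently, the pattern maps naturally onto a worker pool. The sketch below uses Python's concurrent.futures purely as an analogy; it is not how nipype's execution plugins (such as MultiProc) schedule MapNode sub-runs.

```python
from concurrent.futures import ThreadPoolExecutor


def square_func(x):
    return x ** 2


numbers = [0, 1, 2, 3]

# Each element is an independent task, so the work can be spread over
# several workers; Executor.map still returns results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    squares = list(pool.map(square_func, numbers))

print(squares)  # [0, 1, 4, 9]
```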
In more advanced applications, it is useful to be able to iterate over the items of nested lists (for example, [[1, 2], [3, 4]]). MapNode allows you to do this with the nested=True parameter; the outputs preserve the same nested structure as the inputs.
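The structure-preserving behavior described for nested=True can be pictured with a small recursive helper. nested_map is hypothetical and only illustrates the described input and output shapes; it is not nipype's implementation.

```python
def nested_map(func, items):
    """Hypothetical sketch: apply func to every leaf of a nested list,
    returning a list with the same nesting as the input."""
    if isinstance(items, list):
        return [nested_map(func, item) for item in items]
    return func(items)


def square_func(x):
    return x ** 2


print(nested_map(square_func, [[1, 2], [3, 4]]))  # [[1, 4], [9, 16]]
```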
Why is this important?
Suppose we have multiple functional images (A), and each of them should be motion corrected (B1, B2, B3, …). Afterward, we want to put them all together into a GLM, i.e., the input for the GLM should be a list [B1, B2, B3, …]. Iterables can't do that: they would split up the pipeline. Therefore, we need MapNodes.
Let’s look at a simple example, where we want to motion correct two functional images. For this we need two nodes:
Gunzip, to unzip the files (plural)
Realign, to do the motion correction
from nipype.algorithms.misc import Gunzip
from nipype.interfaces.spm import Realign
from nipype import Node, MapNode, Workflow
# Here we specify a list of files (for this tutorial, we just add the same file twice)
files = ['/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz',
'/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz']
realign = Node(Realign(register_to_mean=True),
name='motion_correction')
If we try to specify the input for the Gunzip node with a simple Node, we get the following error:
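gunzip = Node(Gunzip(), name='gunzip')
try:
    gunzip.inputs.in_file = files
except Exception as err:
    # Only swallow the expected TraitError; re-raise anything else
    if "TraitError" in str(err.__class__):
        print("TraitError:", err)
    else:
        raise
else:
    # No error was raised, which is not what we expect here
    raise RuntimeError("Expected a TraitError when assigning a list")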
TraitError: The 'in_file' trait of a GunzipInputSpec instance must be a pathlike object or string representing an existing file, but a value of "['/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz', '/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz']" <class 'str'> was specified.
TraitError: The 'in_file' trait of a GunzipInputSpec instance must be an existing file name, but a value of ['/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz', '/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz'] <class 'list'> was specified.
But if we do it with a MapNode, it works:
gunzip = MapNode(Gunzip(), name='gunzip',
iterfield=['in_file'])
gunzip.inputs.in_file = files
Now we just have to create a workflow and connect the nodes, and then we can run it:
mcflow = Workflow(name='realign_with_spm')
mcflow.connect(gunzip, 'out_file', realign, 'in_files')
mcflow.base_dir = '/output'
mcflow.run('MultiProc', plugin_args={'n_procs': 4})
211017-17:58:53,160 nipype.workflow INFO:
Workflow realign_with_spm settings: ['check', 'execution', 'logging', 'monitoring']
211017-17:58:53,165 nipype.workflow INFO:
Running in parallel.
211017-17:58:53,169 nipype.workflow INFO:
[MultiProc] Running 0 tasks, and 1 jobs ready. Free memory (GB): 1.75/1.75, Free processors: 4/4.
211017-17:58:55,175 nipype.workflow INFO:
[MultiProc] Running 0 tasks, and 2 jobs ready. Free memory (GB): 1.75/1.75, Free processors: 4/4.
211017-17:58:55,251 nipype.workflow INFO:
[Node] Setting-up "_gunzip0" in "/output/realign_with_spm/gunzip/mapflow/_gunzip0".
211017-17:58:55,251 nipype.workflow INFO:
[Node] Setting-up "_gunzip1" in "/output/realign_with_spm/gunzip/mapflow/_gunzip1".
211017-17:58:55,258 nipype.workflow INFO:
[Node] Running "_gunzip0" ("nipype.algorithms.misc.Gunzip")
211017-17:58:55,261 nipype.workflow INFO:
[Node] Running "_gunzip1" ("nipype.algorithms.misc.Gunzip")
211017-17:58:55,744 nipype.workflow INFO:
[Node] Finished "_gunzip0".
211017-17:58:55,744 nipype.workflow INFO:
[Node] Finished "_gunzip1".
211017-17:58:57,179 nipype.workflow INFO:
[Job 2] Completed (_gunzip0).
211017-17:58:57,183 nipype.workflow INFO:
[Job 3] Completed (_gunzip1).
211017-17:58:57,188 nipype.workflow INFO:
[MultiProc] Running 0 tasks, and 1 jobs ready. Free memory (GB): 1.75/1.75, Free processors: 4/4.
211017-17:58:57,267 nipype.workflow INFO:
[Node] Setting-up "realign_with_spm.gunzip" in "/output/realign_with_spm/gunzip".
211017-17:58:57,281 nipype.workflow INFO:
[Node] Setting-up "_gunzip0" in "/output/realign_with_spm/gunzip/mapflow/_gunzip0".
211017-17:58:57,292 nipype.workflow INFO:
[Node] Cached "_gunzip0" - collecting precomputed outputs
211017-17:58:57,297 nipype.workflow INFO:
[Node] "_gunzip0" found cached.
211017-17:58:57,301 nipype.workflow INFO:
[Node] Setting-up "_gunzip1" in "/output/realign_with_spm/gunzip/mapflow/_gunzip1".
211017-17:58:57,312 nipype.workflow INFO:
[Node] Cached "_gunzip1" - collecting precomputed outputs
211017-17:58:57,316 nipype.workflow INFO:
[Node] "_gunzip1" found cached.
211017-17:58:57,327 nipype.workflow INFO:
[Node] Finished "realign_with_spm.gunzip".
211017-17:58:59,139 nipype.workflow INFO:
[Job 0] Completed (realign_with_spm.gunzip).
211017-17:58:59,143 nipype.workflow INFO:
[MultiProc] Running 0 tasks, and 1 jobs ready. Free memory (GB): 1.75/1.75, Free processors: 4/4.
211017-17:58:59,200 nipype.workflow INFO:
[Node] Setting-up "realign_with_spm.motion_correction" in "/output/realign_with_spm/motion_correction".
211017-17:58:59,358 nipype.workflow INFO:
[Node] Running "motion_correction" ("nipype.interfaces.spm.preprocess.Realign")
211017-17:59:01,140 nipype.workflow INFO:
[MultiProc] Running 1 tasks, and 0 jobs ready. Free memory (GB): 1.55/1.75, Free processors: 3/4.
Currently running:
* realign_with_spm.motion_correction
211017-18:00:26,608 nipype.workflow INFO:
[Node] Finished "realign_with_spm.motion_correction".
211017-18:00:27,144 nipype.workflow INFO:
[Job 1] Completed (realign_with_spm.motion_correction).
211017-18:00:27,148 nipype.workflow INFO:
[MultiProc] Running 0 tasks, and 0 jobs ready. Free memory (GB): 1.75/1.75, Free processors: 4/4.
<networkx.classes.digraph.DiGraph at 0x7fe5f634e6d0>
Exercise 1
Create a workflow that calculates the sum of the factorials of all integers from \(n_{min}\) to \(n_{max}\) (inclusive), i.e., \(\sum_{k=n_{min}}^{n_{max}} k!\). For example, if \(n_{min}=0\) and \(n_{max}=3\):
$$\sum_{k=0}^{3} k! = 0! + 1! + 2! + 3! = 1 + 1 + 2 + 6 = 10$$
Use Node for a function that creates a list of integers and for a function that sums everything at the end. Use MapNode to calculate the factorials.
# write your solution here
from nipype import Workflow, Node, MapNode, Function
import os
def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))
def factorial(n):
    # The import lives inside the function because nipype's Function
    # interface runs the function's source in its own namespace
    import math
    return math.factorial(n)
def summing(terms):
    return sum(terms)
wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()
range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun),
                name='range_list')
factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial),
                       iterfield=['n'],
                       name='factorial')
summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')
range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3
wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")
eg = wf_ex1.run()
211017-18:00:29,156 nipype.workflow INFO:
Workflow ex1 settings: ['check', 'execution', 'logging', 'monitoring']
211017-18:00:29,190 nipype.workflow INFO:
Running serially.
211017-18:00:29,191 nipype.workflow INFO:
[Node] Setting-up "ex1.range_list" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/range_list".
211017-18:00:29,213 nipype.workflow INFO:
[Node] Running "range_list" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,252 nipype.workflow INFO:
[Node] Finished "ex1.range_list".
211017-18:00:29,253 nipype.workflow INFO:
[Node] Setting-up "ex1.factorial" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/factorial".
211017-18:00:29,290 nipype.workflow INFO:
[Node] Setting-up "_factorial0" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/factorial/mapflow/_factorial0".
211017-18:00:29,313 nipype.workflow INFO:
[Node] Running "_factorial0" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,353 nipype.workflow INFO:
[Node] Finished "_factorial0".
211017-18:00:29,362 nipype.workflow INFO:
[Node] Setting-up "_factorial1" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/factorial/mapflow/_factorial1".
211017-18:00:29,385 nipype.workflow INFO:
[Node] Running "_factorial1" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,418 nipype.workflow INFO:
[Node] Finished "_factorial1".
211017-18:00:29,426 nipype.workflow INFO:
[Node] Setting-up "_factorial2" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/factorial/mapflow/_factorial2".
211017-18:00:29,449 nipype.workflow INFO:
[Node] Running "_factorial2" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,491 nipype.workflow INFO:
[Node] Finished "_factorial2".
211017-18:00:29,529 nipype.workflow INFO:
[Node] Setting-up "_factorial3" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/factorial/mapflow/_factorial3".
211017-18:00:29,637 nipype.workflow INFO:
[Node] Running "_factorial3" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,736 nipype.workflow INFO:
[Node] Finished "_factorial3".
211017-18:00:29,771 nipype.workflow INFO:
[Node] Finished "ex1.factorial".
211017-18:00:29,772 nipype.workflow INFO:
[Node] Setting-up "ex1.summing" in "/home/neuro/workshop_weizmann/workshop/nipype/notebooks/ex1/summing".
211017-18:00:29,806 nipype.workflow INFO:
[Node] Running "summing" ("nipype.interfaces.utility.wrappers.Function")
211017-18:00:29,849 nipype.workflow INFO:
[Node] Finished "ex1.summing".
Let's print all nodes:
eg.nodes()
NodeView((ex1.range_list, ex1.factorial, ex1.summing))
The final result should be 10:
list(eg.nodes())[2].result.outputs
sum_out = 10
We can also check the results of the two other nodes:
print(list(eg.nodes())[0].result.outputs)
print(list(eg.nodes())[1].result.outputs)
range_list = [0, 1, 2, 3]
Bunch(fact_out=[1, 1, 2, 6])
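As an independent cross-check (plain Python, no nipype involved), the same range, map and sum steps can be written directly. sum_of_factorials is a hypothetical helper for this check, and its output should agree with the fact_out and sum_out values printed above.

```python
import math


def sum_of_factorials(n_min, n_max):
    # Same three steps as the workflow: build the range (inclusive of n_max),
    # map factorial over it, then reduce the list with sum.
    terms = [math.factorial(k) for k in range(n_min, n_max + 1)]
    return terms, sum(terms)


terms, total = sum_of_factorials(0, 3)
print(terms)  # [1, 1, 2, 6]
print(total)  # 10
```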