Workflow: Examples

iDDS examples for subworkflow and loopworkflow. (example: main/lib/idds/tests/test_workflow_condition_v2.py)

Loop workflow

Here is a simple example of loop workflow.

from idds.workflowv2.work import Work, WorkStatus
from idds.workflowv2.workflow import (CompositeCondition, AndCondition, OrCondition,
                                      Condition, ConditionTrigger, Workflow, ParameterLink)

work1 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=1)
work2 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=2)

workflow = Workflow()
workflow.add_work(work1, initial=False)
workflow.add_work(work2, initial=False)

cond = Condition(cond=work2.is_finished)
workflow.add_loop_condition(cond)

# custom_condition
# iDDS will check to_contiue status.
# With ATLASLocalPanDAWork, the output will be parsed. If 'to_continue' is in the output, it will be used.
work2.add_custom_condition(key='to_continue', value=True)
cond = Condition(work2.get_custom_condition_status)
workflow.add_loop_condition(cond1)

# multiple custom_condition
# to_continue and to_continue1
work2.add_custom_condition(key='to_continue', value=True, op='and')
work2.add_custom_condition(key='to_continue1', value=True, op='and')
cond = Condition(work2.get_custom_condition_status)
workflow.add_loop_condition(cond1)

# multiple custom_condition
# (to_continue and to_continue1) or to_exit or to_exit1
work2.add_custom_condition(key='to_continue', value=True, op='and')
work2.add_custom_condition(key='to_continue1', value=True, op='and')
work2.add_custom_condition(key='to_exit', value=False, op='or')
work2.add_custom_condition(key='to_exit1', value=False, op='or')
cond = Condition(work2.get_custom_condition_status)
workflow.add_loop_condition(cond1)

Sub workflow

Here is a simple example of sub workflow.

from idds.workflowv2.work import Work, WorkStatus
from idds.workflowv2.workflow import (CompositeCondition, AndCondition, OrCondition,
                                      Condition, ConditionTrigger, Workflow, ParameterLink)

work1 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=1)
work2 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=2)

workflow1 = Workflow()
workflow1.add_work(work1, initial=False)
workflow1.add_work(work2, initial=False)

work3 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=3)

workflow = Workflow()
workflow.add_work(work3, initial=False)
workflow.add_work(workflow1, initial=False)

Workflow with global parameters

Here is a simple example of workflow with global parameters. When a work starts, the work will use the global parameters and call ‘setattr’ to set the attributes for this work. When a work terminates, idds will call getattr to get the values for global parameters and store them in the global parameters. However, to avoid the global parameters overwrite the work’s private attributes, currently only parameters start with ‘user_’ will be accepted as global parameters. Other parameters will be ignored with a warning logging messages.

from idds.workflowv2.work import Work, WorkStatus
from idds.workflowv2.workflow import (CompositeCondition, AndCondition, OrCondition,
                                      Condition, ConditionTrigger, Workflow, ParameterLink)

work1 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=1,
             primary_input_collection={'scope': 'test_scop', 'name': 'input_test_work_1'},
             primary_output_collection={'scope': 'test_scop', 'name': 'output_test_work_1'})
work2 = Work(executable='/bin/hostname', arguments=None, sandbox=None, work_id=2,
             primary_input_collection={'scope': 'test_scop', 'name': 'input_test_work_2'},
             primary_output_collection={'scope': 'test_scop', 'name': 'output_test_work_2'})

workflow1 = Workflow()
workflow1.add_work(work1, initial=False)
workflow1.add_work(work2, initial=False)

# global paraeters
workflow1.set_global_parameters({'user_attr1': 1, 'user_attr2': 2})

# sliced global parameters
workflow1.set_global_parameters({'user_attr': [1, 2, 3]})
workflow1.set_sliced_global_parameters(source='user_attr', index=0)
workflow1.set_sliced_global_parameters(source='user_attr', index=1, name='user_myattr')