Source code for abiflows.fireworks.tasks.flow_wrapper_tasks
# coding: utf-8
from fireworks import FireTaskBase, FWAction, Firework, explicit_serialize
from abipy.abilab import Flow
[docs]@explicit_serialize
class FireTaskWithFlow(FireTaskBase):
required_params = ["flow"]
#def __init__(self, *args, **kwargs):
# #print("args", args, "kwargs", kwargs)
# #self._flow = kwargs["flow"]
# super().__init__(*args, **kwargs)
[docs] def run_task(self, fw_spec):
# print("entering run_task: %s " % str(self))
# Initialize the Flow from the filepath.
# Most of the serious stuff is delegated to flow.
# Note that FireTask converts all the objects to strings
# hence self["flow"] is the workdir of the flow from which
# we can reconstruct our object with pickle.
print("In MyTask with flow %s" % self["flow"])
workdir = self["flow"]["workdir"]
flow = Flow.pickle_load(workdir)
flow.build_and_pickle_dump()
try:
all_ok = fw_spec["all_ok"]
except KeyError:
all_ok = False
if all_ok:
print("all_ok will stop")
return FWAction()
else:
print("launched %s" % flow.rapidfire(check_status=True))
all_ok = flow.all_ok
new_fw = Firework(FireTaskWithFlow(flow=flow), {'all_ok': all_ok})
return FWAction(stored_data={'all_ok': all_ok}, additions=new_fw)