变量继承

上节中我们为每个任务都单独设置了 Slurm 提交变量,实际上可以利用 ecFlow 的变量继承机制简化设置:在父节点 (例如 suite) 上定义的变量会被其下所有子节点继承,子节点上定义的同名变量会覆盖继承的值。

修改工作流定义

更新 ${TUTORIAL_HOME}/def 中的工作流定义文件 cma_gfs_post.py

 1import os
 2
 3import ecflow
 4
 5
 6def slurm_serial(class_name, wckey):
 7    variables = {
 8        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
 9        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
10        "CLASS": class_name,
11        "WCKEY": wckey,
12    }
13    return variables
14
15
16def slurm_parallel(nodes, tasks_per_node, class_name, wckey):
17    variables = {
18        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
19        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
20        "NODES": nodes,
21        "TASKS_PER_NODE": tasks_per_node,
22        "CLASS": class_name,
23        "WCKEY": wckey,
24    }
25    return variables
26
27
28current_path = os.path.dirname(__file__)
29tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
30def_path = os.path.join(tutorial_base, "def")
31ecfout_path = os.path.join(tutorial_base, "ecfout")
32program_base_dir = os.path.join(tutorial_base, "program/cma-gfs-post-program")
33run_base_dir = os.path.join(tutorial_base, "workdir")
34
35defs = ecflow.Defs()
36
37with defs.add_suite("cma_gfs_post") as suite:
38    suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
39    suite.add_variable("RUN_BASE_DIR", run_base_dir)
40
41    suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
42    suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
43
44    suite.add_variable("ECF_DATE", "20230806")
45    suite.add_variable("HH", "00")
46
47    suite.add_limit("total_tasks", 10)
48    suite.add_inlimit("total_tasks")
49
50    suite.add_variable(slurm_serial("serial", "105-09"))
51
52    forecast_hour_list = [ f"{hour:03}" for hour in range(0, 241, 3)]
53
54    for forecast_hour in forecast_hour_list:
55        with suite.add_family(forecast_hour) as fm_hour:
56            fm_hour.add_variable("FHOUR", forecast_hour)
57
58            with fm_hour.add_task("pre_data2grib2") as tk_pre_data2grib2:
59                pass
60
61            with fm_hour.add_task("data2grib2") as tk_data2grib2:
62                tk_data2grib2.add_variable(slurm_parallel(4, 64, "normal", "105-09"))
63                tk_data2grib2.add_trigger("./pre_data2grib2 == complete")
64
65print(defs)
66def_output_path = str(os.path.join(def_path, "cma_gfs_post.def"))
67defs.save_as_defs(def_output_path)
  • 50 行:为整个 suite 添加串行作业提交需要的变量,suite 下的所有节点都会继承这些变量

  • 58-59 行:pre_data2grib2 任务无需再设置 Slurm 提交变量,直接使用从 suite 继承的串行作业变量

  • 62 行:data2grib2 任务仍单独设置并行作业提交变量,其中的同名变量会覆盖从 suite 继承的值

更新工作流

更新 ecFlow 上的工作流:

cd ${TUTORIAL_HOME}/def
python3 cma_gfs_post.py
ecflow_client --port 43083 --replace /cma_gfs_post cma_gfs_post.def

从截图中可以看到 suite 节点 cma_gfs_post 的 Slurm 提交相关变量:

../_images/ecflow-ui-variable.png