添加日期循环#
现在我们有了一个相对完整的 CMA-GFS 后处理 ecFlow 流程,能够处理某天 00 和 12 两个时次的模式输出。
日期通过 cma_gfs_post 节点的变量 (ECF_DATE) 指定。
当我们想要运行其他日期的后处理流程时,可以手动将 ECF_DATE 变量设置为其他时间。
业务系统不可能每天都要求运维人员手动调整参数,人为操作越多就容易引发故障。
本节将介绍本教程中最后一个概念,会将仅用于单次模式试验的流程改造为可以滚动运行的业务系统流程。
在本节中,我们将为 CMA-GFS 后处理工作流添加日期循环 (RepeatDate)。
在日期循环和时间依赖的帮助下,工作流将实现每天自动运行。
更新工作流定义#
更新 ${TUTORIAL_HOME}/def 中的工作流定义文件 cma_gfs_post.py:
1import os
2
3import ecflow
4
5
6def slurm_serial(class_name, wckey):
7 variables = {
8 "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
9 "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
10 "CLASS": class_name,
11 "WCKEY": wckey,
12 }
13 return variables
14
15
16def slurm_parallel(nodes, tasks_per_node, class_name, wckey):
17 variables = {
18 "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
19 "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
20 "NODES": nodes,
21 "TASKS_PER_NODE": tasks_per_node,
22 "CLASS": class_name,
23 "WCKEY": wckey,
24 }
25 return variables
26
27
28current_path = os.path.dirname(__file__)
29tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
30def_path = os.path.join(tutorial_base, "def")
31ecfout_path = os.path.join(tutorial_base, "ecfout")
32program_base_dir = os.path.join(tutorial_base, "program/cma-gfs-post-program")
33run_base_dir = os.path.join(tutorial_base, "workdir")
34
35defs = ecflow.Defs()
36
37with defs.add_suite("cma_gfs_post") as suite:
38 suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
39 suite.add_variable("RUN_BASE_DIR", run_base_dir)
40
41 suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
42 suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
43
44 suite.add_variable("ECF_DATE", "20230806")
45
46 suite.add_limit("total_tasks", 10)
47 suite.add_inlimit("total_tasks")
48
49 suite.add_variable(slurm_serial("serial", "105-09"))
50
51 suite.add_repeat(ecflow.RepeatDate("ECF_DATE", 20230806, 20240101))
52
53 forecast_hour_list = [ f"{hour:03}" for hour in range(0, 241, 3)]
54
55 start_hours = [
56 { "name": "00", "time": "04:40"},
57 { "name": "12", "time": "16:40"},
58 ]
59
60 for start_hour in start_hours:
61 cycle_name = start_hour["name"]
62 cycle_time = start_hour["time"]
63 with suite.add_family(cycle_name) as fm_start_hour:
64 fm_start_hour.add_time(cycle_time)
65 fm_start_hour.add_variable("HH", cycle_name)
66
67 with fm_start_hour.add_task("initial") as tk_initail:
68 for forecast_hour in forecast_hour_list:
69 tk_initail.add_event(f"modvar_{forecast_hour}")
70
71 with fm_start_hour.add_family("data") as fm_data:
72 for forecast_hour in forecast_hour_list:
73 with fm_data.add_family(forecast_hour) as fm_hour:
74 fm_hour.add_variable("FHOUR", forecast_hour)
75 fm_hour.add_trigger(f"../initial:modvar_{forecast_hour}")
76
77 with fm_hour.add_task("pre_data2grib2") as tk_pre_data2grib2:
78 pass
79
80 with fm_hour.add_task("data2grib2") as tk_data2grib2:
81 tk_data2grib2.add_variable(slurm_parallel(4, 64, "normal", "105-09"))
82 tk_data2grib2.add_trigger("./pre_data2grib2 == complete")
83
84print(defs)
85def_output_path = str(os.path.join(def_path, "cma_gfs_post.def"))
86defs.save_as_defs(def_output_path)
新增代码解析:
51 行:增加日期循环,从 2023.08.06 开始到 2024.01.01
更新工作流#
挂起 cma_gfs_post 节点,更新 ecFlow 上的工作流:
cd ${TUTORIAL_HOME}/def
python3 cma_gfs_post.py
ecflow_client --host login_a13 --port 43083 --replace /cma_gfs_post cma_gfs_post.def
双击左侧树形结构中的 ECF_DATE 变量,会弹出修改框,可以选择不同日期: