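Adding a data file detection task
Starting with this section, we introduce some of ecFlow's more advanced features. To move one step closer to an operational system that runs in real time, we add a data file detection task to the CMA-GFS post-processing workflow, and then use time dependencies and a date repeat parameter to build a workflow that can cycle continuously in real time.
The pre_data2grib2 task used so far assumed that CMA-GFS had already produced the modelvar file for the given cycle and forecast hour. In practice, a real-time post-processing system cannot wait until the modelvar files for all forecast hours exist, because products must be delivered on time. Instead, post-processing starts while the model is still integrating and writing output: as soon as the file for one forecast hour appears, the post-processing for that forecast hour runs.
A post-processing workflow usually runs a single program that checks, in order, whether the model has produced the modelvar file for each forecast hour, and uses events to trigger the downstream steps.
This section creates that data file detection task and gives it one event per forecast hour to trigger the downstream steps.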
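The pattern described above can be sketched in a few lines of Python. This only illustrates the control flow; the task itself is the ksh script created later in this section. The names `detect_in_order`, `is_ready`, and `set_event` are hypothetical, and in the real task the event is set with `ecflow_client --event`.

```python
def detect_in_order(lead_times, is_ready, set_event):
    """Check lead times in order and set one event per available file.

    is_ready(lead_time) -> bool and set_event(name) are hypothetical
    callbacks standing in for the file check and the ecFlow event call.
    Returns False at the first lead time whose file never shows up.
    """
    for lead_time in lead_times:
        if not is_ready(lead_time):
            return False
        set_event(f"modvar_{lead_time}")
    return True


# Simulated run: files for 000 and 003 exist, 006 has not been written yet.
available = {"000", "003"}
fired = []
ok = detect_in_order(["000", "003", "006"], available.__contains__, fired.append)
print(ok, fired)
```

Because the lead times are checked strictly in order, a missing file stops the loop and no later event is set, which matches how model output appears during the integration.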
Updating the workflow definition
Update the workflow definition file cma_gfs_post.py in ${TUTORIAL_HOME}/def:
 1  import os
 2
 3  import ecflow
 4
 5
 6  def slurm_serial(class_name, wckey):
 7      variables = {
 8          "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
 9          "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
10          "CLASS": class_name,
11          "WCKEY": wckey,
12      }
13      return variables
14
15
16  def slurm_parallel(nodes, tasks_per_node, class_name, wckey):
17      variables = {
18          "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
19          "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
20          "NODES": nodes,
21          "TASKS_PER_NODE": tasks_per_node,
22          "CLASS": class_name,
23          "WCKEY": wckey,
24      }
25      return variables
26
27
28  current_path = os.path.dirname(__file__)
29  tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
30  def_path = os.path.join(tutorial_base, "def")
31  ecfout_path = os.path.join(tutorial_base, "ecfout")
32  program_base_dir = os.path.join(tutorial_base, "program/cma-gfs-post-program")
33  run_base_dir = os.path.join(tutorial_base, "workdir")
34
35  defs = ecflow.Defs()
36
37  with defs.add_suite("cma_gfs_post") as suite:
38      suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
39      suite.add_variable("RUN_BASE_DIR", run_base_dir)
40
41      suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
42      suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
43
44      suite.add_variable("ECF_DATE", "20230806")
45      suite.add_variable("HH", "00")
46
47      suite.add_limit("total_tasks", 10)
48      suite.add_inlimit("total_tasks")
49
50      suite.add_variable(slurm_serial("serial", "105-09"))
51
52      forecast_hour_list = [f"{hour:03}" for hour in range(0, 241, 3)]
53
54      with suite.add_task("initial") as tk_initial:
55          for forecast_hour in forecast_hour_list:
56              tk_initial.add_event(f"modvar_{forecast_hour}")
57
58      for forecast_hour in forecast_hour_list:
59          with suite.add_family(forecast_hour) as fm_hour:
60              fm_hour.add_variable("FHOUR", forecast_hour)
61              fm_hour.add_trigger(f"./initial:modvar_{forecast_hour}")
62
63              with fm_hour.add_task("pre_data2grib2") as tk_pre_data2grib2:
64                  pass
65
66              with fm_hour.add_task("data2grib2") as tk_data2grib2:
67                  tk_data2grib2.add_variable(slurm_parallel(4, 64, "normal", "105-09"))
68                  tk_data2grib2.add_trigger("./pre_data2grib2 == complete")
69
70  print(defs)
71  def_output_path = str(os.path.join(def_path, "cma_gfs_post.def"))
72  defs.save_as_defs(def_output_path)
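What the new code does:
Lines 54-56: add the initial task and give it one event per forecast hour, modvar_000 through modvar_240.
Line 61: add a trigger to each forecast-hour family so that it runs only after the matching event on initial has been set.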
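To see which names these lines produce without connecting to an ecFlow server, the same list comprehension can be run on its own. The sketch below uses only the standard library; `event_names` and `trigger_by_hour` are illustrative variables that do not appear in the definition file.

```python
# Same expression as in the workflow definition: 000, 003, ..., 240.
forecast_hour_list = [f"{hour:03}" for hour in range(0, 241, 3)]

# One event on the initial task per forecast hour.
event_names = [f"modvar_{hour}" for hour in forecast_hour_list]

# The trigger expression attached to each forecast-hour family.
trigger_by_hour = {hour: f"./initial:modvar_{hour}" for hour in forecast_hour_list}

print(len(forecast_hour_list))           # 81
print(event_names[0], event_names[-1])   # modvar_000 modvar_240
print(trigger_by_hour["003"])            # ./initial:modvar_003
```

Each family waits on exactly one event, so the 006 family can start as soon as modvar_006 is set, regardless of how far the later forecast hours have progressed.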
Updating the workflow
Suspend the cma_gfs_post node, then update the workflow on the ecFlow server:
cd ${TUTORIAL_HOME}/def
python3 cma_gfs_post.py
ecflow_client --host login_a13 --port 43083 --replace /cma_gfs_post cma_gfs_post.def
Creating the initial task script
Create the ecf script initial.ecf in ${TUTORIAL_HOME}/def/ecffiles:
 1  #!/bin/ksh
 2  %include <slurm_serial.h>
 3  %include <head.h>
 4  %include <configure.h>
 5
 6  date
 7
 8  #=======================
 9
10  max_check_count=1200
11  sleep_seconds_for_check=1
12  sleep_seconds_for_next_time=1
13
14  #=======================
15  bin_dir=${PROGRAM_BIN_DIR}
16  condat_dir=${PROGRAM_CON_DIR}
17
18  run_dir=${CYCLE_RUN_BASE_DIR}
19
20  #------------------------
21  start_time=${START_TIME}
22
23  #--------------
24  test -d ${run_dir} || mkdir -p ${run_dir}
25  cd ${run_dir}
26
27  #------------------
28
29  check_file_without_size_change()
30  {
31    start_time=$1
32    f_time=$2
33    file_path=$3
34
35    count=0
36    get_data=0
37    while [[ ${count} -lt ${max_check_count} ]] && [ ${get_data} -ne 1 ]
38    do
39      echo "check...${start_time} ${f_time}...${count}/${max_check_count}"
40      file_path=/g2/op_gfs/OPER_ARCH_TEST/GRAPES_GFS_GMF/Fcst-long/${start_time}/modelvar${start_time}_${f_time}
41
42      if [ -e ${file_path} ]; then
43        echo "check...${file_path}...${count}/${max_check_count}: check size change"
44        last_file_size=-1
45        while [[ ${count} -lt ${max_check_count} ]] && [[ $get_data -eq 0 ]]
46        do
47          echo "check...${file_path}...${count}/${max_check_count}: check size change"
48          current_file_size=$(stat -c %%s ${file_path})
49          if [[ ${last_file_size} -eq ${current_file_size} ]]; then
50            get_data=1
51            echo "check...${file_path}...${count}/${max_check_count}: check size change success"
52          else
53            last_file_size=${current_file_size}
54            sleep ${sleep_seconds_for_check}
55          fi
56          count=$(($count+1))
57        done
58      else
59        sleep ${sleep_seconds_for_check}
60      fi
61      count=$(($count+1))
62    done
63    if [[ $get_data -eq 0 ]];then
64      echo "check...${start_time} ${f_time}...failed (too many times)"
65      return 1;
66    fi
67    echo "check...${file_path}...found"
68    return 0;
69  }
70
71  #===================================================
72  # check modelvar file one by one, and set event.
73  forecast_array="$(seq -f %%03g 0 3 240)"
74
75  for ftime in ${forecast_array}; do
76    echo "checking for ${ftime}..."
77    file_path="NOTFOUND"
78    if ! check_file_without_size_change \
79      ${start_time} \
80      ${ftime} \
81      ${file_path}; then
82      we_got_an_error
83    fi
84    sleep ${sleep_seconds_for_next_time}
85    echo "file path: ${file_path}"
86    ecflow_client --event modvar_${ftime}
87  done
88
89  #------------------
90  %include <tail.h>
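Notes:
Lines 10-12: set the interval between checks (sleep_seconds_for_check), the maximum number of checks (max_check_count), and the pause before moving on to the next forecast hour (sleep_seconds_for_next_time).
Lines 29-69: the check_file_without_size_change function checks whether the file exists; if it does, the function checks whether the file size is still changing and returns success once two consecutive checks report the same size.
Lines 73-87: check the file for each forecast hour in turn; once a file is found, set the corresponding event modvar_${ftime}.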
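For readers more comfortable with Python, the size-stability check can be restated as follows. This is a minimal sketch of the same idea, not a line-for-line port of the ksh function: `wait_for_stable_file` is a hypothetical name, and the defaults simply mirror max_check_count and sleep_seconds_for_check from the script.

```python
import os
import time


def wait_for_stable_file(path, max_checks=1200, interval=1.0):
    """Return True once `path` exists and two consecutive size checks agree.

    Returns False if that does not happen within `max_checks` polls.
    """
    last_size = -1  # sentinel: no size observed yet
    for _ in range(max_checks):
        if os.path.exists(path):
            current_size = os.path.getsize(path)
            if current_size == last_size:
                return True  # size unchanged since the previous poll
            last_size = current_size
        else:
            last_size = -1  # file not there (yet); start over
        time.sleep(interval)
    return False
```

A call such as `wait_for_stable_file(file_path, max_checks=1200, interval=1.0)` would play the role of lines 29-69. An unchanged size between two polls is only a heuristic for "the writer has finished", so the interval should be long enough that a file still being written is expected to grow between checks.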
Running the task
Resume the suspended cma_gfs_post node and watch the order in which the tasks start.