添加第二个任务#

下面我们开始使用 ecFlow 构建更复杂的工作流。

首先添加第二个任务 data2grib2,运行后处理程序。

本节将介绍如何在 CMA-PI 的并行节点上运行 ecFlow 任务。并行任务与之前创建的串行任务类似,只不过要额外设置运行节点数和每个节点使用的 CPU 个数。

修改工作流定义#

更新 ${TUTORIAL_HOME}/def 中的工作流定义文件 cma_gfs_post.py

 1import os
 2
 3import ecflow
 4
 5
 6def slurm_serial(class_name, wckey):
 7    variables = {
 8        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
 9        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
10            "CLASS": class_name,
11        "WCKEY": wckey,
12    }
13    return variables
14
15
16def slurm_parallel(nodes, tasks_per_node, class_name, wckey):
17    variables = {
18        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
19        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
20        "NODES": nodes,
21        "TASKS_PER_NODE": tasks_per_node,
22            "CLASS": class_name,
23        "WCKEY": wckey,
24    }
25    return variables
26
27
28current_path = os.path.dirname(__file__)
29tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
30def_path = os.path.join(tutorial_base, "def")
31ecfout_path = os.path.join(tutorial_base, "ecfout")
32program_base_dir = os.path.join(tutorial_base, "program/cma-gfs-post-program")
33run_base_dir = os.path.join(tutorial_base, "workdir")
34
35defs = ecflow.Defs()
36
37with defs.add_suite("cma_gfs_post") as suite:
38    suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
39    suite.add_variable("RUN_BASE_DIR", run_base_dir)
40
41    suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
42    suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
43
44    suite.add_variable("ECF_DATE", "20230806")
45    suite.add_variable("HH", "00")
46
47    with suite.add_task("pre_data2grib2") as tk_pre_data2grib2:
48        tk_pre_data2grib2.add_variable(slurm_serial("serial", "105-09"))
49
50    with suite.add_task("data2grib2") as tk_data2grib2:
51        tk_data2grib2.add_variable(slurm_parallel(4, 64, "normal", "105-09"))
52        tk_data2grib2.add_trigger("./pre_data2grib2 == complete")
53
54print(defs)
55def_output_path = str(os.path.join(def_path, "cma_gfs_post.def"))
56defs.save_as_defs(def_output_path)

新增代码说明:

  • 16-25 行定义 slurm_parallel 函数,返回提交 Slurm 并行作业所需的变量:在串行作业变量的基础上增加了节点数 NODES 和每个节点的任务数 TASKS_PER_NODE。

  • 50-52 行定义一个并行任务 data2grib2,使用并行队列 normal 运行,需要 4 个节点,每个节点占用 64 个 CPU 核心;该任务在 pre_data2grib2 运行完成后触发。

挂起 cma_gfs_post 节点,更新 ecFlow 上的工作流:

cd ${TUTORIAL_HOME}/def
python3 cma_gfs_post.py
ecflow_client --port 43083 --replace /cma_gfs_post cma_gfs_post.def

创建头文件#

在 ${TUTORIAL_HOME}/def/include 中创建头文件 slurm_parallel.h

## This is a head file for Slurm parallel job.
#SBATCH -J GRAPES
#SBATCH -p %CLASS%
#SBATCH -N %NODES%
#SBATCH --ntasks-per-node=%TASKS_PER_NODE%
#SBATCH -o %ECF_JOBOUT%
#SBATCH -e %ECF_JOBOUT%.err
#SBATCH --comment=GRAPES
#SBATCH -t 00:60:00
#SBATCH --no-requeue
#SBATCH --wckey=%WCKEY%

创建任务脚本#

在 ${TUTORIAL_HOME}/def/ecffiles 中创建 ecf 脚本 data2grib2.ecf

#!/bin/ksh
%include <slurm_parallel.h>
%include <head.h>
%include <configure.h>

date

#=======================
forecast_hour=024

#=======================
bin_dir=${PROGRAM_BIN_DIR}
condat_dir=${PROGRAM_CON_DIR}

run_dir=${CYCLE_RUN_BASE_DIR}

#------------------------
INIT_TIME=${START_TIME}

INIT_DATE=$(echo $INIT_TIME| cut -c1-8)
HH=$(echo $INIT_TIME| cut -c9-10)

YY=$(echo $INIT_DATE| cut -c3-4)
Y4=$(echo $INIT_DATE| cut -c1-4)
MM=$(echo $INIT_DATE| cut -c5-6)
DD=$(echo $INIT_DATE| cut -c7-8)
YM=$(echo $INIT_DATE| cut -c1-6)

init_time=${INIT_TIME}
#==============================================#
# create run directory
test -d $run_dir || mkdir -p $run_dir
cd $run_dir

test -d ${forecast_hour} || mkdir -p ${forecast_hour}
cd ${forecast_hour}

#--------------------#
#-----------------------------#
# run grapes_post.exe program
module load compiler/intel/2022.3.0
module load mpi/intelmpi/2021.6.0

export OMP_NUM_THREADS=1

ulimit -s unlimited

mpirun ./grapes_post.exe

#---------------------
# test output
module load wgrib2/3.1.1/intel

modelvar_count=$(wgrib2 -s modelvar${init_time}${forecast_hour}.grb2 | wc -l)
postvar_count=$(wgrib2 -s gmf.gra.${init_time}${forecast_hour}.grb2 | wc -l)
#------------------
%include <tail.h>

在 ecFlowUI 上查看运行结果:

../_images/ecflow-ui-run-data2grib2.png

可以看到生成了两个 GRIB2 文件:

  • modelvar2023080600024.grb2

  • gmf.gra.2023080600024.grb2