From 14d3d703f4ac195efc26944acf8faec96861456a Mon Sep 17 00:00:00 2001 From: Qian Yingjin Date: Thu, 6 Aug 2020 16:29:21 +0800 Subject: [PATCH] LU-13881 pcc: comparator support for PCC rules There are increasing requirements for PCC rules to add comparator support: - File data larger or smaller than certain threshold should not auto cache in PCC (i.e. larger than the capacity of PCC backend on a client). - Users can specify a range of UID/GID/ProjID for auto caching on PCC when define a rule; In addition to the original equal (=) operator, this patch also adds greater than (>) and less than (<) comparison operators. The following rule expressions are supported: - "projid={100}&size>{1M}&size<{500G}" - "projid>{100}&projid<{110}" - "uid<{1500}&uid>{1000}" Signed-off-by: Qian Yingjin Change-Id: I9f024eb6903f5652ba3cf04fa289456803493b2c Reviewed-on: https://review.whamcloud.com/41920 Tested-by: jenkins Reviewed-by: Li Xi Tested-by: Maloo Reviewed-by: Andreas Dilger --- lustre/doc/lctl-pcc.8 | 9 ++ lustre/llite/namei.c | 1 + lustre/llite/pcc.c | 271 ++++++++++++++++++++++++++++++++------- lustre/llite/pcc.h | 25 +++- lustre/obdclass/lprocfs_status.c | 4 +- lustre/tests/sanity-pcc.sh | 107 +++++++++++++++- 6 files changed, 366 insertions(+), 51 deletions(-) diff --git a/lustre/doc/lctl-pcc.8 b/lustre/doc/lctl-pcc.8 index 50045bb..c1950e8 100644 --- a/lustre/doc/lctl-pcc.8 +++ b/lustre/doc/lctl-pcc.8 @@ -27,6 +27,15 @@ example rule means that new files are only auto cached if the project ID is 500 and the suffix of the file name is "h5". "rwid" represents the read-write attach id (2) which value is same as the archive ID of the copytool agent running on this PCC node. +In addition to the equal (=) operator, it also supports greater than (>) and +less than (<) comparison operators. +The following rule expressions are supported: +- "projid={100}&size>{1M}&size<{500G}" +- "projid>{100}&projid<{110}" +- "uid<{1500}&uid>{1000}" +Currently each PCC backend only has one rule which is configed when setup PCC +backend on a client. If a user wants to change the rule, the PCC backend needs +to be removed first and then added back with a new rule. .TP .B lctl pcc del [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR> <\fIpccpath\fR> Delete a PCC backend specified by path diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index c5d951e..b6d4ae6 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -1112,6 +1112,7 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry, item.pm_gid = from_kgid(&init_user_ns, current_gid()); item.pm_projid = ll_i2info(dir)->lli_projid; item.pm_name = &dentry->d_name; + item.pm_size = 0; dataset = pcc_dataset_match_get(&sbi->ll_pcc_super, LU_PCC_READWRITE, &item); diff --git a/lustre/llite/pcc.c b/lustre/llite/pcc.c index 7f2c03f..4f0f6c0 100644 --- a/lustre/llite/pcc.c +++ b/lustre/llite/pcc.c @@ -130,27 +130,42 @@ int pcc_super_init(struct pcc_super *super) } /* Rule based auto caching */ -static void pcc_id_list_free(struct list_head *id_list) +static void pcc_id_list_free(struct pcc_expression *expr) { struct pcc_match_id *id, *n; - list_for_each_entry_safe(id, n, id_list, pmi_linkage) { - list_del_init(&id->pmi_linkage); - OBD_FREE_PTR(id); + if (expr->pe_opc == PCC_FIELD_OP_EQ) { + list_for_each_entry_safe(id, n, &expr->pe_cond, pmi_linkage) { + list_del_init(&id->pmi_linkage); + OBD_FREE_PTR(id); + } } } -static void pcc_fname_list_free(struct list_head *fname_list) +static void pcc_fname_list_free(struct pcc_expression *expr) { struct pcc_match_fname *fname, *n; - list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) { + LASSERT(expr->pe_opc == PCC_FIELD_OP_EQ); + list_for_each_entry_safe(fname, n, &expr->pe_cond, pmf_linkage) { OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1); list_del_init(&fname->pmf_linkage); OBD_FREE_PTR(fname); } } +static void pcc_size_list_free(struct pcc_expression *expr) +{ + struct pcc_match_size *sz, *n; + + if (expr->pe_opc == PCC_FIELD_OP_EQ) { + list_for_each_entry_safe(sz, n, &expr->pe_cond, pms_linkage) { + list_del_init(&sz->pms_linkage); + OBD_FREE_PTR(sz); + } + } +} + static void pcc_expression_free(struct pcc_expression *expr) { LASSERT(expr->pe_field >= PCC_FIELD_UID && @@ -159,10 +174,13 @@ static void pcc_expression_free(struct pcc_expression *expr) case PCC_FIELD_UID: case PCC_FIELD_GID: case PCC_FIELD_PROJID: - pcc_id_list_free(&expr->pe_cond); + pcc_id_list_free(expr); break; case PCC_FIELD_FNAME: - pcc_fname_list_free(&expr->pe_cond); + pcc_fname_list_free(expr); + break; + case PCC_FIELD_SIZE: + pcc_size_list_free(expr); break; default: LBUG(); @@ -207,7 +225,9 @@ static void pcc_cmd_fini(struct pcc_cmd *cmd) #define PCC_DISJUNCTION_DELIM (',') #define PCC_CONJUNCTION_DELIM ('&') -#define PCC_EXPRESSION_DELIM ('=') +#define PCC_EXPRESSION_DELIM_EQ ('=') +#define PCC_EXPRESSION_DELIM_LT ('<') +#define PCC_EXPRESSION_DELIM_GT ('>') static int pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list) @@ -230,7 +250,7 @@ pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list) } static int -pcc_fname_list_parse(char *str, int len, struct list_head *fname_list) +pcc_fname_list_parse(char *str, int len, struct pcc_expression *expr) { struct cfs_lstr src; struct cfs_lstr res; @@ -238,27 +258,29 @@ pcc_fname_list_parse(char *str, int len, struct list_head *fname_list) ENTRY; + if (expr->pe_opc != PCC_FIELD_OP_EQ) + RETURN(-EINVAL); + src.ls_str = str; src.ls_len = len; - INIT_LIST_HEAD(fname_list); + INIT_LIST_HEAD(&expr->pe_cond); while (src.ls_str) { rc = cfs_gettok(&src, ' ', &res); if (rc == 0) { rc = -EINVAL; break; } - rc = pcc_fname_list_add(&res, fname_list); + rc = pcc_fname_list_add(&res, &expr->pe_cond); if (rc) break; } if (rc) - pcc_fname_list_free(fname_list); + pcc_fname_list_free(expr); RETURN(rc); } static int -pcc_id_list_parse(char *str, int len, struct list_head *id_list, - enum pcc_field type) +pcc_id_list_parse(char *str, int len, struct pcc_expression *expr) { struct cfs_lstr src; struct cfs_lstr res; @@ -266,13 +288,19 @@ pcc_id_list_parse(char *str, int len, struct list_head *id_list, ENTRY; - if (type != PCC_FIELD_UID && type != PCC_FIELD_GID && - type != PCC_FIELD_PROJID) + if (expr->pe_field != PCC_FIELD_UID && + expr->pe_field != PCC_FIELD_GID && + expr->pe_field != PCC_FIELD_PROJID) + RETURN(-EINVAL); + + if (expr->pe_opc >= PCC_FIELD_OP_MAX) RETURN(-EINVAL); src.ls_str = str; src.ls_len = len; - INIT_LIST_HEAD(id_list); + + INIT_LIST_HEAD(&expr->pe_cond); + while (src.ls_str) { struct pcc_match_id *id; __u32 id_val; @@ -289,14 +317,80 @@ pcc_id_list_parse(char *str, int len, struct list_head *id_list, GOTO(out, rc = -ENOMEM); id->pmi_id = id_val; - list_add_tail(&id->pmi_linkage, id_list); + list_add_tail(&id->pmi_linkage, &expr->pe_cond); } out: if (rc) - pcc_id_list_free(id_list); + pcc_id_list_free(expr); RETURN(rc); } +static int +pcc_expr_id_parse(char *str, int len, struct pcc_expression *expr) +{ + int rc = 0; + + if (expr->pe_opc == PCC_FIELD_OP_EQ) + rc = pcc_id_list_parse(str, len, expr); + else if (!cfs_str2num_check(str, len, &expr->pe_id, 0, (u32)~0U)) + rc = -EINVAL; + + return rc; +} + +static int +pcc_size_list_parse(char *str, int len, struct pcc_expression *expr) +{ + struct cfs_lstr src; + struct cfs_lstr res; + int rc = 0; + + ENTRY; + + if (expr->pe_field != PCC_FIELD_SIZE) + RETURN(-EINVAL); + + if (expr->pe_opc >= PCC_FIELD_OP_MAX) + RETURN(-EINVAL); + + src.ls_str = str; + src.ls_len = len; + + INIT_LIST_HEAD(&expr->pe_cond); + + while (src.ls_str) { + struct pcc_match_size *sz; + __u64 sz_val; + + if (cfs_gettok(&src, ' ', &res) == 0) + GOTO(out, rc = -EINVAL); + + rc = sysfs_memparse(res.ls_str, res.ls_len, &sz_val, "MiB"); + if (rc < 0) + GOTO(out, rc = rc); + + OBD_ALLOC_PTR(sz); + if (sz == NULL) + GOTO(out, rc = -ENOMEM); + + sz->pms_size = sz_val; + list_add_tail(&sz->pms_linkage, &expr->pe_cond); + } +out: + if (rc) + pcc_id_list_free(expr); + RETURN(rc); +} + +static int +pcc_expr_size_parse(char *str, int len, struct pcc_expression *expr) +{ + if (expr->pe_opc == PCC_FIELD_OP_EQ) + return pcc_size_list_parse(str, len, expr); + else + return sysfs_memparse(str, len, &expr->pe_size, "MiB"); +} + static inline bool pcc_check_field(struct cfs_lstr *field, char *str) { @@ -306,53 +400,91 @@ pcc_check_field(struct cfs_lstr *field, char *str) strncmp(field->ls_str, str, len) == 0); } +static inline char +pcc_get_opcode_delim(enum pcc_field_op opc) +{ + switch (opc) { + case PCC_FIELD_OP_EQ: + return PCC_EXPRESSION_DELIM_EQ; + case PCC_FIELD_OP_LT: + return PCC_EXPRESSION_DELIM_LT; + case PCC_FIELD_OP_GT: + return PCC_EXPRESSION_DELIM_GT; + default: + LBUG(); + } +} + +static enum pcc_field_op +pcc_get_field_opcode(struct cfs_lstr *src, struct cfs_lstr *field) +{ + struct cfs_lstr tmp; + int rc; + int i; + + ENTRY; + + for (i = PCC_FIELD_OP_EQ; i < PCC_FIELD_OP_MAX; i++) { + tmp = *src; + rc = cfs_gettok(&tmp, pcc_get_opcode_delim(i), field); + if (rc > 0 && tmp.ls_str != NULL) { + src->ls_str = tmp.ls_str; + src->ls_len = tmp.ls_len; + RETURN(i); + } + } + + RETURN(PCC_FIELD_OP_INV); +} + static int pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list) { struct pcc_expression *expr; + enum pcc_field_op opc; struct cfs_lstr field; - int rc = 0; + int rc; + + opc = pcc_get_field_opcode(src, &field); + if (opc == PCC_FIELD_OP_INV) + return -EINVAL; + + if (src->ls_len <= 2 || src->ls_str[0] != '{' || + src->ls_str[src->ls_len - 1] != '}') + return -EINVAL; OBD_ALLOC_PTR(expr); if (expr == NULL) return -ENOMEM; - rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field); - if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' || - src->ls_str[src->ls_len - 1] != '}') - GOTO(out, rc = -EINVAL); - /* Skip '{' and '}' */ src->ls_str++; src->ls_len -= 2; + expr->pe_opc = opc; if (pcc_check_field(&field, "uid")) { - if (pcc_id_list_parse(src->ls_str, - src->ls_len, - &expr->pe_cond, - PCC_FIELD_UID) < 0) - GOTO(out, rc = -EINVAL); expr->pe_field = PCC_FIELD_UID; - } else if (pcc_check_field(&field, "gid")) { - if (pcc_id_list_parse(src->ls_str, - src->ls_len, - &expr->pe_cond, - PCC_FIELD_GID) < 0) + if (pcc_expr_id_parse(src->ls_str, src->ls_len, expr) < 0) GOTO(out, rc = -EINVAL); + } else if (pcc_check_field(&field, "gid")) { expr->pe_field = PCC_FIELD_GID; - } else if (pcc_check_field(&field, "projid")) { - if (pcc_id_list_parse(src->ls_str, - src->ls_len, - &expr->pe_cond, - PCC_FIELD_PROJID) < 0) + if (pcc_expr_id_parse(src->ls_str, src->ls_len, expr) < 0) GOTO(out, rc = -EINVAL); + } else if (pcc_check_field(&field, "projid")) { expr->pe_field = PCC_FIELD_PROJID; + if (pcc_expr_id_parse(src->ls_str, src->ls_len, expr) < 0) + GOTO(out, rc = -EINVAL); + } else if (pcc_check_field(&field, "size")) { + expr->pe_field = PCC_FIELD_SIZE; + if (pcc_expr_size_parse(src->ls_str, src->ls_len, expr) < 0) + GOTO(out, rc = -EINVAL); } else if (pcc_check_field(&field, "fname")) { - if (pcc_fname_list_parse(src->ls_str, - src->ls_len, - &expr->pe_cond) < 0) + if (opc != PCC_FIELD_OP_EQ) GOTO(out, rc = -EINVAL); + expr->pe_field = PCC_FIELD_FNAME; + if (pcc_fname_list_parse(src->ls_str, src->ls_len, expr) < 0) + GOTO(out, rc = -EINVAL); } else { GOTO(out, rc = -EINVAL); } @@ -653,15 +785,59 @@ pcc_fname_list_match(struct list_head *fname_list, const char *name) } static int +pcc_expr_id_match(struct pcc_expression *expr, __u32 id) +{ + switch (expr->pe_opc) { + case PCC_FIELD_OP_EQ: + return pcc_id_list_match(&expr->pe_cond, id); + case PCC_FIELD_OP_LT: + return id < expr->pe_id; + case PCC_FIELD_OP_GT: + return id > expr->pe_id; + default: + return 0; + } +} + +static int +pcc_size_list_match(struct list_head *id_list, __u64 sz_val) +{ + struct pcc_match_size *sz; + + list_for_each_entry(sz, id_list, pms_linkage) { + if (sz->pms_size == sz_val) + return 1; + } + return 0; +} + +static int +pcc_expr_size_match(struct pcc_expression *expr, __u64 sz) +{ + switch (expr->pe_opc) { + case PCC_FIELD_OP_EQ: + return pcc_size_list_match(&expr->pe_cond, sz); + case PCC_FIELD_OP_LT: + return sz < expr->pe_size; + case PCC_FIELD_OP_GT: + return sz > expr->pe_size; + default: + return 0; + } +} + +static int pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher) { switch (expr->pe_field) { case PCC_FIELD_UID: - return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid); + return pcc_expr_id_match(expr, matcher->pm_uid); case PCC_FIELD_GID: - return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid); + return pcc_expr_id_match(expr, matcher->pm_gid); case PCC_FIELD_PROJID: - return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid); + return pcc_expr_id_match(expr, matcher->pm_projid); + case PCC_FIELD_SIZE: + return pcc_expr_size_match(expr, matcher->pm_size); case PCC_FIELD_FNAME: return pcc_fname_list_match(&expr->pe_cond, matcher->pm_name->name); @@ -1402,6 +1578,7 @@ static int pcc_try_readonly_open_attach(struct inode *inode, struct file *file, item.pm_gid = from_kgid(&init_user_ns, current_gid()); item.pm_projid = ll_i2info(inode)->lli_projid; item.pm_name = &dentry->d_name; + item.pm_size = ll_i2info(inode)->lli_lazysize; dataset = pcc_dataset_match_get(&ll_i2sbi(inode)->ll_pcc_super, LU_PCC_READONLY, &item); if (dataset == NULL) diff --git a/lustre/llite/pcc.h b/lustre/llite/pcc.h index 842a27f..58d7f58 100644 --- a/lustre/llite/pcc.h +++ b/lustre/llite/pcc.h @@ -48,6 +48,12 @@ struct pcc_match_id { struct list_head pmi_linkage; }; +/* Lazy file size */ +struct pcc_match_size { + __u64 pms_size; + struct list_head pms_linkage; +}; + /* wildcard file name */ struct pcc_match_fname { char *pmf_name; @@ -59,13 +65,27 @@ enum pcc_field { PCC_FIELD_GID, PCC_FIELD_PROJID, PCC_FIELD_FNAME, + PCC_FIELD_SIZE, PCC_FIELD_MAX }; +enum pcc_field_op { + PCC_FIELD_OP_EQ = 0, + PCC_FIELD_OP_LT = 1, + PCC_FIELD_OP_GT = 2, + PCC_FIELD_OP_MAX = 3, + PCC_FIELD_OP_INV = PCC_FIELD_MAX, +}; + struct pcc_expression { - enum pcc_field pe_field; - struct list_head pe_cond; struct list_head pe_linkage; + enum pcc_field pe_field; + enum pcc_field_op pe_opc; + union { + struct list_head pe_cond; + __u64 pe_size; + __u32 pe_id; + }; }; struct pcc_conjunction { @@ -87,6 +107,7 @@ struct pcc_matcher { __u32 pm_uid; __u32 pm_gid; __u32 pm_projid; + __u64 pm_size; struct qstr *pm_name; }; diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 89403da..8d63653 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -1758,7 +1758,9 @@ int sysfs_memparse(const char *buffer, size_t count, u64 *val, char tmp_buf[23]; int rc; - count = strlen(buffer); + if (count > strlen(buffer)) + count = strlen(buffer); + while (count > 0 && isspace(buffer[count - 1])) count--; diff --git a/lustre/tests/sanity-pcc.sh b/lustre/tests/sanity-pcc.sh index 6ca3470..f0f7408 100644 --- a/lustre/tests/sanity-pcc.sh +++ b/lustre/tests/sanity-pcc.sh @@ -178,7 +178,8 @@ setup_pcc_mapping() { [ -z "$param" ] && param="projid={100}\ rwid=$HSM_ARCHIVE_NUMBER" stack_trap "cleanup_pcc_mapping $facet" EXIT - do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p $param + do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p "$param" || + error "Setup PCC backend $hsm_root on $MOUNT failed" } umount_loopdev() { @@ -2682,6 +2683,110 @@ test_32() { } run_test 32 "Test for RO-PCC when PCC copy is deleted" +test_33() { + local loopfile="$TMP/$tfile" + local mntpt="/mnt/pcc.$tdir" + local hsm_root="$mntpt/$tdir" + local file=$DIR/myfile.doc + local file2=$DIR2/myfile.doc + + setup_loopdev $SINGLEAGT $loopfile $mntpt 50 + copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" + + setup_pcc_mapping $SINGLEAGT \ + "fname={*.doc}\&size\<{1M}\ roid=$HSM_ARCHIVE_NUMBER\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + touch $file || error "touch $file failed" + $TRUNCATE $file $((1048576 * 2)) || error "Truncate $file failed" + check_lpcc_state $file "none" + do_facet $SINGLEAGT $LFS pcc state $file + $TRUNCATE $file $((1048576 / 2)) || error "Truncate $file failed" + do_facet $SINGLEAGT $LFS pcc state $file + check_lpcc_state $file "readonly" + cleanup_pcc_mapping + + setup_pcc_mapping $SINGLEAGT \ + "fname={*.doc}\&size\<{5M}\&size\>{3M}\ roid=5\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + check_lpcc_state $file "none" + $TRUNCATE $file2 $((1048576 * 6)) || error "Truncate $file2 failed" + check_lpcc_state $file "none" + $TRUNCATE $file2 $((1048576 * 4)) || error "Truncate $file2 failed" + check_lpcc_state $file "readonly" + cleanup_pcc_mapping + + setup_pcc_mapping $SINGLEAGT \ + "fname={*.doc}\&size={5M\ 3M}\ roid=5\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + check_lpcc_state $file "none" + $TRUNCATE $file $((1048576 * 5)) || error "Truncate $file failed" + check_lpcc_state $file "readonly" + do_facet $SINGLEAGT $LFS pcc detach $file || + error "failed to detach $file" + $TRUNCATE $file $((1048576 * 4)) || error "Truncate $file failed" + check_lpcc_state $file "none" + $TRUNCATE $file $((1048576 * 3)) || error "Truncate $file failed" + check_lpcc_state $file "readonly" + cleanup_pcc_mapping +} +run_test 33 "Cache rule with comparator (>, =, <) for file size" + +test_34() { + local loopfile="$TMP/$tfile" + local mntpt="/mnt/pcc.$tdir" + local hsm_root="$mntpt/$tdir" + local file=$DIR/$tfile + + ! is_project_quota_supported && + skip "project quota is not supported" + + enable_project_quota + setup_loopdev $SINGLEAGT $loopfile $mntpt 50 + copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" + + setup_pcc_mapping $SINGLEAGT \ + "projid\>{100}\ roid=5\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + do_facet $SINGLEAGT "echo -n QQQQQ > $file" || + error "failed to write $file" + check_lpcc_state $file "none" + $LFS project -p 99 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "none" + $LFS project -p 101 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "readonly" + cleanup_pcc_mapping + + setup_pcc_mapping $SINGLEAGT \ + "projid\<{100}\ roid=5\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + check_lpcc_state $file "none" + $LFS project -p 102 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "none" + $LFS project -p 99 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "readonly" + cleanup_pcc_mapping + + setup_pcc_mapping $SINGLEAGT \ + "projid\<{120}\&projid\>{110}\ roid=5\ ropcc=1" + do_facet $SINGLEAGT $LCTL pcc list $MOUNT + check_lpcc_state $file "none" + $LFS project -p 105 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "none" + $LFS project -p 121 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "none" + $LFS project -p 115 $file || error "failed to set project for $file" + $LFS project -d $file + check_lpcc_state $file "readonly" + cleanup_pcc_mapping +} +run_test 34 "Cache rule with comparator (>, <) for Project ID range" + complete $SECONDS check_and_cleanup_lustre exit_status -- 1.8.3.1