From: Fan Yong Date: Wed, 27 Aug 2014 15:12:44 +0000 (+0800) Subject: LU-5517 lfsck: repair invalid nlink count X-Git-Tag: 2.6.90~61 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=9ff2d957982160103b5d885c9a532ad45bdf8d4d LU-5517 lfsck: repair invalid nlink count If the namespace LFSCK has verified all the known name entries during the first-stage scanning, then the MDT-object's linkEA is trustworthy. So if a non-directory MDT-object's nlink attribute does not match the number of entries in its linkEA, then the LFSCK will repair the MDT-object's nlink attribute according to its linkEA entry count. There is one exception: the linkEA space is limited, so if there are too many hard links on the MDT-object and they exceed the object's linkEA space limit, then some name entries cannot be recorded in the linkEA. In such a case, we add a flag (LNTF_SKIP_NLINK) in the LFSCK tracing file for the related MDT-objects. Then the LFSCK can skip the nlink attribute verification for the marked MDT-objects during the second-stage scanning. This patch also cleans up the LFSCK environment when the current LFSCK scan exits (completed/stopped/failed) so that stale state cannot mislead the next LFSCK scan. This patch also makes some code adjustments to the previously landed LFSCK patches according to the inspection feedback. 
Signed-off-by: Fan Yong Change-Id: Iedc676e8cc06a52f55e82372e6dc8b30008e20f4 Reviewed-on: http://review.whamcloud.com/11516 Tested-by: Jenkins Reviewed-by: Alex Zhuravlev Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 929a4d4..d9da1a4 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -284,7 +284,8 @@ int tgt_brw_write(struct tgt_session_info *tsi); int tgt_hpreq_handler(struct ptlrpc_request *req); void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *, struct dt_device *, - struct lfsck_request *)); + struct lfsck_request *, + struct thandle *)); void tgt_register_lfsck_query(int (*query)(const struct lu_env *, struct dt_device *, struct lfsck_request *)); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index b6d0516..0ebe6b1 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3585,6 +3585,8 @@ enum lfsck_events { LE_CONDITIONAL_DESTROY = 10, LE_PAIRS_VERIFY = 11, LE_CREATE_ORPHAN = 12, + LE_SKIP_NLINK_DECLARE = 13, + LE_SKIP_NLINK = 14, }; enum lfsck_event_flags { diff --git a/lustre/include/lustre_lfsck.h b/lustre/include/lustre_lfsck.h index 635aa21..b3a7e2e 100644 --- a/lustre/include/lustre_lfsck.h +++ b/lustre/include/lustre_lfsck.h @@ -144,7 +144,7 @@ int lfsck_start(const struct lu_env *env, struct dt_device *key, int lfsck_stop(const struct lu_env *env, struct dt_device *key, struct lfsck_stop *stop); int lfsck_in_notify(const struct lu_env *env, struct dt_device *key, - struct lfsck_request *lr); + struct lfsck_request *lr, struct thandle *th); int lfsck_query(const struct lu_env *env, struct dt_device *key, struct lfsck_request *lr); @@ -156,12 +156,13 @@ int lfsck_set_windows(struct dt_device *key, int val); int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type); static inline void lfsck_pack_rfa(struct 
lfsck_request *lr, - const struct lu_fid *fid) + const struct lu_fid *fid, + __u32 event, __u16 com) { memset(lr, 0, sizeof(*lr)); - lr->lr_event = LE_FID_ACCESSED; - lr->lr_active = LFSCK_TYPE_LAYOUT; lr->lr_fid = *fid; + lr->lr_event = event; + lr->lr_active = com; } #endif /* _LUSTRE_LFSCK_H */ diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index db06866..8d48365 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -533,6 +533,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LFSCK_MUL_REF 0x1622 #define OBD_FAIL_LFSCK_BAD_TYPE 0x1623 #define OBD_FAIL_LFSCK_NO_NAMEENTRY 0x1624 +#define OBD_FAIL_LFSCK_MORE_NLINK 0x1625 +#define OBD_FAIL_LFSCK_LESS_NLINK 0x1626 +#define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627 #define OBD_FAIL_LFSCK_NOTIFY_NET 0x16f0 #define OBD_FAIL_LFSCK_QUERY_NET 0x16f1 diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h index b32b147..6fa9ca1 100644 --- a/lustre/lfsck/lfsck_internal.h +++ b/lustre/lfsck/lfsck_internal.h @@ -111,6 +111,7 @@ struct lfsck_bookmark { enum lfsck_namespace_trace_flags { LNTF_CHECK_LINKEA = 0x01, LNTF_CHECK_PARENT = 0x02, + LNTF_SKIP_NLINK = 0x04, LNTF_ALL = 0xff }; @@ -359,7 +360,8 @@ struct lfsck_operations { int (*lfsck_in_notify)(const struct lu_env *env, struct lfsck_component *com, - struct lfsck_request *lr); + struct lfsck_request *lr, + struct thandle *th); int (*lfsck_query)(const struct lu_env *env, struct lfsck_component *com); @@ -1104,11 +1106,15 @@ static inline void lfsck_lad_set_bitmap(const struct lu_env *env, LASSERT(com->lc_lfsck->li_master); LASSERT(bitmap != NULL); - LASSERTF(bitmap->size > index, "invalid index: nbits %d, index %u\n", - bitmap->size, index); - cfs_bitmap_set(bitmap, index); - lad->lad_incomplete = 1; + if (likely(bitmap->size > index)) { + cfs_bitmap_set(bitmap, index); + lad->lad_incomplete = 1; + } else if (com->lc_type == LFSCK_TYPE_NAMESPACE) { + struct 
lfsck_namespace *ns = com->lc_file_ram; + + ns->ln_flags |= LF_INCOMPLETE; + } } static inline int lfsck_links_read(const struct lu_env *env, diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index 18633e0..81066d2 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -811,9 +811,8 @@ static void lfsck_layout_cpu_to_le(struct lfsck_layout *des, * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * - * \retval positive number for data corruption * \retval 0 for success - * \retval negative error number on failure + * \retval negative error number on failure or data corruption */ static int lfsck_layout_load_bitmap(const struct lu_env *env, struct lfsck_component *com) @@ -862,15 +861,8 @@ static int lfsck_layout_load_bitmap(const struct lu_env *env, size = (lo->ll_bitmap_size + 7) >> 3; rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos); - if (rc == 0) { - RETURN(-ENOENT); - } else if (rc != size) { - CDEBUG(D_LFSCK, "%s: lfsck_layout bitmap size %u != %u\n", - lfsck_lfsck2name(com->lc_lfsck), - (unsigned int)size, rc); - - RETURN(rc); - } + if (rc != size) + RETURN(rc >= 0 ? -EINVAL : rc); if (cfs_bitmap_check_empty(bitmap)) lad->lad_incomplete = 0; @@ -1417,8 +1409,9 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env, lo->ll_time_last_complete = lo->ll_time_last_checkpoint; lo->ll_success_count++; } else if (rc == 0) { - lo->ll_status = lfsck->li_status; - if (lo->ll_status == 0) + if (lfsck->li_status != 0) + lo->ll_status = lfsck->li_status; + else lo->ll_status = LS_STOPPED; } else { lo->ll_status = LS_FAILED; @@ -1718,6 +1711,16 @@ out: * but does not know the position (the file name) in the * layout. * + * type "D": The MDT-object is a directory, it may knows its parent + * but because there is no valid linkEA, the LFSCK cannot + * know where to put it back to the namespace. 
+ * type "O": The MDT-object has no linkEA, and there is no name + * entry that references the MDT-object. + * + * type "P": The orphan object to be created was a parent directory + * of some MDT-object which linkEA shows that the @orphan + * object is missing. + * * The orphan name will be like: * ${FID}-${infix}-${type}-${conflict_version} * @@ -3845,6 +3848,9 @@ log: /* layout APIs */ +static void lfsck_layout_slave_quit(const struct lu_env *env, + struct lfsck_component *com); + static int lfsck_layout_reset(const struct lu_env *env, struct lfsck_component *com, bool init) { @@ -4082,15 +4088,15 @@ static int lfsck_layout_master_prep(const struct lu_env *env, ENTRY; rc = lfsck_layout_load_bitmap(env, com); - if (rc > 0) { + if (rc != 0) { rc = lfsck_layout_reset(env, com, false); if (rc == 0) rc = lfsck_set_param(env, com->lc_lfsck, lsp->lsp_start, true); - } - if (rc != 0) - GOTO(log, rc); + if (rc != 0) + GOTO(log, rc); + } rc = lfsck_layout_prep(env, com, lsp->lsp_start); if (rc != 0) @@ -4102,7 +4108,7 @@ static int lfsck_layout_master_prep(const struct lu_env *env, log: CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos [" - LPU64"\n", lfsck_lfsck2name(com->lc_lfsck), + LPU64"]\n", lfsck_lfsck2name(com->lc_lfsck), com->lc_pos_start.lp_oit_cookie); return 0; @@ -4592,12 +4598,12 @@ static int lfsck_layout_master_post(const struct lu_env *env, lo->ll_flags &= ~LF_UPGRADE; list_move_tail(&com->lc_link, &lfsck->li_list_double_scan); } else if (result == 0) { - lo->ll_status = lfsck->li_status; - if (lo->ll_status == 0) + if (lfsck->li_status != 0) + lo->ll_status = lfsck->li_status; + else lo->ll_status = LS_STOPPED; - if (lo->ll_status != LS_PAUSED) { + if (lo->ll_status != LS_PAUSED) list_move_tail(&com->lc_link, &lfsck->li_list_idle); - } } else { lo->ll_status = LS_FAILED; list_move_tail(&com->lc_link, &lfsck->li_list_idle); @@ -4643,10 +4649,7 @@ static int lfsck_layout_slave_post(const struct lu_env *env, 
lfsck->li_pos_checkpoint.lp_oit_cookie; if (result > 0) { - if (lo->ll_flags & LF_INCOMPLETE) - lo->ll_status = LS_PARTIAL; - else - lo->ll_status = LS_SCANNING_PHASE2; + lo->ll_status = LS_SCANNING_PHASE2; lo->ll_flags |= LF_SCANNED_ONCE; if (lo->ll_flags & LF_CRASHED_LASTID) { done = true; @@ -4659,8 +4662,9 @@ static int lfsck_layout_slave_post(const struct lu_env *env, lo->ll_flags &= ~LF_UPGRADE; list_move_tail(&com->lc_link, &lfsck->li_list_double_scan); } else if (result == 0) { - lo->ll_status = lfsck->li_status; - if (lo->ll_status == 0) + if (lfsck->li_status != 0) + lo->ll_status = lfsck->li_status; + else lo->ll_status = LS_STOPPED; if (lo->ll_status != LS_PAUSED) list_move_tail(&com->lc_link, &lfsck->li_list_idle); @@ -4687,9 +4691,6 @@ static int lfsck_layout_slave_post(const struct lu_env *env, lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result); - if (result <= 0) - lfsck_rbtree_cleanup(env, com); - CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n", lfsck_lfsck2name(lfsck), rc); @@ -4878,9 +4879,39 @@ out: static int lfsck_layout_master_double_scan(const struct lu_env *env, struct lfsck_component *com) { - struct lfsck_layout *lo = com->lc_file_ram; + struct lfsck_layout *lo = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_tgt_descs *ltds; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + int rc; + + rc = lfsck_double_scan_generic(env, com, lo->ll_status); + + if (thread_is_stopped(&lad->lad_thread)) { + LASSERT(list_empty(&lad->lad_req_list)); + LASSERT(list_empty(&lad->lad_ost_phase1_list)); + LASSERT(list_empty(&lad->lad_mdt_phase1_list)); + + ltds = &lfsck->li_ost_descs; + spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + spin_unlock(&ltds->ltd_lock); + + ltds = &lfsck->li_mdt_descs; + 
spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + spin_unlock(&ltds->ltd_lock); + } - return lfsck_double_scan_generic(env, com, lo->ll_status); + return rc; } static int lfsck_layout_slave_double_scan(const struct lu_env *env, @@ -4893,15 +4924,12 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env, int rc; ENTRY; - if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) { - lfsck_rbtree_cleanup(env, com); - lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0); - RETURN(0); - } - CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n", lfsck_lfsck2name(lfsck)); + if (lo->ll_flags & LF_INCOMPLETE) + GOTO(done, rc = 1); + atomic_inc(&lfsck->li_double_scan_count); com->lc_new_checked = 0; @@ -4942,9 +4970,9 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env, done: rc = lfsck_layout_double_scan_result(env, com, rc); - - lfsck_rbtree_cleanup(env, com); - lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc); + lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, + (rc > 0 && lo->ll_flags & LF_INCOMPLETE) ? 
0 : rc); + lfsck_layout_slave_quit(env, com); if (atomic_dec_and_test(&lfsck->li_double_scan_count)) wake_up_all(&lfsck->li_thread.t_ctl_waitq); @@ -5003,7 +5031,8 @@ static void lfsck_layout_master_data_release(const struct lu_env *env, } spin_unlock(&ltds->ltd_lock); - CFS_FREE_BITMAP(lad->lad_bitmap); + if (likely(lad->lad_bitmap != NULL)) + CFS_FREE_BITMAP(lad->lad_bitmap); OBD_FREE_PTR(lad); } @@ -5011,41 +5040,89 @@ static void lfsck_layout_master_data_release(const struct lu_env *env, static void lfsck_layout_slave_data_release(const struct lu_env *env, struct lfsck_component *com) { + struct lfsck_layout_slave_data *llsd = com->lc_data; + + lfsck_layout_slave_quit(env, com); + com->lc_data = NULL; + OBD_FREE_PTR(llsd); +} + +static void lfsck_layout_master_quit(const struct lu_env *env, + struct lfsck_component *com) +{ + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct lfsck_tgt_descs *ltds; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + + LASSERT(lad != NULL); + + lfsck_quit_generic(env, com); + + LASSERT(thread_is_init(&lad->lad_thread) || + thread_is_stopped(&lad->lad_thread)); + LASSERT(list_empty(&lad->lad_req_list)); + + ltds = &lfsck->li_ost_descs; + spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + spin_unlock(&ltds->ltd_lock); + + ltds = &lfsck->li_mdt_descs; + spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_layout_phase_list) { + list_del_init(&ltd->ltd_layout_phase_list); + } + spin_unlock(&ltds->ltd_lock); +} + +static void 
lfsck_layout_slave_quit(const struct lu_env *env, + struct lfsck_component *com) +{ struct lfsck_layout_slave_data *llsd = com->lc_data; struct lfsck_layout_seq *lls; struct lfsck_layout_seq *next; struct lfsck_layout_slave_target *llst; - struct lfsck_layout_slave_target *tmp; LASSERT(llsd != NULL); list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list, - lls_list) { + lls_list) { list_del_init(&lls->lls_list); lfsck_object_put(env, lls->lls_lastid_obj); OBD_FREE_PTR(lls); } - list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list, - llst_list) { + spin_lock(&llsd->llsd_lock); + while (!list_empty(&llsd->llsd_master_list)) { + llst = list_entry(llsd->llsd_master_list.next, + struct lfsck_layout_slave_target, llst_list); list_del_init(&llst->llst_list); - OBD_FREE_PTR(llst); + spin_unlock(&llsd->llsd_lock); + lfsck_layout_llst_put(llst); } + spin_unlock(&llsd->llsd_lock); lfsck_rbtree_cleanup(env, com); - com->lc_data = NULL; - OBD_FREE_PTR(llsd); -} - -static void lfsck_layout_slave_quit(const struct lu_env *env, - struct lfsck_component *com) -{ - lfsck_rbtree_cleanup(env, com); } static int lfsck_layout_master_in_notify(const struct lu_env *env, struct lfsck_component *com, - struct lfsck_request *lr) + struct lfsck_request *lr, + struct thandle *th) { struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_layout *lo = com->lc_file_ram; @@ -5065,9 +5142,10 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env, } CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u " - "from %s %x, status %d\n", lfsck_lfsck2name(lfsck), - lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT", - lr->lr_index, lr->lr_status); + "from %s %x, status %d, flags %x, flags2 %x\n", + lfsck_lfsck2name(lfsck), lr->lr_event, + (lr->lr_flags & LEF_TO_OST) ? 
"OST" : "MDT", + lr->lr_index, lr->lr_status, lr->lr_flags, lr->lr_flags2); if (lr->lr_event != LE_PHASE1_DONE && lr->lr_event != LE_PHASE2_DONE && @@ -5150,7 +5228,8 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env, static int lfsck_layout_slave_in_notify(const struct lu_env *env, struct lfsck_component *com, - struct lfsck_request *lr) + struct lfsck_request *lr, + struct thandle *th) { struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_layout_slave_data *llsd = com->lc_data; @@ -5309,7 +5388,7 @@ static struct lfsck_operations lfsck_layout_master_ops = { .lfsck_dump = lfsck_layout_dump, .lfsck_double_scan = lfsck_layout_master_double_scan, .lfsck_data_release = lfsck_layout_master_data_release, - .lfsck_quit = lfsck_quit_generic, + .lfsck_quit = lfsck_layout_master_quit, .lfsck_in_notify = lfsck_layout_master_in_notify, .lfsck_query = lfsck_layout_query, }; diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index c9ad104..127a87a 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -2875,7 +2875,7 @@ out: EXPORT_SYMBOL(lfsck_stop); int lfsck_in_notify(const struct lu_env *env, struct dt_device *key, - struct lfsck_request *lr) + struct lfsck_request *lr, struct thandle *th) { int rc = -EOPNOTSUPP; ENTRY; @@ -2914,6 +2914,8 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key, case LE_PEER_EXIT: case LE_CONDITIONAL_DESTROY: case LE_CREATE_ORPHAN: + case LE_SKIP_NLINK_DECLARE: + case LE_SKIP_NLINK: case LE_PAIRS_VERIFY: { struct lfsck_instance *lfsck; struct lfsck_component *com; @@ -2924,7 +2926,7 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key, com = lfsck_component_find(lfsck, lr->lr_active); if (likely(com != NULL)) { - rc = com->lc_ops->lfsck_in_notify(env, com, lr); + rc = com->lc_ops->lfsck_in_notify(env, com, lr, th); lfsck_component_put(env, com); } diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index f91bdcd..3f9d3d7 100644 
--- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -207,9 +207,8 @@ static void lfsck_namespace_record_failure(const struct lu_env *env, * \param[in] env pointer to the thread context * \param[in] com pointer to the lfsck component * - * \retval positive number for data corruption * \retval 0 for success - * \retval negative error number on failure + * \retval negative error number on failure or data corruption */ static int lfsck_namespace_load_bitmap(const struct lu_env *env, struct lfsck_component *com) @@ -259,14 +258,8 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env, rc = dt_xattr_get(env, obj, lfsck_buf_get(env, bitmap->data, size), XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA); - if (rc == -ERANGE || rc == -ENODATA || rc == 0) - RETURN(1); - - if (rc < 0) - RETURN(rc); - if (rc != size) - RETURN(rc); + RETURN(rc >= 0 ? -EINVAL : rc); if (cfs_bitmap_check_empty(bitmap)) lad->lad_incomplete = 0; @@ -1061,7 +1054,7 @@ log: * \param[in] type the orphan's type to be created * * type "P": The orphan object to be created was a parent directory - * of some DMT-object which linkEA shows that the @orphan + * of some MDT-object which linkEA shows that the @orphan * object is missing. * * \see lfsck_layout_recreate_parent() for more types. @@ -1160,7 +1153,7 @@ out: * \param[in] type the orphan's type to be created * * type "P": The orphan object to be created was a parent directory - * of some DMT-object which linkEA shows that the @orphan + * of some MDT-object which linkEA shows that the @orphan * object is missing. * * \see lfsck_layout_recreate_parent() for more types. @@ -1367,7 +1360,7 @@ log: * \param[in] orphan pointer to the orphan MDT-object * * type "P": The orphan object to be created was a parent directory - * of some DMT-object which linkEA shows that the @orphan + * of some MDT-object which linkEA shows that the @orphan * object is missing. * * \see lfsck_layout_recreate_parent() for more types. 
@@ -2674,6 +2667,130 @@ next: } /** + * Repair the object's nlink attribute. + * + * If all the known name entries have been verified, then the object's hard + * link attribute should match the object's linkEA entries count unless the + * object's has too much hard link to be recorded in the linkEA. Such cases + * should have been marked in the LFSCK tracing file. Otherwise, trust the + * linkEA to update the object's nlink attribute. + * + * \param[in] env pointer to the thread context + * \param[in] com pointer to the lfsck component + * \param[in] obj pointer to the dt_object to be handled + * \param[in,out] nlink pointer to buffer to object's hard lock count before + * and after the repairing + * + * \retval positive number for repaired cases + * \retval 0 if nothing to be repaired + * \retval negative error number on failure + */ +static int lfsck_namespace_repair_nlink(const struct lu_env *env, + struct lfsck_component *com, + struct dt_object *obj, __u32 *nlink) +{ + struct lfsck_thread_info *info = lfsck_env_info(env); + struct lu_attr *la = &info->lti_la3; + struct lu_fid *tfid = &info->lti_fid3; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_instance *lfsck = com->lc_lfsck; + struct dt_device *dev = lfsck->li_bottom; + const struct lu_fid *cfid = lfsck_dto2fid(obj); + struct dt_object *child = NULL; + struct thandle *th = NULL; + struct linkea_data ldata = { 0 }; + struct lustre_handle lh = { 0 }; + __u32 old = *nlink; + int rc = 0; + __u8 flags; + ENTRY; + + LASSERT(!dt_object_remote(obj)); + LASSERT(S_ISREG(lfsck_object_type(obj))); + + child = lfsck_object_find_by_dev(env, dev, cfid); + if (IS_ERR(child)) + GOTO(log, rc = PTR_ERR(child)); + + rc = lfsck_ibits_lock(env, lfsck, child, &lh, + MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, LCK_EX); + if (rc != 0) + GOTO(log, rc); + + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(log, rc = PTR_ERR(th)); + + la->la_valid = LA_NLINK; + rc = dt_declare_attr_set(env, child, la, th); + 
if (rc != 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc != 0) + GOTO(stop, rc); + + dt_write_lock(env, child, 0); + /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has + * ever tried to verify some remote MDT-object that resides on this + * MDT, but this MDT failed to respond such request. So means there + * may be some remote name entry on other MDT that references this + * object with another name, so we cannot know whether this linkEA + * is valid or not. So keep it there and maybe resolved when next + * LFSCK run. */ + if (ns->ln_flags & LF_INCOMPLETE) + GOTO(unlock, rc = 0); + + fid_cpu_to_be(tfid, cfid); + rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags, + (const struct dt_key *)tfid, BYPASS_CAPA); + if (rc != 0) + GOTO(unlock, rc); + + if (flags & LNTF_SKIP_NLINK) + GOTO(unlock, rc = 0); + + rc = lfsck_links_read2(env, child, &ldata); + if (rc == -ENODATA) + GOTO(unlock, rc = 0); + + if (rc != 0) + GOTO(unlock, rc); + + if (*nlink == ldata.ld_leh->leh_reccount) + GOTO(unlock, rc = 0); + + la->la_nlink = *nlink = ldata.ld_leh->leh_reccount; + if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) + GOTO(unlock, rc = 1); + + rc = dt_attr_set(env, child, la, th, BYPASS_CAPA); + + GOTO(unlock, rc = (rc == 0 ? 1 : rc)); + +unlock: + dt_write_unlock(env, child); + +stop: + dt_trans_stop(env, dev, th); + +log: + lfsck_ibits_unlock(&lh, LCK_EX); + if (child != NULL && !IS_ERR(child)) + lfsck_object_put(env, child); + + CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s " + "nlink count from %u to %u: rc = %d\n", + lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc); + + if (rc != 0) + ns->ln_flags |= LF_INCONSISTENT; + + return rc; +} + +/** * Double scan the directory object for namespace LFSCK. 
* * This function will verify the pairs in the namespace tree: @@ -3227,8 +3344,12 @@ out: return rc; if (la->la_nlink != count) { - /* XXX: there will be other patch(es) for MDT-object - * hard links verification. */ + rc = lfsck_namespace_repair_nlink(env, com, child, + &la->la_nlink); + if (rc > 0) { + ns->ln_objs_nlink_repaired++; + rc = 0; + } } if (repaired) { @@ -3425,17 +3546,17 @@ static int lfsck_namespace_prep(const struct lu_env *env, int rc; rc = lfsck_namespace_load_bitmap(env, com); - if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) { + if (rc != 0 || ns->ln_status == LS_COMPLETED) { rc = lfsck_namespace_reset(env, com, false); if (rc == 0) rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true); - } - if (rc != 0) { - CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n", - lfsck_lfsck2name(lfsck), rc); + if (rc != 0) { + CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: " + "rc = %d\n", lfsck_lfsck2name(lfsck), rc); - return rc; + return rc; + } } down_write(&com->lc_sem); @@ -3681,8 +3802,9 @@ static int lfsck_namespace_post(const struct lu_env *env, list_del_init(&com->lc_link_dir); list_move_tail(&com->lc_link, &lfsck->li_list_double_scan); } else if (result == 0) { - ns->ln_status = lfsck->li_status; - if (ns->ln_status == 0) + if (lfsck->li_status != 0) + ns->ln_status = lfsck->li_status; + else ns->ln_status = LS_STOPPED; if (ns->ln_status != LS_PAUSED) { list_del_init(&com->lc_link_dir); @@ -3881,9 +4003,27 @@ out: static int lfsck_namespace_double_scan(const struct lu_env *env, struct lfsck_component *com) { - struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_namespace *ns = com->lc_file_ram; + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + int rc; + + rc = lfsck_double_scan_generic(env, com, ns->ln_status); + if (thread_is_stopped(&lad->lad_thread)) { + 
LASSERT(list_empty(&lad->lad_req_list)); + LASSERT(list_empty(&lad->lad_mdt_phase1_list)); - return lfsck_double_scan_generic(env, com, ns->ln_status); + spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_namespace_phase_list) { + list_del_init(&ltd->ltd_namespace_phase_list); + } + spin_unlock(&ltds->ltd_lock); + } + + return rc; } static void lfsck_namespace_data_release(const struct lu_env *env, @@ -3916,14 +4056,44 @@ static void lfsck_namespace_data_release(const struct lu_env *env, } spin_unlock(&ltds->ltd_lock); - CFS_FREE_BITMAP(lad->lad_bitmap); + if (likely(lad->lad_bitmap != NULL)) + CFS_FREE_BITMAP(lad->lad_bitmap); OBD_FREE_PTR(lad); } +static void lfsck_namespace_quit(const struct lu_env *env, + struct lfsck_component *com) +{ + struct lfsck_assistant_data *lad = com->lc_data; + struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs; + struct lfsck_tgt_desc *ltd; + struct lfsck_tgt_desc *next; + + LASSERT(lad != NULL); + + lfsck_quit_generic(env, com); + + LASSERT(thread_is_init(&lad->lad_thread) || + thread_is_stopped(&lad->lad_thread)); + LASSERT(list_empty(&lad->lad_req_list)); + + spin_lock(&ltds->ltd_lock); + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list, + ltd_namespace_phase_list) { + list_del_init(&ltd->ltd_namespace_phase_list); + } + list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list, + ltd_namespace_phase_list) { + list_del_init(&ltd->ltd_namespace_phase_list); + } + spin_unlock(&ltds->ltd_lock); +} + static int lfsck_namespace_in_notify(const struct lu_env *env, struct lfsck_component *com, - struct lfsck_request *lr) + struct lfsck_request *lr, + struct thandle *th) { struct lfsck_instance *lfsck = com->lc_lfsck; struct lfsck_namespace *ns = com->lc_file_ram; @@ -3966,6 +4136,70 @@ out_create: return rc; } + case LE_SKIP_NLINK_DECLARE: { + struct dt_object *obj = com->lc_obj; + struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + __u8 flags = 0; + + LASSERT(th != NULL); + + 
rc = dt_declare_delete(env, obj, + (const struct dt_key *)key, th); + if (rc == 0) + rc = dt_declare_insert(env, obj, + (const struct dt_rec *)&flags, + (const struct dt_key *)key, th); + + RETURN(rc); + } + case LE_SKIP_NLINK: { + struct dt_object *obj = com->lc_obj; + struct lu_fid *key = &lfsck_env_info(env)->lti_fid3; + __u8 flags = 0; + bool exist = false; + ENTRY; + + LASSERT(th != NULL); + + fid_cpu_to_be(key, &lr->lr_fid); + rc = dt_lookup(env, obj, (struct dt_rec *)&flags, + (const struct dt_key *)key, BYPASS_CAPA); + if (rc == 0) { + if (flags & LNTF_SKIP_NLINK) + RETURN(0); + + exist = true; + } else if (rc != -ENOENT) { + GOTO(log, rc); + } + + flags |= LNTF_SKIP_NLINK; + if (exist) { + rc = dt_delete(env, obj, (const struct dt_key *)key, + th, BYPASS_CAPA); + if (rc != 0) + GOTO(log, rc); + } + + rc = dt_insert(env, obj, (const struct dt_rec *)&flags, + (const struct dt_key *)key, th, BYPASS_CAPA, 1); + + GOTO(log, rc); + +log: + CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID + " to be skipped for namespace double scan: rc = %d\n", + lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc); + + if (rc != 0) + /* If we cannot record this object in the LFSCK tracing, + * we have to mark the LFSC as LF_INCOMPLETE, then the + * LFSCK will skip nlink attribute verification for + * all objects. 
*/ + ns->ln_flags |= LF_INCOMPLETE; + + return 0; + } case LE_PHASE1_DONE: case LE_PHASE2_DONE: case LE_PEER_EXIT: @@ -4063,7 +4297,7 @@ static struct lfsck_operations lfsck_namespace_ops = { .lfsck_dump = lfsck_namespace_dump, .lfsck_double_scan = lfsck_namespace_double_scan, .lfsck_data_release = lfsck_namespace_data_release, - .lfsck_quit = lfsck_quit_generic, + .lfsck_quit = lfsck_namespace_quit, .lfsck_in_notify = lfsck_namespace_in_notify, .lfsck_query = lfsck_namespace_query, }; @@ -4514,6 +4748,33 @@ nodata: GOTO(stop, rc); rc = lfsck_links_write(env, obj, &ldata, handle); + if (unlikely(rc == -ENOSPC) && + S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) { + if (handle != NULL) { + LASSERT(dt_write_locked(env, obj)); + + dt_write_unlock(env, obj); + dtlocked = false; + + dt_trans_stop(env, dev, handle); + handle = NULL; + + lfsck_ibits_unlock(&lh, LCK_EX); + } + + rc = lfsck_namespace_trace_update(env, com, + &lnr->lnr_fid, LNTF_SKIP_NLINK, true); + if (rc != 0) + /* If we cannot record this object in the + * LFSCK tracing, we have to mark the LFSCK + * as LF_INCOMPLETE, then the LFSCK will + * skip nlink attribute verification for + * all objects. 
*/ + ns->ln_flags |= LF_INCOMPLETE; + + GOTO(out, rc = 0); + } + if (rc != 0) GOTO(stop, rc); @@ -4832,8 +5093,9 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env, ns->ln_time_last_complete = ns->ln_time_last_checkpoint; ns->ln_success_count++; } else if (rc == 0) { - ns->ln_status = lfsck->li_status; - if (ns->ln_status == 0) + if (lfsck->li_status != 0) + ns->ln_status = lfsck->li_status; + else ns->ln_status = LS_STOPPED; } else { ns->ln_status = LS_FAILED; @@ -4937,7 +5199,7 @@ out: if (rc != 0) CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail " "to sync failure with MDTs, and related MDTs " - "may handle orphan un-properly: rc = %d\n", + "may handle orphan improperly: rc = %d\n", lfsck_lfsck2name(lfsck), rc); EXIT; diff --git a/lustre/mdd/mdd_compat.c b/lustre/mdd/mdd_compat.c index 7b50624..023ffa5 100644 --- a/lustre/mdd/mdd_compat.c +++ b/lustre/mdd/mdd_compat.c @@ -110,7 +110,7 @@ static int mdd_convert_linkea(const struct lu_env *env, if (IS_ERR(th)) RETURN(PTR_ERR(th)); - rc = mdd_declare_links_add(env, o, th, NULL); + rc = mdd_declare_links_add(env, o, th, NULL, MLAO_IGNORE); if (rc) GOTO(out, rc); rc = dt_trans_start_local(env, mdd->mdd_child, th); diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index bccc16d..8899fc5 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -1113,16 +1113,35 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj, { const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len); + int rc; if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA)) return 0; - return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle, - mdd_object_capa(env, mdd_obj)); + rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle, + mdd_object_capa(env, mdd_obj)); + if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) && + mdd_object_remote(mdd_obj) == 0) { + struct lfsck_request *lr = &mdd_env_info(env)->mti_lr; + + /* XXX: If 
the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. */ + lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK, + LFSCK_TYPE_NAMESPACE); + lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom, + lr, handle); + } + + return rc; } int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, - struct thandle *handle, struct linkea_data *ldata) + struct thandle *handle, struct linkea_data *ldata, + enum mdd_links_add_overflow overflow) { int rc; int ea_len; @@ -1140,6 +1159,25 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, rc = mdo_declare_xattr_set(env, mdd_obj, mdd_buf_get_const(env, linkea, ea_len), XATTR_NAME_LINK, 0, handle); + if (rc != 0) + return rc; + + if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) { + struct lfsck_request *lr = &mdd_env_info(env)->mti_lr; + + /* XXX: If the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. */ + lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE, + LFSCK_TYPE_NAMESPACE); + rc = lfsck_in_notify(env, + mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom, + lr, handle); + } + return rc; } @@ -1152,7 +1190,7 @@ static inline int mdd_declare_links_del(const struct lu_env *env, /* For directory, the linkEA will be removed together * with the object. 
*/ if (!S_ISDIR(mdd_object_type(c))) - rc = mdd_declare_links_add(env, c, handle, NULL); + rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE); return rc; } @@ -1174,9 +1212,15 @@ static int mdd_declare_link(const struct lu_env *env, return rc; rc = mdo_declare_ref_add(env, c, handle); - if (rc) + if (rc != 0) return rc; + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) { + rc = mdo_declare_ref_add(env, c, handle); + if (rc != 0) + return rc; + } + la->la_valid = LA_CTIME | LA_MTIME; rc = mdo_declare_attr_set(env, p, la, handle); if (rc != 0) @@ -1184,11 +1228,12 @@ static int mdd_declare_link(const struct lu_env *env, la->la_valid = LA_CTIME; rc = mdo_declare_attr_set(env, c, la, handle); - if (rc) + if (rc != 0) return rc; - rc = mdd_declare_links_add(env, c, handle, data); - if (rc) + rc = mdd_declare_links_add(env, c, handle, data, + S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE); + if (rc != 0) return rc; rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle); @@ -1244,10 +1289,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, if (rc) GOTO(out_unlock, rc); - rc = mdo_ref_add(env, mdd_sobj, handle); - if (rc) - GOTO(out_unlock, rc); + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) { + rc = mdo_ref_add(env, mdd_sobj, handle); + if (rc != 0) + GOTO(out_unlock, rc); + } + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) { + rc = mdo_ref_add(env, mdd_sobj, handle); + if (rc != 0) + GOTO(out_unlock, rc); + } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) { struct lu_fid tfid = *mdo2fid(mdd_sobj); @@ -2027,7 +2079,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, if (rc != 0) return rc; - rc = mdd_declare_links_add(env, c, handle, ldata); + rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE); if (rc) return rc; @@ -2564,7 +2616,8 @@ static int mdd_declare_rename(const struct lu_env *env, if (rc) return rc; - rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata); + rc 
= mdd_declare_links_add(env, mdd_sobj, handle, ldata, + S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE); if (rc) return rc; @@ -2981,7 +3034,8 @@ static int mdd_linkea_update_child_internal(const struct lu_env *env, linkea_entry_pack(ldata.ld_lee, &lname, mdd_object_fid(parent)); if (declare) - rc = mdd_declare_links_add(env, child, handle, &ldata); + rc = mdd_declare_links_add(env, child, handle, &ldata, + MLAO_IGNORE); else rc = mdd_links_write(env, child, &ldata, handle); break; @@ -3031,7 +3085,8 @@ static int mdd_update_linkea_internal(const struct lu_env *env, } if (declare) - rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata); + rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata, + MLAO_IGNORE); else rc = mdd_links_write(env, mdd_tobj, ldata, handle); diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 316651f..eff2098 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -161,6 +161,12 @@ struct mdd_thread_info { struct linkea_data mti_link_data; struct md_op_spec mti_spec; struct dt_insert_rec mti_dt_rec; + struct lfsck_request mti_lr; +}; + +enum mdd_links_add_overflow { + MLAO_IGNORE = false, + MLAO_CHECK = true, }; extern const char orph_index_name[]; @@ -222,7 +228,8 @@ int mdd_lookup(const struct lu_env *env, int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj, struct linkea_data *ldata); int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, - struct thandle *handle, struct linkea_data *ldata); + struct thandle *handle, struct linkea_data *ldata, + enum mdd_links_add_overflow overflow); int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj, struct linkea_data *ldata, struct thandle *handle); struct lu_buf *mdd_links_get(const struct lu_env *env, diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index eede5f2..714645b 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -77,7 +77,7 @@ static void 
ofd_inconsistency_verify_one(const struct lu_env *env, lr->lr_fid2 = oii->oii_pfid; /* client given PFID. */ lr->lr_fid3 = *pfid; /* OST local stored PFID. */ - rc = lfsck_in_notify(env, ofd->ofd_osd, lr); + rc = lfsck_in_notify(env, ofd->ofd_osd, lr, NULL); ofd_write_lock(env, fo); switch (lr->lr_status) { case LPVS_INIT: diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 245b5d8..1b3b4f0 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -350,10 +350,12 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, /* Only the new created objects need to be recorded. */ if (ofd->ofd_osd->dd_record_fid_accessed) { - lfsck_pack_rfa(&ofd_info(env)->fti_lr, - lu_object_fid(&fo->ofo_obj.do_lu)); - lfsck_in_notify(env, ofd->ofd_osd, - &ofd_info(env)->fti_lr); + struct lfsck_request *lr = &ofd_info(env)->fti_lr; + + lfsck_pack_rfa(lr, lu_object_fid(&fo->ofo_obj.do_lu), + LE_FID_ACCESSED, + LFSCK_TYPE_LAYOUT); + lfsck_in_notify(env, ofd->ofd_osd, lr, NULL); } if (likely(!ofd_object_exists(fo) && diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index cc24aa2..23ce9fb 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -3055,6 +3055,10 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) && + strcmp(name, XATTR_NAME_LINK) == 0) + return -ENOSPC; + return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len, fs_flags); } diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index a24aa34..0d783aa 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -598,6 +598,10 @@ int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, strcmp(name, POSIX_ACL_XATTR_DEFAULT) == 0)) RETURN(-EOPNOTSUPP); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) && + strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(-ENOSPC); + oh = 
container_of0(handle, struct osd_thandle, ot_super); down(&obj->oo_guard); diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 7c0c497..9441a5f 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -563,11 +563,8 @@ int osp_md_declare_object_create(const struct lu_env *env, int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th); -int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th); -int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa); +int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th); extern const struct dt_index_operations osp_md_index_ops; /* osp_precreate.c */ diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index c10a9dc..a66dd65 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -326,10 +326,7 @@ static void osp_md_ah_init(const struct lu_env *env, } /** - * Implementation of dt_object_operations::do_declare_attr_get - * - * Declare setting attributes of the remote object, i.e. insert remote - * object attr_set update into RPC. + * Add attr_set sub-request into the OUT RPC. * * \param[in] env execution environment * \param[in] dt object on which to set attributes @@ -339,8 +336,8 @@ static void osp_md_ah_init(const struct lu_env *env, * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. 
*/ -int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) +int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { struct dt_update_request *update; int rc; @@ -361,11 +358,46 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, } /** + * Implementation of dt_object_operations::do_declare_attr_set + * + * Declare setting attributes to the specified remote object. + * + * If the transaction is a remote transaction, then add the modification + * sub-request into the OUT RPC here, and such OUT RPC will be triggered + * when transaction start. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) +{ + int rc = 0; + + CDEBUG(D_INFO, "declare attr set object "DFID"\n", + PFID(&dt->do_lu.lo_header->loh_fid)); + + if (!is_only_remote_trans(th)) + rc = __osp_md_attr_set(env, dt, attr, th); + + return rc; +} + +/** * Implementation of dt_object_operations::do_attr_set * - * Do nothing in this method for now. In DNE phase I, remote updates - * are actually executed during transaction start, i.e. object attributes - * have already been set when calling this method. + * Set attributes to the specified remote object. + * + * If the transaction is a remote transaction, then related modification + * sub-request has been added in the declare phase and related OUT RPC + * has been triggered at transaction start. Otherwise, the modification + * sub-request will be added here, and related OUT RPC will be triggered + * when transaction stop. 
* * \param[in] env execution environment * \param[in] dt object to set attributes @@ -379,10 +411,15 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th, struct lustre_capa *capa) { + int rc = 0; + CDEBUG(D_INFO, "attr set object "DFID"\n", PFID(&dt->do_lu.lo_header->loh_fid)); - RETURN(0); + if (is_only_remote_trans(th)) + rc = __osp_md_attr_set(env, dt, attr, th); + + RETURN(rc); } /** diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 2835bbb..ed26ee1 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -623,22 +623,23 @@ static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } - if (o->opo_new) - /* no need in logging for new objects being created */ - RETURN(0); - if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); - if (!is_only_remote_trans(th)) + if (!is_only_remote_trans(th)) { + if (o->opo_new) + /* no need in logging for new objects being created */ + RETURN(0); + /* * track all UID/GID changes via llog */ rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); - else + } else { /* It is for OST-object attr_set directly without updating * local MDT-object attribute. It is usually used by LFSCK. 
*/ - rc = osp_md_declare_attr_set(env, dt, attr, th); + rc = __osp_md_attr_set(env, dt, attr, th); + } if (rc != 0 || o->opo_ooa == NULL) RETURN(rc); @@ -744,8 +745,10 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, if (is_only_remote_trans(th)) { rc = __osp_attr_set(env, dt, attr, th); - if (rc != 0) - RETURN(rc); + if (rc == 0 && o->opo_new) + o->opo_new = 0; + + RETURN(rc); } /* we're interested in uid/gid changes only */ @@ -761,17 +764,8 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - if (!is_only_remote_trans(th)) - /* - * once transaction is committed put proper command on - * the queue going to our OST - */ - rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); - /* XXX: send new uid/gid to OST ASAP? */ - else - /* It is for OST-object attr_set directly without updating - * local MDT-object attribute. It is usually used by LFSCK. */ - rc = osp_md_attr_set(env, dt, attr, th, capa); + rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); + /* XXX: send new uid/gid to OST ASAP? 
*/ RETURN(rc); } diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index e769304..a0caec0 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -4715,6 +4715,10 @@ void lustre_assert_wire_constants(void) (long long)LE_PAIRS_VERIFY); LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n", (long long)LE_CREATE_ORPHAN); + LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n", + (long long)LE_SKIP_NLINK_DECLARE); + LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n", + (long long)LE_SKIP_NLINK); LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n", (unsigned)LEF_TO_OST); LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n", diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 25aac41..a0e4879 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -565,12 +565,30 @@ static int out_tx_xattr_set_exec(const struct lu_env *env, rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf, arg->u.xattr_set.name, arg->u.xattr_set.flags, th, NULL); - dt_write_unlock(env, dt_obj); /** * Ignore errors if this is LINK EA **/ - if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK))) + if (unlikely(rc != 0 && + strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) { + /* XXX: If the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. 
*/ + if (rc == -ENOSPC) { + struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; + + lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), + LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); + tgt_lfsck_in_notify(env, + tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th); + } + rc = 0; + } + dt_write_unlock(env, dt_obj); + out: CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n", dt_obd_name(th->th_dev), arg->reply, arg->index, rc); @@ -596,6 +614,24 @@ static int __out_tx_xattr_set(const struct lu_env *env, if (rc != 0) return rc; + if (strcmp(name, XATTR_NAME_LINK) == 0) { + struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; + + /* XXX: If the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. */ + lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), + LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); + rc = tgt_lfsck_in_notify(env, + tgt_ses_info(env)->tsi_tgt->lut_bottom, + lr, ta->ta_handle); + if (rc != 0) + return rc; + } + arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line); if (IS_ERR(arg)) return PTR_ERR(arg); @@ -1566,8 +1602,10 @@ int out_handle(struct tgt_session_info *tsi) if (dt->dd_record_fid_accessed) { lfsck_pack_rfa(&tti->tti_lr, - lu_object_fid(&dt_obj->do_lu)); - tgt_lfsck_in_notify(env, dt, &tti->tti_lr); + lu_object_fid(&dt_obj->do_lu), + LE_FID_ACCESSED, + LFSCK_TYPE_LAYOUT); + tgt_lfsck_in_notify(env, dt, &tti->tti_lr, NULL); } tti->tti_u.update.tti_dt_object = dt_obj; diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index 3c42442..57ec2e5 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -1373,11 +1373,13 @@ EXPORT_SYMBOL(tgt_sec_ctx_handlers); int (*tgt_lfsck_in_notify)(const struct lu_env *env, struct dt_device *key, - struct 
lfsck_request *lr) = NULL; + struct lfsck_request *lr, + struct thandle *th) = NULL; void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *, struct dt_device *, - struct lfsck_request *)) + struct lfsck_request *, + struct thandle *)) { tgt_lfsck_in_notify = notify; } @@ -1408,7 +1410,7 @@ static int tgt_handle_lfsck_notify(struct tgt_session_info *tsi) if (lr == NULL) RETURN(-EPROTO); - rc = tgt_lfsck_in_notify(env, key, lr); + rc = tgt_lfsck_in_notify(env, key, lr, NULL); RETURN(rc); } diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index ebe3a1a..7c96074 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -45,7 +45,8 @@ extern int (*tgt_lfsck_in_notify)(const struct lu_env *env, struct dt_device *key, - struct lfsck_request *lr); + struct lfsck_request *lr, + struct thandle *th); struct tx_arg; typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, diff --git a/lustre/tests/sanity-lfsck.sh b/lustre/tests/sanity-lfsck.sh index 38c7f3a..1d330ea 100644 --- a/lustre/tests/sanity-lfsck.sh +++ b/lustre/tests/sanity-lfsck.sh @@ -46,7 +46,7 @@ setupall ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21" [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] && - ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28" + ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28 29" build_test_filter @@ -1663,7 +1663,7 @@ test_18a() { check_mount_and_prep $LFS mkdir -i 0 $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2 local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }') @@ -1673,7 +1673,7 @@ test_18a() { if [ $MDSCOUNT -ge 2 ]; then $LFS mkdir -i 1 $DIR/$tdir/a2 - $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2 + $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2 dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2 $LFS path2fid $DIR/$tdir/a2/f2 
$LFS getstripe $DIR/$tdir/a2/f2 @@ -1778,7 +1778,7 @@ test_18b() { check_mount_and_prep $LFS mkdir -i 0 $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2 local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }') local fid1=$($LFS path2fid $DIR/$tdir/a1/f1) @@ -1787,7 +1787,7 @@ test_18b() { if [ $MDSCOUNT -ge 2 ]; then $LFS mkdir -i 1 $DIR/$tdir/a2 - $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2 + $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2 dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2 fid2=$($LFS path2fid $DIR/$tdir/a2/f2) echo ${fid2} @@ -1891,7 +1891,7 @@ test_18c() { check_mount_and_prep $LFS mkdir -i 0 $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 echo "Inject failure, to simulate the case of missing parent FID" #define OBD_FAIL_LFSCK_NOPFID 0x1617 @@ -1902,7 +1902,7 @@ test_18c() { if [ $MDSCOUNT -ge 2 ]; then $LFS mkdir -i 1 $DIR/$tdir/a2 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a2 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a2 dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2 $LFS getstripe $DIR/$tdir/a2/f2 fi @@ -2001,7 +2001,7 @@ test_18d() { check_mount_and_prep mkdir $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 echo "guard" > $DIR/$tdir/a1/f1 echo "foo" > $DIR/$tdir/a1/f2 local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }') @@ -2095,7 +2095,7 @@ test_18e() { check_mount_and_prep mkdir $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 echo "guard" > $DIR/$tdir/a1/f1 echo "foo" > $DIR/$tdir/a1/f2 local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }') @@ -2211,22 +2211,22 @@ test_18f() { check_mount_and_prep $LFS mkdir -i 0 $DIR/$tdir/a1 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1 dd 
if=/dev/zero of=$DIR/$tdir/a1/guard bs=1M count=2 dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2 $LFS mkdir -i 0 $DIR/$tdir/a2 - $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a2 + $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a2 dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2 $LFS getstripe $DIR/$tdir/a1/f1 $LFS getstripe $DIR/$tdir/a2/f2 if [ $MDSCOUNT -ge 2 ]; then $LFS mkdir -i 1 $DIR/$tdir/a3 - $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a3 + $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a3 dd if=/dev/zero of=$DIR/$tdir/a3/guard bs=1M count=2 dd if=/dev/zero of=$DIR/$tdir/a3/f3 bs=1M count=2 $LFS mkdir -i 1 $DIR/$tdir/a4 - $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a4 + $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a4 dd if=/dev/zero of=$DIR/$tdir/a4/f4 bs=1M count=2 $LFS getstripe $DIR/$tdir/a3/f3 $LFS getstripe $DIR/$tdir/a4/f4 @@ -2416,10 +2416,10 @@ test_20() { check_mount_and_prep $LFS mkdir -i 0 $DIR/$tdir/a1 if [ $OSTCOUNT -gt 2 ]; then - $LFS setstripe -c 3 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 3 -i 0 -S 1M $DIR/$tdir/a1 bcount=513 else - $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a1 + $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a1 bcount=257 fi @@ -3386,7 +3386,7 @@ test_28() { echo "The target name entry is lost. The LFSCK should insert the" echo "orphan MDT-object under .lustre/lost+found/MDTxxxx. But if" echo "the MDT (on which the orphan MDT-object resides) has ever" - echo "failed to respond some name entry verification durin the" + echo "failed to respond some name entry verification during the" echo "first stage-scanning, then the LFSCK should skip to handle" echo "orphan MDT-object on this MDT. But other MDTs should not" echo "be affected." @@ -3481,6 +3481,161 @@ test_28() { } run_test 28 "Skip the failed MDT(s) when handle orphan MDT-objects" +test_29a() { + echo "#####" + echo "The object's nlink attribute is larger than the object's known" + echo "name entries count. 
The LFSCK will repair the object's nlink" + echo "attribute to match the known name entries count" + echo "#####" + + check_mount_and_prep + + $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0" + touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo" + + echo "Inject failure stub on MDT0 to simulate the case that foo's" + echo "nlink attribute is larger than its name entries count." + + #define OBD_FAIL_LFSCK_MORE_NLINK 0x1625 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1625 + ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 || + error "(3) Fail to hard link to $DIR/$tdir/d0/foo" + do_facet $SINGLEMDS $LCTL set_param fail_loc=0 + + cancel_lru_locks mdc + local count=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count -eq 3 ] || error "(4) Cannot inject error: $count" + + echo "Trigger namespace LFSCK to repair the nlink count" + $START_NAMESPACE -r -A || + error "(5) Fail to start LFSCK for namespace" + + wait_update_facet $SINGLEMDS "$LCTL get_param -n \ + mdd.${MDT_DEV}.lfsck_namespace | + awk '/^status/ { print \\\$2 }'" "completed" 32 || { + $SHOW_NAMESPACE + error "(6) unexpected status" + } + + local repaired=$($SHOW_NAMESPACE | + awk '/^nlinks_repaired/ { print $2 }') + [ $repaired -eq 1 ] || + error "(7) Fail to repair nlink count: $repaired" + + cancel_lru_locks mdc + count=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count" +} +run_test 29a "LFSCK can repair bad nlink count (1)" + +test_29b() { + echo "#####" + echo "The object's nlink attribute is smaller than the object's known" + echo "name entries count. 
The LFSCK will repair the object's nlink" + echo "attribute to match the known name entries count" + echo "#####" + + check_mount_and_prep + + $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0" + touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo" + + echo "Inject failure stub on MDT0 to simulate the case that foo's" + echo "nlink attribute is smaller than its name entries count." + + #define OBD_FAIL_LFSCK_LESS_NLINK 0x1626 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1626 + ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 || + error "(3) Fail to hard link to $DIR/$tdir/d0/foo" + do_facet $SINGLEMDS $LCTL set_param fail_loc=0 + + cancel_lru_locks mdc + local count=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count -eq 1 ] || error "(4) Cannot inject error: $count" + + echo "Trigger namespace LFSCK to repair the nlink count" + $START_NAMESPACE -r -A || + error "(5) Fail to start LFSCK for namespace" + + wait_update_facet $SINGLEMDS "$LCTL get_param -n \ + mdd.${MDT_DEV}.lfsck_namespace | + awk '/^status/ { print \\\$2 }'" "completed" 32 || { + $SHOW_NAMESPACE + error "(6) unexpected status" + } + + local repaired=$($SHOW_NAMESPACE | + awk '/^nlinks_repaired/ { print $2 }') + [ $repaired -eq 1 ] || + error "(7) Fail to repair nlink count: $repaired" + + cancel_lru_locks mdc + count=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count" +} +run_test 29b "LFSCK can repair bad nlink count (2)" + +test_29c() { + echo "#####" + echo "There are too much hard links to the object, and exceeds the" + echo "object's linkEA limitation, as to NOT all the known name entries" + echo "will be recorded in the linkEA. Under such case, LFSCK should" + echo "skip the nlink verification for this object." 
+ echo "#####" + + check_mount_and_prep + + $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0" + touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo" + ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 || + error "(3) Fail to hard link to $DIR/$tdir/d0/foo" + + echo "Inject failure stub on MDT0 to simulate the case that" + echo "foo's hard links exceed the object's linkEA limitation." + + #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1627 + ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 || + error "(4) Fail to hard link to $DIR/$tdir/d0/foo" + + cancel_lru_locks mdc + + local count1=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count1 -eq 3 ] || error "(5) Stat failure: $count1" + + local foofid=$($LFS path2fid $DIR/$tdir/d0/foo) + $LFS fid2path $DIR $foofid + local count2=$($LFS fid2path $DIR $foofid | wc -l) + [ $count2 -eq 2 ] || error "(6) Fail to inject error: $count2" + + echo "Trigger namespace LFSCK to repair the nlink count" + $START_NAMESPACE -r -A || + error "(7) Fail to start LFSCK for namespace" + + wait_update_facet $SINGLEMDS "$LCTL get_param -n \ + mdd.${MDT_DEV}.lfsck_namespace | + awk '/^status/ { print \\\$2 }'" "completed" 32 || { + $SHOW_NAMESPACE + error "(8) unexpected status" + } + + do_facet $SINGLEMDS $LCTL set_param fail_loc=0 + local repaired=$($SHOW_NAMESPACE | + awk '/^nlinks_repaired/ { print $2 }') + [ $repaired -eq 0 ] || + error "(9) Repair nlink count unexpectedly: $repaired" + + cancel_lru_locks mdc + + count1=$(stat --format=%h $DIR/$tdir/d0/foo) + [ $count1 -eq 3 ] || error "(10) Stat failure: $count1" + + count2=$($LFS fid2path $DIR $foofid | wc -l) + [ $count2 -eq 2 ] || + error "(11) Repaired something unexpectedly: $count2" +} +run_test 29c "Not verify nlink attr if hard links exceed linkEA limitation" + $LCTL set_param debug=-lfsck > /dev/null || true # restore MDS/OST size diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 0ea8c10..5e6f93d 100644 --- 
a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -2149,6 +2149,8 @@ static void check_lfsck_request(void) CHECK_VALUE(LE_CONDITIONAL_DESTROY); CHECK_VALUE(LE_PAIRS_VERIFY); CHECK_VALUE(LE_CREATE_ORPHAN); + CHECK_VALUE(LE_SKIP_NLINK_DECLARE); + CHECK_VALUE(LE_SKIP_NLINK); CHECK_VALUE_X(LEF_TO_OST); CHECK_VALUE_X(LEF_FROM_OST); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 7c5e3bf..f08f453 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -4727,6 +4727,10 @@ void lustre_assert_wire_constants(void) (long long)LE_PAIRS_VERIFY); LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n", (long long)LE_CREATE_ORPHAN); + LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n", + (long long)LE_SKIP_NLINK_DECLARE); + LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n", + (long long)LE_SKIP_NLINK); LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n", (unsigned)LEF_TO_OST); LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",