int tgt_hpreq_handler(struct ptlrpc_request *req);
void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
struct dt_device *,
- struct lfsck_request *));
+ struct lfsck_request *,
+ struct thandle *));
void tgt_register_lfsck_query(int (*query)(const struct lu_env *,
struct dt_device *,
struct lfsck_request *));
LE_CONDITIONAL_DESTROY = 10,
LE_PAIRS_VERIFY = 11,
LE_CREATE_ORPHAN = 12,
+ LE_SKIP_NLINK_DECLARE = 13,
+ LE_SKIP_NLINK = 14,
};
enum lfsck_event_flags {
int lfsck_stop(const struct lu_env *env, struct dt_device *key,
struct lfsck_stop *stop);
int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
- struct lfsck_request *lr);
+ struct lfsck_request *lr, struct thandle *th);
int lfsck_query(const struct lu_env *env, struct dt_device *key,
struct lfsck_request *lr);
int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type);
static inline void lfsck_pack_rfa(struct lfsck_request *lr,
- const struct lu_fid *fid)
+ const struct lu_fid *fid,
+ __u32 event, __u16 com) /* event: LE_* code to send; com: LFSCK component type to address */
{
memset(lr, 0, sizeof(*lr));
- lr->lr_event = LE_FID_ACCESSED;
- lr->lr_active = LFSCK_TYPE_LAYOUT;
lr->lr_fid = *fid;
+ lr->lr_event = event; /* previously hard-coded to LE_FID_ACCESSED */
+ lr->lr_active = com; /* previously hard-coded to LFSCK_TYPE_LAYOUT */
}
#endif /* _LUSTRE_LFSCK_H */
#define OBD_FAIL_LFSCK_MUL_REF 0x1622
#define OBD_FAIL_LFSCK_BAD_TYPE 0x1623
#define OBD_FAIL_LFSCK_NO_NAMEENTRY 0x1624
+#define OBD_FAIL_LFSCK_MORE_NLINK 0x1625
+#define OBD_FAIL_LFSCK_LESS_NLINK 0x1626
+#define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
#define OBD_FAIL_LFSCK_NOTIFY_NET 0x16f0
#define OBD_FAIL_LFSCK_QUERY_NET 0x16f1
enum lfsck_namespace_trace_flags {
LNTF_CHECK_LINKEA = 0x01,
LNTF_CHECK_PARENT = 0x02,
+ LNTF_SKIP_NLINK = 0x04,
LNTF_ALL = 0xff
};
int (*lfsck_in_notify)(const struct lu_env *env,
struct lfsck_component *com,
- struct lfsck_request *lr);
+ struct lfsck_request *lr,
+ struct thandle *th);
int (*lfsck_query)(const struct lu_env *env,
struct lfsck_component *com);
LASSERT(com->lc_lfsck->li_master);
LASSERT(bitmap != NULL);
- LASSERTF(bitmap->size > index, "invalid index: nbits %d, index %u\n",
- bitmap->size, index);
- cfs_bitmap_set(bitmap, index);
- lad->lad_incomplete = 1;
+ if (likely(bitmap->size > index)) {
+ cfs_bitmap_set(bitmap, index);
+ lad->lad_incomplete = 1;
+ } else if (com->lc_type == LFSCK_TYPE_NAMESPACE) {
+ struct lfsck_namespace *ns = com->lc_file_ram;
+
+ ns->ln_flags |= LF_INCOMPLETE;
+ }
}
static inline int lfsck_links_read(const struct lu_env *env,
* \param[in] env pointer to the thread context
* \param[in] com pointer to the lfsck component
*
- * \retval positive number for data corruption
* \retval 0 for success
- * \retval negative error number on failure
+ * \retval negative error number on failure or data corruption
*/
static int lfsck_layout_load_bitmap(const struct lu_env *env,
struct lfsck_component *com)
size = (lo->ll_bitmap_size + 7) >> 3;
rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
- if (rc == 0) {
- RETURN(-ENOENT);
- } else if (rc != size) {
- CDEBUG(D_LFSCK, "%s: lfsck_layout bitmap size %u != %u\n",
- lfsck_lfsck2name(com->lc_lfsck),
- (unsigned int)size, rc);
-
- RETURN(rc);
- }
+ if (rc != size)
+ RETURN(rc >= 0 ? -EINVAL : rc);
if (cfs_bitmap_check_empty(bitmap))
lad->lad_incomplete = 0;
lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
lo->ll_success_count++;
} else if (rc == 0) {
- lo->ll_status = lfsck->li_status;
- if (lo->ll_status == 0)
+ if (lfsck->li_status != 0)
+ lo->ll_status = lfsck->li_status;
+ else
lo->ll_status = LS_STOPPED;
} else {
lo->ll_status = LS_FAILED;
* but does not know the position (the file name) in the
* layout.
*
+ * type "D": The MDT-object is a directory; it may know its parent,
+ * but because there is no valid linkEA, the LFSCK cannot
+ * know where to put it back into the namespace.
+ * type "O": The MDT-object has no linkEA, and there is no name
+ * entry that references the MDT-object.
+ *
+ * type "P": The orphan object to be created was a parent directory
+ * of some MDT-object which linkEA shows that the @orphan
+ * object is missing.
+ *
* The orphan name will be like:
* ${FID}-${infix}-${type}-${conflict_version}
*
/* layout APIs */
+static void lfsck_layout_slave_quit(const struct lu_env *env,
+ struct lfsck_component *com);
+
static int lfsck_layout_reset(const struct lu_env *env,
struct lfsck_component *com, bool init)
{
ENTRY;
rc = lfsck_layout_load_bitmap(env, com);
- if (rc > 0) {
+ if (rc != 0) {
rc = lfsck_layout_reset(env, com, false);
if (rc == 0)
rc = lfsck_set_param(env, com->lc_lfsck,
lsp->lsp_start, true);
- }
- if (rc != 0)
- GOTO(log, rc);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
rc = lfsck_layout_prep(env, com, lsp->lsp_start);
if (rc != 0)
log:
CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
- LPU64"\n", lfsck_lfsck2name(com->lc_lfsck),
+ LPU64"]\n", lfsck_lfsck2name(com->lc_lfsck),
com->lc_pos_start.lp_oit_cookie);
return 0;
lo->ll_flags &= ~LF_UPGRADE;
list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
} else if (result == 0) {
- lo->ll_status = lfsck->li_status;
- if (lo->ll_status == 0)
+ if (lfsck->li_status != 0)
+ lo->ll_status = lfsck->li_status;
+ else
lo->ll_status = LS_STOPPED;
- if (lo->ll_status != LS_PAUSED) {
+ if (lo->ll_status != LS_PAUSED)
list_move_tail(&com->lc_link, &lfsck->li_list_idle);
- }
} else {
lo->ll_status = LS_FAILED;
list_move_tail(&com->lc_link, &lfsck->li_list_idle);
lfsck->li_pos_checkpoint.lp_oit_cookie;
if (result > 0) {
- if (lo->ll_flags & LF_INCOMPLETE)
- lo->ll_status = LS_PARTIAL;
- else
- lo->ll_status = LS_SCANNING_PHASE2;
+ lo->ll_status = LS_SCANNING_PHASE2;
lo->ll_flags |= LF_SCANNED_ONCE;
if (lo->ll_flags & LF_CRASHED_LASTID) {
done = true;
lo->ll_flags &= ~LF_UPGRADE;
list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
} else if (result == 0) {
- lo->ll_status = lfsck->li_status;
- if (lo->ll_status == 0)
+ if (lfsck->li_status != 0)
+ lo->ll_status = lfsck->li_status;
+ else
lo->ll_status = LS_STOPPED;
if (lo->ll_status != LS_PAUSED)
list_move_tail(&com->lc_link, &lfsck->li_list_idle);
lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
- if (result <= 0)
- lfsck_rbtree_cleanup(env, com);
-
CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
lfsck_lfsck2name(lfsck), rc);
static int lfsck_layout_master_double_scan(const struct lu_env *env,
struct lfsck_component *com)
{
- struct lfsck_layout *lo = com->lc_file_ram;
+ struct lfsck_layout *lo = com->lc_file_ram;
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct lfsck_tgt_descs *ltds;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+ int rc;
+
+ rc = lfsck_double_scan_generic(env, com, lo->ll_status);
+
+ /* Once the assistant thread has stopped, the request list and both
+ * phase1 lists are expected to be empty; unlink whatever targets are
+ * still queued on the phase2 lists, each list under the ltd_lock of
+ * the descriptor table (OST or MDT) its targets belong to. */
+ if (thread_is_stopped(&lad->lad_thread)) {
+ LASSERT(list_empty(&lad->lad_req_list));
+ LASSERT(list_empty(&lad->lad_ost_phase1_list));
+ LASSERT(list_empty(&lad->lad_mdt_phase1_list));
+
+ ltds = &lfsck->li_ost_descs;
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+
+ ltds = &lfsck->li_mdt_descs;
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+ }
- return lfsck_double_scan_generic(env, com, lo->ll_status);
+ /* Hand back the result of the generic double scan unchanged. */
+ return rc;
}
static int lfsck_layout_slave_double_scan(const struct lu_env *env,
int rc;
ENTRY;
- if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
- lfsck_rbtree_cleanup(env, com);
- lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
- RETURN(0);
- }
-
CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
lfsck_lfsck2name(lfsck));
+ if (lo->ll_flags & LF_INCOMPLETE)
+ GOTO(done, rc = 1);
+
atomic_inc(&lfsck->li_double_scan_count);
com->lc_new_checked = 0;
done:
rc = lfsck_layout_double_scan_result(env, com, rc);
-
- lfsck_rbtree_cleanup(env, com);
- lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
+ lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE,
+ (rc > 0 && lo->ll_flags & LF_INCOMPLETE) ? 0 : rc);
+ lfsck_layout_slave_quit(env, com);
if (atomic_dec_and_test(&lfsck->li_double_scan_count))
wake_up_all(&lfsck->li_thread.t_ctl_waitq);
}
spin_unlock(&ltds->ltd_lock);
- CFS_FREE_BITMAP(lad->lad_bitmap);
+ if (likely(lad->lad_bitmap != NULL))
+ CFS_FREE_BITMAP(lad->lad_bitmap);
OBD_FREE_PTR(lad);
}
static void lfsck_layout_slave_data_release(const struct lu_env *env,
struct lfsck_component *com)
{
+ struct lfsck_layout_slave_data *llsd = com->lc_data;
+
+ lfsck_layout_slave_quit(env, com); /* release what llsd tracks before llsd itself goes away */
+ com->lc_data = NULL; /* clear the component's reference before llsd is freed */
+ OBD_FREE_PTR(llsd);
+}
+
+/**
+ * Quit handler for the layout LFSCK master component.
+ *
+ * Runs the generic quit path, then unlinks every OST and MDT target that
+ * is still queued on the assistant's phase1 or phase2 lists.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ */
+static void lfsck_layout_master_quit(const struct lu_env *env,
+ struct lfsck_component *com)
+{
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct lfsck_tgt_descs *ltds;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+
+ LASSERT(lad != NULL);
+
+ lfsck_quit_generic(env, com);
+
+ /* After the generic quit the assistant thread must be either never
+ * started or already stopped, with no request left pending. */
+ LASSERT(thread_is_init(&lad->lad_thread) ||
+ thread_is_stopped(&lad->lad_thread));
+ LASSERT(list_empty(&lad->lad_req_list));
+
+ /* OST targets: both phase lists are emptied under the OST table lock. */
+ ltds = &lfsck->li_ost_descs;
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+
+ /* MDT targets: same, under the MDT table lock. */
+ ltds = &lfsck->li_mdt_descs;
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_layout_phase_list) {
+ list_del_init(&ltd->ltd_layout_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+}
+
+/**
+ * Quit handler for the layout LFSCK slave component.
+ *
+ * Frees every tracked sequence entry (dropping its last-ID object
+ * reference), releases every entry on the master list, and cleans up the
+ * rbtree. The slave data structure itself is not freed here.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ */
+static void lfsck_layout_slave_quit(const struct lu_env *env,
+ struct lfsck_component *com)
+{
struct lfsck_layout_slave_data *llsd = com->lc_data;
struct lfsck_layout_seq *lls;
struct lfsck_layout_seq *next;
struct lfsck_layout_slave_target *llst;
- struct lfsck_layout_slave_target *tmp;
LASSERT(llsd != NULL);
list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
- lls_list) {
+ lls_list) {
list_del_init(&lls->lls_list);
lfsck_object_put(env, lls->lls_lastid_obj);
OBD_FREE_PTR(lls);
}
- list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
- llst_list) {
+ spin_lock(&llsd->llsd_lock);
+ while (!list_empty(&llsd->llsd_master_list)) {
+ llst = list_entry(llsd->llsd_master_list.next,
+ struct lfsck_layout_slave_target, llst_list);
list_del_init(&llst->llst_list);
- OBD_FREE_PTR(llst);
+ /* The put is done with llsd_lock dropped. */
+ spin_unlock(&llsd->llsd_lock);
+ lfsck_layout_llst_put(llst);
+ /* Retake the lock before the list is examined again; without
+ * this the next iteration would walk the list unlocked and the
+ * unlock below would release a lock that is not held. */
+ spin_lock(&llsd->llsd_lock);
}
+ spin_unlock(&llsd->llsd_lock);
lfsck_rbtree_cleanup(env, com);
- com->lc_data = NULL;
- OBD_FREE_PTR(llsd);
-}
-
-static void lfsck_layout_slave_quit(const struct lu_env *env,
- struct lfsck_component *com)
-{
- lfsck_rbtree_cleanup(env, com);
}
static int lfsck_layout_master_in_notify(const struct lu_env *env,
struct lfsck_component *com,
- struct lfsck_request *lr)
+ struct lfsck_request *lr,
+ struct thandle *th)
{
struct lfsck_instance *lfsck = com->lc_lfsck;
struct lfsck_layout *lo = com->lc_file_ram;
}
CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
- "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
- lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
- lr->lr_index, lr->lr_status);
+ "from %s %x, status %d, flags %x, flags2 %x\n",
+ lfsck_lfsck2name(lfsck), lr->lr_event,
+ (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+ lr->lr_index, lr->lr_status, lr->lr_flags, lr->lr_flags2);
if (lr->lr_event != LE_PHASE1_DONE &&
lr->lr_event != LE_PHASE2_DONE &&
static int lfsck_layout_slave_in_notify(const struct lu_env *env,
struct lfsck_component *com,
- struct lfsck_request *lr)
+ struct lfsck_request *lr,
+ struct thandle *th)
{
struct lfsck_instance *lfsck = com->lc_lfsck;
struct lfsck_layout_slave_data *llsd = com->lc_data;
.lfsck_dump = lfsck_layout_dump,
.lfsck_double_scan = lfsck_layout_master_double_scan,
.lfsck_data_release = lfsck_layout_master_data_release,
- .lfsck_quit = lfsck_quit_generic,
+ .lfsck_quit = lfsck_layout_master_quit,
.lfsck_in_notify = lfsck_layout_master_in_notify,
.lfsck_query = lfsck_layout_query,
};
EXPORT_SYMBOL(lfsck_stop);
int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
- struct lfsck_request *lr)
+ struct lfsck_request *lr, struct thandle *th)
{
int rc = -EOPNOTSUPP;
ENTRY;
case LE_PEER_EXIT:
case LE_CONDITIONAL_DESTROY:
case LE_CREATE_ORPHAN:
+ case LE_SKIP_NLINK_DECLARE:
+ case LE_SKIP_NLINK:
case LE_PAIRS_VERIFY: {
struct lfsck_instance *lfsck;
struct lfsck_component *com;
com = lfsck_component_find(lfsck, lr->lr_active);
if (likely(com != NULL)) {
- rc = com->lc_ops->lfsck_in_notify(env, com, lr);
+ rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
lfsck_component_put(env, com);
}
* \param[in] env pointer to the thread context
* \param[in] com pointer to the lfsck component
*
- * \retval positive number for data corruption
* \retval 0 for success
- * \retval negative error number on failure
+ * \retval negative error number on failure or data corruption
*/
static int lfsck_namespace_load_bitmap(const struct lu_env *env,
struct lfsck_component *com)
rc = dt_xattr_get(env, obj,
lfsck_buf_get(env, bitmap->data, size),
XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
- if (rc == -ERANGE || rc == -ENODATA || rc == 0)
- RETURN(1);
-
- if (rc < 0)
- RETURN(rc);
-
if (rc != size)
- RETURN(rc);
+ RETURN(rc >= 0 ? -EINVAL : rc);
if (cfs_bitmap_check_empty(bitmap))
lad->lad_incomplete = 0;
* \param[in] type the orphan's type to be created
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
* \param[in] type the orphan's type to be created
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
* \param[in] orphan pointer to the orphan MDT-object
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
}
/**
+ * Repair the object's nlink attribute.
+ *
+ * If all the known name entries have been verified, then the object's hard
+ * link attribute should match the object's linkEA entries count unless the
+ * object has too many hard links to be recorded in the linkEA. Such cases
+ * should have been marked in the LFSCK tracing file. Otherwise, trust the
+ * linkEA to update the object's nlink attribute.
+ *
+ * In dry-run mode (LPF_DRYRUN) the mismatch is reported as repaired, but
+ * the attribute is not written.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] obj pointer to the dt_object to be handled
+ * \param[in,out] nlink pointer to the buffer holding the object's hard link
+ * count before and after the repairing
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if nothing to be repaired
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_repair_nlink(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *obj, __u32 *nlink)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_attr *la = &info->lti_la3;
+ struct lu_fid *tfid = &info->lti_fid3;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_device *dev = lfsck->li_bottom;
+ const struct lu_fid *cfid = lfsck_dto2fid(obj);
+ struct dt_object *child = NULL;
+ struct thandle *th = NULL;
+ struct linkea_data ldata = { 0 };
+ struct lustre_handle lh = { 0 };
+ __u32 old = *nlink; /* kept only for the debug message below */
+ int rc = 0;
+ __u8 flags;
+ ENTRY;
+
+ LASSERT(!dt_object_remote(obj));
+ LASSERT(S_ISREG(lfsck_object_type(obj)));
+
+ child = lfsck_object_find_by_dev(env, dev, cfid);
+ if (IS_ERR(child))
+ GOTO(log, rc = PTR_ERR(child));
+
+ rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+ MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_XATTR, LCK_EX);
+ if (rc != 0)
+ GOTO(log, rc);
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(log, rc = PTR_ERR(th));
+
+ la->la_valid = LA_NLINK; /* only the nlink attribute is declared and later set */
+ rc = dt_declare_attr_set(env, child, la, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, child, 0);
+ /* If the LFSCK is marked as LF_INCOMPLETE, some MDT has tried to
+ * verify a remote MDT-object that resides on this MDT, but this
+ * MDT failed to respond to that request. There may therefore be a
+ * remote name entry on another MDT that references this object
+ * under another name, so we cannot tell whether this linkEA is
+ * valid. Leave it alone; it may be resolved by the next LFSCK
+ * run. */
+ if (ns->ln_flags & LF_INCOMPLETE)
+ GOTO(unlock, rc = 0);
+
+ fid_cpu_to_be(tfid, cfid); /* the trace file lookup below uses the big-endian FID as key */
+ rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
+ (const struct dt_key *)tfid, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (flags & LNTF_SKIP_NLINK) /* object was flagged to skip nlink verification */
+ GOTO(unlock, rc = 0);
+
+ rc = lfsck_links_read2(env, child, &ldata);
+ if (rc == -ENODATA) /* no linkEA: nothing to compare nlink against */
+ GOTO(unlock, rc = 0);
+
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (*nlink == ldata.ld_leh->leh_reccount)
+ GOTO(unlock, rc = 0);
+
+ la->la_nlink = *nlink = ldata.ld_leh->leh_reccount; /* caller's value is updated even in dry-run mode */
+ if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+ GOTO(unlock, rc = 1);
+
+ rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
+
+ GOTO(unlock, rc = (rc == 0 ? 1 : rc)); /* successful write is reported as 1 (repaired) */
+
+unlock:
+ dt_write_unlock(env, child);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+log:
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ if (child != NULL && !IS_ERR(child))
+ lfsck_object_put(env, child);
+
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
+ "nlink count from %u to %u: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc);
+
+ if (rc != 0) /* set for repaired (positive) and failed (negative) results alike */
+ ns->ln_flags |= LF_INCONSISTENT;
+
+ return rc;
+}
+
+/**
* Double scan the directory object for namespace LFSCK.
*
* This function will verify the <parent, child> pairs in the namespace tree:
return rc;
if (la->la_nlink != count) {
- /* XXX: there will be other patch(es) for MDT-object
- * hard links verification. */
+ rc = lfsck_namespace_repair_nlink(env, com, child,
+ &la->la_nlink);
+ if (rc > 0) {
+ ns->ln_objs_nlink_repaired++;
+ rc = 0;
+ }
}
if (repaired) {
int rc;
rc = lfsck_namespace_load_bitmap(env, com);
- if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) {
+ if (rc != 0 || ns->ln_status == LS_COMPLETED) {
rc = lfsck_namespace_reset(env, com, false);
if (rc == 0)
rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
- }
- if (rc != 0) {
- CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n",
- lfsck_lfsck2name(lfsck), rc);
+ if (rc != 0) {
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
+ "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
- return rc;
+ return rc;
+ }
}
down_write(&com->lc_sem);
list_del_init(&com->lc_link_dir);
list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
} else if (result == 0) {
- ns->ln_status = lfsck->li_status;
- if (ns->ln_status == 0)
+ if (lfsck->li_status != 0)
+ ns->ln_status = lfsck->li_status;
+ else
ns->ln_status = LS_STOPPED;
if (ns->ln_status != LS_PAUSED) {
list_del_init(&com->lc_link_dir);
static int lfsck_namespace_double_scan(const struct lu_env *env,
struct lfsck_component *com)
{
- struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+ int rc;
+
+ rc = lfsck_double_scan_generic(env, com, ns->ln_status);
+ /* Once the assistant thread has stopped, the request list and the
+ * phase1 list are expected to be empty; unlink whatever MDT targets
+ * are still queued on the phase2 list, under the MDT table lock. */
+ if (thread_is_stopped(&lad->lad_thread)) {
+ LASSERT(list_empty(&lad->lad_req_list));
+ LASSERT(list_empty(&lad->lad_mdt_phase1_list));
- return lfsck_double_scan_generic(env, com, ns->ln_status);
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_namespace_phase_list) {
+ list_del_init(&ltd->ltd_namespace_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+ }
+
+ /* Hand back the result of the generic double scan unchanged. */
+ return rc;
}
static void lfsck_namespace_data_release(const struct lu_env *env,
}
spin_unlock(&ltds->ltd_lock);
- CFS_FREE_BITMAP(lad->lad_bitmap);
+ if (likely(lad->lad_bitmap != NULL))
+ CFS_FREE_BITMAP(lad->lad_bitmap);
OBD_FREE_PTR(lad);
}
+/**
+ * Quit handler for the namespace LFSCK component.
+ *
+ * Runs the generic quit path, then unlinks every MDT target that is still
+ * queued on the assistant's phase1 or phase2 list.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ */
+static void lfsck_namespace_quit(const struct lu_env *env,
+ struct lfsck_component *com)
+{
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+
+ LASSERT(lad != NULL);
+
+ lfsck_quit_generic(env, com);
+
+ /* After the generic quit the assistant thread must be either never
+ * started or already stopped, with no request left pending. */
+ LASSERT(thread_is_init(&lad->lad_thread) ||
+ thread_is_stopped(&lad->lad_thread));
+ LASSERT(list_empty(&lad->lad_req_list));
+
+ /* Both phase lists are emptied under the MDT table lock. */
+ spin_lock(&ltds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+ ltd_namespace_phase_list) {
+ list_del_init(&ltd->ltd_namespace_phase_list);
+ }
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_namespace_phase_list) {
+ list_del_init(&ltd->ltd_namespace_phase_list);
+ }
+ spin_unlock(&ltds->ltd_lock);
+}
+
static int lfsck_namespace_in_notify(const struct lu_env *env,
struct lfsck_component *com,
- struct lfsck_request *lr)
+ struct lfsck_request *lr,
+ struct thandle *th)
{
struct lfsck_instance *lfsck = com->lc_lfsck;
struct lfsck_namespace *ns = com->lc_file_ram;
return rc;
}
+ case LE_SKIP_NLINK_DECLARE: {
+ struct dt_object *obj = com->lc_obj;
+ struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+ __u8 flags = 0;
+
+ LASSERT(th != NULL);
+
+ rc = dt_declare_delete(env, obj,
+ (const struct dt_key *)key, th);
+ if (rc == 0)
+ rc = dt_declare_insert(env, obj,
+ (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th);
+
+ RETURN(rc);
+ }
+ case LE_SKIP_NLINK: {
+ struct dt_object *obj = com->lc_obj;
+ struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+ __u8 flags = 0;
+ bool exist = false;
+ ENTRY;
+
+ LASSERT(th != NULL);
+
+ fid_cpu_to_be(key, &lr->lr_fid);
+ rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
+ (const struct dt_key *)key, BYPASS_CAPA);
+ if (rc == 0) {
+ if (flags & LNTF_SKIP_NLINK)
+ RETURN(0);
+
+ exist = true;
+ } else if (rc != -ENOENT) {
+ GOTO(log, rc);
+ }
+
+ flags |= LNTF_SKIP_NLINK;
+ if (exist) {
+ rc = dt_delete(env, obj, (const struct dt_key *)key,
+ th, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
+
+ rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th, BYPASS_CAPA, 1);
+
+ GOTO(log, rc);
+
+log:
+ CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
+ " to be skipped for namespace double scan: rc = %d\n",
+ lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
+
+ if (rc != 0)
+ /* If we cannot record this object in the LFSCK tracing,
+ * we have to mark the LFSCK as LF_INCOMPLETE, then the
+ * LFSCK will skip nlink attribute verification for
+ * all objects. */
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ return 0;
+ }
case LE_PHASE1_DONE:
case LE_PHASE2_DONE:
case LE_PEER_EXIT:
.lfsck_dump = lfsck_namespace_dump,
.lfsck_double_scan = lfsck_namespace_double_scan,
.lfsck_data_release = lfsck_namespace_data_release,
- .lfsck_quit = lfsck_quit_generic,
+ .lfsck_quit = lfsck_namespace_quit,
.lfsck_in_notify = lfsck_namespace_in_notify,
.lfsck_query = lfsck_namespace_query,
};
GOTO(stop, rc);
rc = lfsck_links_write(env, obj, &ldata, handle);
+ if (unlikely(rc == -ENOSPC) &&
+ S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
+ if (handle != NULL) {
+ LASSERT(dt_write_locked(env, obj));
+
+ dt_write_unlock(env, obj);
+ dtlocked = false;
+
+ dt_trans_stop(env, dev, handle);
+ handle = NULL;
+
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ }
+
+ rc = lfsck_namespace_trace_update(env, com,
+ &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
+ if (rc != 0)
+ /* If we cannot record this object in the
+ * LFSCK tracing, we have to mark the LFSCK
+ * as LF_INCOMPLETE, then the LFSCK will
+ * skip nlink attribute verification for
+ * all objects. */
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ GOTO(out, rc = 0);
+ }
+
if (rc != 0)
GOTO(stop, rc);
ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
ns->ln_success_count++;
} else if (rc == 0) {
- ns->ln_status = lfsck->li_status;
- if (ns->ln_status == 0)
+ if (lfsck->li_status != 0)
+ ns->ln_status = lfsck->li_status;
+ else
ns->ln_status = LS_STOPPED;
} else {
ns->ln_status = LS_FAILED;
if (rc != 0)
CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
"to sync failure with MDTs, and related MDTs "
- "may handle orphan un-properly: rc = %d\n",
+ "may handle orphan improperly: rc = %d\n",
lfsck_lfsck2name(lfsck), rc);
EXIT;
if (IS_ERR(th))
RETURN(PTR_ERR(th));
- rc = mdd_declare_links_add(env, o, th, NULL);
+ rc = mdd_declare_links_add(env, o, th, NULL, MLAO_IGNORE);
if (rc)
GOTO(out, rc);
rc = dt_trans_start_local(env, mdd->mdd_child, th);
{
const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
ldata->ld_leh->leh_len);
+ int rc;
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
return 0;
- return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
- mdd_object_capa(env, mdd_obj));
+ rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
+ mdd_object_capa(env, mdd_obj));
+ if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
+ mdd_object_remote(mdd_obj) == 0) {
+ struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+
+ /* XXX: If the linkEA overflows, we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object, so that the "nlink" is not shrunk by
+ * mistake. Interacting with LFSCK like this may not be
+ * ideal; we will consider replacing it with another
+ * mechanism in the future. LU-5802. */
+ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
+ LFSCK_TYPE_NAMESPACE);
+ lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+ lr, handle);
+ }
+
+ return rc;
}
int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct thandle *handle, struct linkea_data *ldata)
+ struct thandle *handle, struct linkea_data *ldata,
+ enum mdd_links_add_overflow overflow)
{
int rc;
int ea_len;
rc = mdo_declare_xattr_set(env, mdd_obj,
mdd_buf_get_const(env, linkea, ea_len),
XATTR_NAME_LINK, 0, handle);
+ if (rc != 0)
+ return rc;
+
+ if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
+ struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+
+ /* XXX: If the linkEA overflows, we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object, so that the "nlink" is not shrunk by
+ * mistake. Interacting with LFSCK like this may not be
+ * ideal; we will consider replacing it with another
+ * mechanism in the future. LU-5802. */
+ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
+ LFSCK_TYPE_NAMESPACE);
+ rc = lfsck_in_notify(env,
+ mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+ lr, handle);
+ }
+
return rc;
}
/* For directory, the linkEA will be removed together
* with the object. */
if (!S_ISDIR(mdd_object_type(c)))
- rc = mdd_declare_links_add(env, c, handle, NULL);
+ rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
return rc;
}
return rc;
rc = mdo_declare_ref_add(env, c, handle);
- if (rc)
+ if (rc != 0)
return rc;
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
+ rc = mdo_declare_ref_add(env, c, handle);
+ if (rc != 0)
+ return rc;
+ }
+
la->la_valid = LA_CTIME | LA_MTIME;
rc = mdo_declare_attr_set(env, p, la, handle);
if (rc != 0)
la->la_valid = LA_CTIME;
rc = mdo_declare_attr_set(env, c, la, handle);
- if (rc)
+ if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, data);
- if (rc)
+ rc = mdd_declare_links_add(env, c, handle, data,
+ S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+ if (rc != 0)
return rc;
rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
if (rc)
GOTO(out_unlock, rc);
- rc = mdo_ref_add(env, mdd_sobj, handle);
- if (rc)
- GOTO(out_unlock, rc);
+ if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
+ rc = mdo_ref_add(env, mdd_sobj, handle);
+ if (rc != 0)
+ GOTO(out_unlock, rc);
+ }
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
+ rc = mdo_ref_add(env, mdd_sobj, handle);
+ if (rc != 0)
+ GOTO(out_unlock, rc);
+ }
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
struct lu_fid tfid = *mdo2fid(mdd_sobj);
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, ldata);
+ rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
if (rc)
return rc;
if (rc)
return rc;
- rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
+ rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
+ S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
if (rc)
return rc;
linkea_entry_pack(ldata.ld_lee, &lname,
mdd_object_fid(parent));
if (declare)
- rc = mdd_declare_links_add(env, child, handle, &ldata);
+ rc = mdd_declare_links_add(env, child, handle, &ldata,
+ MLAO_IGNORE);
else
rc = mdd_links_write(env, child, &ldata, handle);
break;
}
if (declare)
- rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
+ rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
+ MLAO_IGNORE);
else
rc = mdd_links_write(env, mdd_tobj, ldata, handle);
struct linkea_data mti_link_data;
struct md_op_spec mti_spec;
struct dt_insert_rec mti_dt_rec;
+ struct lfsck_request mti_lr;
+};
+
+enum mdd_links_add_overflow {
+ MLAO_IGNORE = false, /* declare the linkEA update only */
+ MLAO_CHECK = true, /* for a local object, also send LE_SKIP_NLINK_DECLARE to the namespace LFSCK */
};
extern const char orph_index_name[];
int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
struct linkea_data *ldata);
int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct thandle *handle, struct linkea_data *ldata);
+ struct thandle *handle, struct linkea_data *ldata,
+ enum mdd_links_add_overflow overflow);
int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
struct linkea_data *ldata, struct thandle *handle);
struct lu_buf *mdd_links_get(const struct lu_env *env,
lr->lr_fid2 = oii->oii_pfid; /* client given PFID. */
lr->lr_fid3 = *pfid; /* OST local stored PFID. */
- rc = lfsck_in_notify(env, ofd->ofd_osd, lr);
+ rc = lfsck_in_notify(env, ofd->ofd_osd, lr, NULL);
ofd_write_lock(env, fo);
switch (lr->lr_status) {
case LPVS_INIT:
/* Only the new created objects need to be recorded. */
if (ofd->ofd_osd->dd_record_fid_accessed) {
- lfsck_pack_rfa(&ofd_info(env)->fti_lr,
- lu_object_fid(&fo->ofo_obj.do_lu));
- lfsck_in_notify(env, ofd->ofd_osd,
- &ofd_info(env)->fti_lr);
+ struct lfsck_request *lr = &ofd_info(env)->fti_lr;
+
+ lfsck_pack_rfa(lr, lu_object_fid(&fo->ofo_obj.do_lu),
+ LE_FID_ACCESSED,
+ LFSCK_TYPE_LAYOUT);
+ lfsck_in_notify(env, ofd->ofd_osd, lr, NULL);
}
if (likely(!ofd_object_exists(fo) &&
RETURN(rc);
}
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
+ strcmp(name, XATTR_NAME_LINK) == 0)
+ return -ENOSPC;
+
return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
fs_flags);
}
strcmp(name, POSIX_ACL_XATTR_DEFAULT) == 0))
RETURN(-EOPNOTSUPP);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
+ strcmp(name, XATTR_NAME_LINK) == 0)
+ RETURN(-ENOSPC);
+
oh = container_of0(handle, struct osd_thandle, ot_super);
down(&obj->oo_guard);
int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof, struct thandle *th);
-int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_attr *attr, struct thandle *th);
-int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_attr *attr, struct thandle *th,
- struct lustre_capa *capa);
+int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th);
extern const struct dt_index_operations osp_md_index_ops;
/* osp_precreate.c */
}
/**
- * Implementation of dt_object_operations::do_declare_attr_get
- *
- * Declare setting attributes of the remote object, i.e. insert remote
- * object attr_set update into RPC.
+ * Add attr_set sub-request into the OUT RPC.
*
* \param[in] env execution environment
* \param[in] dt object on which to set attributes
* \retval 0 if the insertion succeeds.
* \retval negative errno if the insertion fails.
*/
-int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_attr *attr, struct thandle *th)
+int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
{
struct dt_update_request *update;
int rc;
}
/**
+ * Implementation of dt_object_operations::do_declare_attr_set
+ *
+ * Declare setting attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then add the modification
+ * sub-request into the OUT RPC here, and such OUT RPC will be triggered
+ * when transaction start.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt object on which to set attributes
+ * \param[in] attr attributes to be set
+ * \param[in] th the transaction handle
+ *
+ * \retval 0 if the insertion succeeds.
+ * \retval negative errno if the insertion fails.
+ */
+int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
+{
+ int rc = 0;
+
+ CDEBUG(D_INFO, "declare attr set object "DFID"\n",
+ PFID(&dt->do_lu.lo_header->loh_fid));
+
+ /* Not an only-remote transaction: add the attr_set sub-request
+ * now. The only-remote case is handled in osp_md_attr_set(). */
+ if (!is_only_remote_trans(th))
+ rc = __osp_md_attr_set(env, dt, attr, th);
+
+ return rc;
+}
+
+/**
* Implementation of dt_object_operations::do_attr_set
*
- * Do nothing in this method for now. In DNE phase I, remote updates
- * are actually executed during transaction start, i.e. object attributes
- * have already been set when calling this method.
+ * Set attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then related modification
+ * sub-request has been added in the declare phase and related OUT RPC
+ * has been triggered at transaction start. Otherwise, the modification
+ * sub-request will be added here, and related OUT RPC will be triggered
+ * when transaction stop.
*
* \param[in] env execution environment
* \param[in] dt object to set attributes
const struct lu_attr *attr, struct thandle *th,
struct lustre_capa *capa)
{
+ int rc = 0;
+
CDEBUG(D_INFO, "attr set object "DFID"\n",
PFID(&dt->do_lu.lo_header->loh_fid));
- RETURN(0);
+ if (is_only_remote_trans(th))
+ rc = __osp_md_attr_set(env, dt, attr, th);
+
+ RETURN(rc);
}
/**
RETURN(rc);
}
- if (o->opo_new)
- /* no need in logging for new objects being created */
- RETURN(0);
-
if (!(attr->la_valid & (LA_UID | LA_GID)))
RETURN(0);
- if (!is_only_remote_trans(th))
+ if (!is_only_remote_trans(th)) {
+ if (o->opo_new)
+ /* no need in logging for new objects being created */
+ RETURN(0);
+
/*
* track all UID/GID changes via llog
*/
rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
- else
+ } else {
/* It is for OST-object attr_set directly without updating
* local MDT-object attribute. It is usually used by LFSCK. */
- rc = osp_md_declare_attr_set(env, dt, attr, th);
+ rc = __osp_md_attr_set(env, dt, attr, th);
+ }
if (rc != 0 || o->opo_ooa == NULL)
RETURN(rc);
if (is_only_remote_trans(th)) {
rc = __osp_attr_set(env, dt, attr, th);
- if (rc != 0)
- RETURN(rc);
+ if (rc == 0 && o->opo_new)
+ o->opo_new = 0;
+
+ RETURN(rc);
}
/* we're interested in uid/gid changes only */
RETURN(0);
}
- if (!is_only_remote_trans(th))
- /*
- * once transaction is committed put proper command on
- * the queue going to our OST
- */
- rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
- /* XXX: send new uid/gid to OST ASAP? */
- else
- /* It is for OST-object attr_set directly without updating
- * local MDT-object attribute. It is usually used by LFSCK. */
- rc = osp_md_attr_set(env, dt, attr, th, capa);
+ rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
+ /* XXX: send new uid/gid to OST ASAP? */
RETURN(rc);
}
(long long)LE_PAIRS_VERIFY);
LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
(long long)LE_CREATE_ORPHAN);
+ LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
+ (long long)LE_SKIP_NLINK_DECLARE);
+ LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
+ (long long)LE_SKIP_NLINK);
LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)LEF_TO_OST);
LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags,
th, NULL);
- dt_write_unlock(env, dt_obj);
/**
* Ignore errors if this is LINK EA
**/
- if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
+ if (unlikely(rc != 0 &&
+ strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
+ /* XXX: If the linkEA overflows, then we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object, so that the "nlink" is not shrunk by
+ * mistake. Interacting with LFSCK like this is not ideal.
+ * We will consider replacing it with another mechanism
+ * in the future. LU-5802. */
+ if (rc == -ENOSPC) {
+ struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
+
+ lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
+ LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
+ tgt_lfsck_in_notify(env,
+ tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+ }
+
rc = 0;
+ }
+ dt_write_unlock(env, dt_obj);
+
out:
CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
if (rc != 0)
return rc;
+ if (strcmp(name, XATTR_NAME_LINK) == 0) {
+ struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
+
+ /* XXX: If the linkEA overflows, then we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object, so that the "nlink" is not shrunk by
+ * mistake. Interacting with LFSCK like this is not ideal.
+ * We will consider replacing it with another mechanism
+ * in the future. LU-5802. */
+ lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
+ LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
+ rc = tgt_lfsck_in_notify(env,
+ tgt_ses_info(env)->tsi_tgt->lut_bottom,
+ lr, ta->ta_handle);
+ if (rc != 0)
+ return rc;
+ }
+
arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
if (IS_ERR(arg))
return PTR_ERR(arg);
if (dt->dd_record_fid_accessed) {
lfsck_pack_rfa(&tti->tti_lr,
- lu_object_fid(&dt_obj->do_lu));
- tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
+ lu_object_fid(&dt_obj->do_lu),
+ LE_FID_ACCESSED,
+ LFSCK_TYPE_LAYOUT);
+ tgt_lfsck_in_notify(env, dt, &tti->tti_lr, NULL);
}
tti->tti_u.update.tti_dt_object = dt_obj;
int (*tgt_lfsck_in_notify)(const struct lu_env *env,
struct dt_device *key,
- struct lfsck_request *lr) = NULL;
+ struct lfsck_request *lr,
+ struct thandle *th) = NULL;
void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
struct dt_device *,
- struct lfsck_request *))
+ struct lfsck_request *,
+ struct thandle *))
{
tgt_lfsck_in_notify = notify;
}
if (lr == NULL)
RETURN(-EPROTO);
- rc = tgt_lfsck_in_notify(env, key, lr);
+ rc = tgt_lfsck_in_notify(env, key, lr, NULL);
RETURN(rc);
}
extern int (*tgt_lfsck_in_notify)(const struct lu_env *env,
struct dt_device *key,
- struct lfsck_request *lr);
+ struct lfsck_request *lr,
+ struct thandle *th);
struct tx_arg;
typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
[[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
- ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28"
+ ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28 29"
build_test_filter
check_mount_and_prep
$LFS mkdir -i 0 $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
if [ $MDSCOUNT -ge 2 ]; then
$LFS mkdir -i 1 $DIR/$tdir/a2
- $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2
+ $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2
dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
$LFS path2fid $DIR/$tdir/a2/f2
$LFS getstripe $DIR/$tdir/a2/f2
check_mount_and_prep
$LFS mkdir -i 0 $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
local fid1=$($LFS path2fid $DIR/$tdir/a1/f1)
if [ $MDSCOUNT -ge 2 ]; then
$LFS mkdir -i 1 $DIR/$tdir/a2
- $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2
+ $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2
dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
fid2=$($LFS path2fid $DIR/$tdir/a2/f2)
echo ${fid2}
check_mount_and_prep
$LFS mkdir -i 0 $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
echo "Inject failure, to simulate the case of missing parent FID"
#define OBD_FAIL_LFSCK_NOPFID 0x1617
if [ $MDSCOUNT -ge 2 ]; then
$LFS mkdir -i 1 $DIR/$tdir/a2
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a2
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a2
dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
$LFS getstripe $DIR/$tdir/a2/f2
fi
check_mount_and_prep
mkdir $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
echo "guard" > $DIR/$tdir/a1/f1
echo "foo" > $DIR/$tdir/a1/f2
local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }')
check_mount_and_prep
mkdir $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
echo "guard" > $DIR/$tdir/a1/f1
echo "foo" > $DIR/$tdir/a1/f2
local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }')
check_mount_and_prep
$LFS mkdir -i 0 $DIR/$tdir/a1
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
dd if=/dev/zero of=$DIR/$tdir/a1/guard bs=1M count=2
dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
$LFS mkdir -i 0 $DIR/$tdir/a2
- $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a2
+ $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a2
dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
$LFS getstripe $DIR/$tdir/a1/f1
$LFS getstripe $DIR/$tdir/a2/f2
if [ $MDSCOUNT -ge 2 ]; then
$LFS mkdir -i 1 $DIR/$tdir/a3
- $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a3
+ $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a3
dd if=/dev/zero of=$DIR/$tdir/a3/guard bs=1M count=2
dd if=/dev/zero of=$DIR/$tdir/a3/f3 bs=1M count=2
$LFS mkdir -i 1 $DIR/$tdir/a4
- $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a4
+ $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a4
dd if=/dev/zero of=$DIR/$tdir/a4/f4 bs=1M count=2
$LFS getstripe $DIR/$tdir/a3/f3
$LFS getstripe $DIR/$tdir/a4/f4
check_mount_and_prep
$LFS mkdir -i 0 $DIR/$tdir/a1
if [ $OSTCOUNT -gt 2 ]; then
- $LFS setstripe -c 3 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 3 -i 0 -S 1M $DIR/$tdir/a1
bcount=513
else
- $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a1
+ $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a1
bcount=257
fi
echo "The target name entry is lost. The LFSCK should insert the"
echo "orphan MDT-object under .lustre/lost+found/MDTxxxx. But if"
echo "the MDT (on which the orphan MDT-object resides) has ever"
- echo "failed to respond some name entry verification durin the"
+ echo "failed to respond some name entry verification during the"
echo "first stage-scanning, then the LFSCK should skip to handle"
echo "orphan MDT-object on this MDT. But other MDTs should not"
echo "be affected."
}
run_test 28 "Skip the failed MDT(s) when handle orphan MDT-objects"
+test_29a() {
+ # Inject an nlink count that is larger than the number of name
+ # entries (OBD_FAIL_LFSCK_MORE_NLINK), then check that the namespace
+ # LFSCK reports one nlink repair and that the count is corrected.
+ echo "#####"
+ echo "The object's nlink attribute is larger than the object's known"
+ echo "name entries count. The LFSCK will repair the object's nlink"
+ echo "attribute to match the known name entries count"
+ echo "#####"
+
+ check_mount_and_prep
+
+ $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+ touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+
+ echo "Inject failure stub on MDT0 to simulate the case that foo's"
+ echo "nlink attribute is larger than its name entries count."
+
+ #define OBD_FAIL_LFSCK_MORE_NLINK 0x1625
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1625
+ ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+ error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+ # Only two names (foo, h1) were created, so an nlink of 3 confirms
+ # that the failure stub took effect.
+ cancel_lru_locks mdc
+ local count=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count -eq 3 ] || error "(4) Cannot inject error: $count"
+
+ echo "Trigger namespace LFSCK to repair the nlink count"
+ $START_NAMESPACE -r -A ||
+ error "(5) Fail to start LFSCK for namespace"
+
+ wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+ mdd.${MDT_DEV}.lfsck_namespace |
+ awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+ $SHOW_NAMESPACE
+ error "(6) unexpected status"
+ }
+
+ # Exactly one nlink repair is expected, bringing the count back to 2.
+ local repaired=$($SHOW_NAMESPACE |
+ awk '/^nlinks_repaired/ { print $2 }')
+ [ $repaired -eq 1 ] ||
+ error "(7) Fail to repair nlink count: $repaired"
+
+ cancel_lru_locks mdc
+ count=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count"
+}
+run_test 29a "LFSCK can repair bad nlink count (1)"
+
+test_29b() {
+ # Inject an nlink count that is smaller than the number of name
+ # entries (OBD_FAIL_LFSCK_LESS_NLINK), then check that the namespace
+ # LFSCK reports one nlink repair and that the count is corrected.
+ echo "#####"
+ echo "The object's nlink attribute is smaller than the object's known"
+ echo "name entries count. The LFSCK will repair the object's nlink"
+ echo "attribute to match the known name entries count"
+ echo "#####"
+
+ check_mount_and_prep
+
+ $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+ touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+
+ echo "Inject failure stub on MDT0 to simulate the case that foo's"
+ echo "nlink attribute is smaller than its name entries count."
+
+ #define OBD_FAIL_LFSCK_LESS_NLINK 0x1626
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1626
+ ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+ error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+ # Two names (foo, h1) exist, so an nlink of 1 confirms that the
+ # failure stub took effect.
+ cancel_lru_locks mdc
+ local count=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count -eq 1 ] || error "(4) Cannot inject error: $count"
+
+ echo "Trigger namespace LFSCK to repair the nlink count"
+ $START_NAMESPACE -r -A ||
+ error "(5) Fail to start LFSCK for namespace"
+
+ wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+ mdd.${MDT_DEV}.lfsck_namespace |
+ awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+ $SHOW_NAMESPACE
+ error "(6) unexpected status"
+ }
+
+ # Exactly one nlink repair is expected, bringing the count up to 2.
+ local repaired=$($SHOW_NAMESPACE |
+ awk '/^nlinks_repaired/ { print $2 }')
+ [ $repaired -eq 1 ] ||
+ error "(7) Fail to repair nlink count: $repaired"
+
+ cancel_lru_locks mdc
+ count=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count"
+}
+run_test 29b "LFSCK can repair bad nlink count (2)"
+
+test_29c() {
+ # Simulate a linkEA overflow (OBD_FAIL_LFSCK_LINKEA_OVERFLOW) while
+ # adding a hard link, then check that the namespace LFSCK neither
+ # repairs the nlink count nor changes the paths reported for the FID.
+ echo "#####"
+ echo "There are too much hard links to the object, and exceeds the"
+ echo "object's linkEA limitation, as to NOT all the known name entries"
+ echo "will be recorded in the linkEA. Under such case, LFSCK should"
+ echo "skip the nlink verification for this object."
+ echo "#####"
+
+ check_mount_and_prep
+
+ $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+ touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+ ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+ error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+
+ echo "Inject failure stub on MDT0 to simulate the case that"
+ echo "foo's hard links exceed the object's linkEA limitation."
+
+ # The fail_loc stays set until the LFSCK run below has completed.
+ #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1627
+ ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
+ error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
+
+ cancel_lru_locks mdc
+
+ # nlink counts all three names (foo, h1, h2) ...
+ local count1=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count1 -eq 3 ] || error "(5) Stat failure: $count1"
+
+ # ... but fid2path is expected to report only two paths, because
+ # the link for h2 was not recorded.
+ local foofid=$($LFS path2fid $DIR/$tdir/d0/foo)
+ $LFS fid2path $DIR $foofid
+ local count2=$($LFS fid2path $DIR $foofid | wc -l)
+ [ $count2 -eq 2 ] || error "(6) Fail to inject error: $count2"
+
+ echo "Trigger namespace LFSCK to repair the nlink count"
+ $START_NAMESPACE -r -A ||
+ error "(7) Fail to start LFSCK for namespace"
+
+ wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+ mdd.${MDT_DEV}.lfsck_namespace |
+ awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+ $SHOW_NAMESPACE
+ error "(8) unexpected status"
+ }
+
+ do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+ # No nlink repair may be reported for the overflowed object.
+ local repaired=$($SHOW_NAMESPACE |
+ awk '/^nlinks_repaired/ { print $2 }')
+ [ $repaired -eq 0 ] ||
+ error "(9) Repair nlink count unexpcetedly: $repaired"
+
+ cancel_lru_locks mdc
+
+ # Both the nlink count and the fid2path result must be unchanged.
+ count1=$(stat --format=%h $DIR/$tdir/d0/foo)
+ [ $count1 -eq 3 ] || error "(10) Stat failure: $count1"
+
+ count2=$($LFS fid2path $DIR $foofid | wc -l)
+ [ $count2 -eq 2 ] ||
+ error "(11) Repaired something unexpectedly: $count2"
+}
+run_test 29c "Not verify nlink attr if hark links exceed linkEA limitation"
+
$LCTL set_param debug=-lfsck > /dev/null || true
# restore MDS/OST size
CHECK_VALUE(LE_CONDITIONAL_DESTROY);
CHECK_VALUE(LE_PAIRS_VERIFY);
CHECK_VALUE(LE_CREATE_ORPHAN);
+ CHECK_VALUE(LE_SKIP_NLINK_DECLARE);
+ CHECK_VALUE(LE_SKIP_NLINK);
CHECK_VALUE_X(LEF_TO_OST);
CHECK_VALUE_X(LEF_FROM_OST);
(long long)LE_PAIRS_VERIFY);
LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
(long long)LE_CREATE_ORPHAN);
+ LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
+ (long long)LE_SKIP_NLINK_DECLARE);
+ LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
+ (long long)LE_SKIP_NLINK);
LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)LEF_TO_OST);
LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",