* \param[in] env pointer to the thread context
* \param[in] com pointer to the lfsck component
*
- * \retval positive number for data corruption
* \retval 0 for success
- * \retval negative error number on failure
+ * \retval negative error number on failure or data corruption
*/
static int lfsck_namespace_load_bitmap(const struct lu_env *env,
struct lfsck_component *com)
rc = dt_xattr_get(env, obj,
lfsck_buf_get(env, bitmap->data, size),
XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
- if (rc == -ERANGE || rc == -ENODATA || rc == 0)
- RETURN(1);
-
- if (rc < 0)
- RETURN(rc);
-
if (rc != size)
- RETURN(rc);
+ RETURN(rc >= 0 ? -EINVAL : rc);
if (cfs_bitmap_check_empty(bitmap))
lad->lad_incomplete = 0;
* \param[in] type the orphan's type to be created
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
* \param[in] type the orphan's type to be created
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
* \param[in] orphan pointer to the orphan MDT-object
*
* type "P": The orphan object to be created was a parent directory
- * of some DMT-object which linkEA shows that the @orphan
+ * of some MDT-object which linkEA shows that the @orphan
* object is missing.
*
* \see lfsck_layout_recreate_parent() for more types.
}
/**
+ * Repair the object's nlink attribute.
+ *
+ * If all the known name entries have been verified, then the object's hard
+ * link attribute should match the object's linkEA entries count unless the
+ * object's has too much hard link to be recorded in the linkEA. Such cases
+ * should have been marked in the LFSCK tracing file. Otherwise, trust the
+ * linkEA to update the object's nlink attribute.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] obj pointer to the dt_object to be handled
+ * \param[in,out] nlink pointer to buffer to object's hard lock count before
+ * and after the repairing
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if nothing to be repaired
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_repair_nlink(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *obj, __u32 *nlink)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_attr *la = &info->lti_la3;
+ struct lu_fid *tfid = &info->lti_fid3;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_device *dev = lfsck->li_bottom;
+ const struct lu_fid *cfid = lfsck_dto2fid(obj);
+ struct dt_object *child = NULL;
+ struct thandle *th = NULL;
+ struct linkea_data ldata = { 0 };
+ struct lustre_handle lh = { 0 };
+ __u32 old = *nlink;
+ int rc = 0;
+ __u8 flags;
+ ENTRY;
+
+ LASSERT(!dt_object_remote(obj));
+ LASSERT(S_ISREG(lfsck_object_type(obj)));
+
+ child = lfsck_object_find_by_dev(env, dev, cfid);
+ if (IS_ERR(child))
+ GOTO(log, rc = PTR_ERR(child));
+
+ rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+ MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_XATTR, LCK_EX);
+ if (rc != 0)
+ GOTO(log, rc);
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(log, rc = PTR_ERR(th));
+
+ la->la_valid = LA_NLINK;
+ rc = dt_declare_attr_set(env, child, la, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, child, 0);
+ /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
+ * ever tried to verify some remote MDT-object that resides on this
+ * MDT, but this MDT failed to respond such request. So means there
+ * may be some remote name entry on other MDT that references this
+ * object with another name, so we cannot know whether this linkEA
+ * is valid or not. So keep it there and maybe resolved when next
+ * LFSCK run. */
+ if (ns->ln_flags & LF_INCOMPLETE)
+ GOTO(unlock, rc = 0);
+
+ fid_cpu_to_be(tfid, cfid);
+ rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
+ (const struct dt_key *)tfid, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (flags & LNTF_SKIP_NLINK)
+ GOTO(unlock, rc = 0);
+
+ rc = lfsck_links_read2(env, child, &ldata);
+ if (rc == -ENODATA)
+ GOTO(unlock, rc = 0);
+
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (*nlink == ldata.ld_leh->leh_reccount)
+ GOTO(unlock, rc = 0);
+
+ la->la_nlink = *nlink = ldata.ld_leh->leh_reccount;
+ if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+ GOTO(unlock, rc = 1);
+
+ rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
+
+ GOTO(unlock, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+ dt_write_unlock(env, child);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+log:
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ if (child != NULL && !IS_ERR(child))
+ lfsck_object_put(env, child);
+
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
+ "nlink count from %u to %u: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc);
+
+ if (rc != 0)
+ ns->ln_flags |= LF_INCONSISTENT;
+
+ return rc;
+}
+
+/**
* Double scan the directory object for namespace LFSCK.
*
* This function will verify the <parent, child> pairs in the namespace tree:
return rc;
if (la->la_nlink != count) {
- /* XXX: there will be other patch(es) for MDT-object
- * hard links verification. */
+ rc = lfsck_namespace_repair_nlink(env, com, child,
+ &la->la_nlink);
+ if (rc > 0) {
+ ns->ln_objs_nlink_repaired++;
+ rc = 0;
+ }
}
if (repaired) {
int rc;
rc = lfsck_namespace_load_bitmap(env, com);
- if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) {
+ if (rc != 0 || ns->ln_status == LS_COMPLETED) {
rc = lfsck_namespace_reset(env, com, false);
if (rc == 0)
rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
- }
- if (rc != 0) {
- CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n",
- lfsck_lfsck2name(lfsck), rc);
+ if (rc != 0) {
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
+ "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
- return rc;
+ return rc;
+ }
}
down_write(&com->lc_sem);
list_del_init(&com->lc_link_dir);
list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
} else if (result == 0) {
- ns->ln_status = lfsck->li_status;
- if (ns->ln_status == 0)
+ if (lfsck->li_status != 0)
+ ns->ln_status = lfsck->li_status;
+ else
ns->ln_status = LS_STOPPED;
if (ns->ln_status != LS_PAUSED) {
list_del_init(&com->lc_link_dir);
static int lfsck_namespace_double_scan(const struct lu_env *env,
struct lfsck_component *com)
{
- struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+ int rc;
+
+ rc = lfsck_double_scan_generic(env, com, ns->ln_status);
+ if (thread_is_stopped(&lad->lad_thread)) {
+ LASSERT(list_empty(&lad->lad_req_list));
+ LASSERT(list_empty(&lad->lad_mdt_phase1_list));
- return lfsck_double_scan_generic(env, com, ns->ln_status);
+ spin_lock(<ds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_namespace_phase_list) {
+ list_del_init(<d->ltd_namespace_phase_list);
+ }
+ spin_unlock(<ds->ltd_lock);
+ }
+
+ return rc;
}
static void lfsck_namespace_data_release(const struct lu_env *env,
}
spin_unlock(<ds->ltd_lock);
- CFS_FREE_BITMAP(lad->lad_bitmap);
+ if (likely(lad->lad_bitmap != NULL))
+ CFS_FREE_BITMAP(lad->lad_bitmap);
OBD_FREE_PTR(lad);
}
+static void lfsck_namespace_quit(const struct lu_env *env,
+ struct lfsck_component *com)
+{
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_tgt_descs *ltds = &com->lc_lfsck->li_mdt_descs;
+ struct lfsck_tgt_desc *ltd;
+ struct lfsck_tgt_desc *next;
+
+ LASSERT(lad != NULL);
+
+ lfsck_quit_generic(env, com);
+
+ LASSERT(thread_is_init(&lad->lad_thread) ||
+ thread_is_stopped(&lad->lad_thread));
+ LASSERT(list_empty(&lad->lad_req_list));
+
+ spin_lock(<ds->ltd_lock);
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+ ltd_namespace_phase_list) {
+ list_del_init(<d->ltd_namespace_phase_list);
+ }
+ list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+ ltd_namespace_phase_list) {
+ list_del_init(<d->ltd_namespace_phase_list);
+ }
+ spin_unlock(<ds->ltd_lock);
+}
+
static int lfsck_namespace_in_notify(const struct lu_env *env,
struct lfsck_component *com,
- struct lfsck_request *lr)
+ struct lfsck_request *lr,
+ struct thandle *th)
{
struct lfsck_instance *lfsck = com->lc_lfsck;
struct lfsck_namespace *ns = com->lc_file_ram;
return rc;
}
+ case LE_SKIP_NLINK_DECLARE: {
+ struct dt_object *obj = com->lc_obj;
+ struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+ __u8 flags = 0;
+
+ LASSERT(th != NULL);
+
+ rc = dt_declare_delete(env, obj,
+ (const struct dt_key *)key, th);
+ if (rc == 0)
+ rc = dt_declare_insert(env, obj,
+ (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th);
+
+ RETURN(rc);
+ }
+ case LE_SKIP_NLINK: {
+ struct dt_object *obj = com->lc_obj;
+ struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+ __u8 flags = 0;
+ bool exist = false;
+ ENTRY;
+
+ LASSERT(th != NULL);
+
+ fid_cpu_to_be(key, &lr->lr_fid);
+ rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
+ (const struct dt_key *)key, BYPASS_CAPA);
+ if (rc == 0) {
+ if (flags & LNTF_SKIP_NLINK)
+ RETURN(0);
+
+ exist = true;
+ } else if (rc != -ENOENT) {
+ GOTO(log, rc);
+ }
+
+ flags |= LNTF_SKIP_NLINK;
+ if (exist) {
+ rc = dt_delete(env, obj, (const struct dt_key *)key,
+ th, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
+
+ rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+ (const struct dt_key *)key, th, BYPASS_CAPA, 1);
+
+ GOTO(log, rc);
+
+log:
+ CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
+ " to be skipped for namespace double scan: rc = %d\n",
+ lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
+
+ if (rc != 0)
+ /* If we cannot record this object in the LFSCK tracing,
+ * we have to mark the LFSC as LF_INCOMPLETE, then the
+ * LFSCK will skip nlink attribute verification for
+ * all objects. */
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ return 0;
+ }
case LE_PHASE1_DONE:
case LE_PHASE2_DONE:
case LE_PEER_EXIT:
.lfsck_dump = lfsck_namespace_dump,
.lfsck_double_scan = lfsck_namespace_double_scan,
.lfsck_data_release = lfsck_namespace_data_release,
- .lfsck_quit = lfsck_quit_generic,
+ .lfsck_quit = lfsck_namespace_quit,
.lfsck_in_notify = lfsck_namespace_in_notify,
.lfsck_query = lfsck_namespace_query,
};
GOTO(stop, rc);
rc = lfsck_links_write(env, obj, &ldata, handle);
+ if (unlikely(rc == -ENOSPC) &&
+ S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
+ if (handle != NULL) {
+ LASSERT(dt_write_locked(env, obj));
+
+ dt_write_unlock(env, obj);
+ dtlocked = false;
+
+ dt_trans_stop(env, dev, handle);
+ handle = NULL;
+
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ }
+
+ rc = lfsck_namespace_trace_update(env, com,
+ &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
+ if (rc != 0)
+ /* If we cannot record this object in the
+ * LFSCK tracing, we have to mark the LFSCK
+ * as LF_INCOMPLETE, then the LFSCK will
+ * skip nlink attribute verification for
+ * all objects. */
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ GOTO(out, rc = 0);
+ }
+
if (rc != 0)
GOTO(stop, rc);
ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
ns->ln_success_count++;
} else if (rc == 0) {
- ns->ln_status = lfsck->li_status;
- if (ns->ln_status == 0)
+ if (lfsck->li_status != 0)
+ ns->ln_status = lfsck->li_status;
+ else
ns->ln_status = LS_STOPPED;
} else {
ns->ln_status = LS_FAILED;
if (rc != 0)
CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
"to sync failure with MDTs, and related MDTs "
- "may handle orphan un-properly: rc = %d\n",
+ "may handle orphan improperly: rc = %d\n",
lfsck_lfsck2name(lfsck), rc);
EXIT;