(const struct dt_key *)"..", BYPASS_CAPA);
}
+/**
+ * Check whether needs to scan the directory or not.
+ *
+ * 1) If we are not doing namespace LFSCK, or the given @obj is not directory,
+ * then needs not to scan the @obj. Otherwise,
+ * 2) Global /ROOT needs to be scanned, backend root needs not to be scanned.
+ * 3) If the @obj is neither IGIF nor normal FID (including .lustre and its
+ * sub-directories that have been scanned when the LFSCK engine start),
+ * then needs not to be scanned.
+ * 4) If it is a remote object, then scanning the object will be done on the
+ * MDT on which the object really resides.
+ * 5) If the local object has normal FID, then needs to be scanned. Otherwise,
+ * 6) If the object has linkEA, then needs to be scanned. Otherwise,
+ * 7) If none of the previous conditions are true, we need to check the parent
+ * directories whether this subdirectory is in a tree that should be scanned.
+ * Set the parent as current @obj, repeat 2)-7).
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ * \param[in] obj pointer to the object to be checked
+ *
+ * \retval positive number if the directory needs to be scanned
+ * \retval 0 if the directory needs NOT to be scanned
+ * \retval negative error number on failure
+ */
static int lfsck_needs_scan_dir(const struct lu_env *env,
struct lfsck_instance *lfsck,
struct dt_object *obj)
{
- struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
- int depth = 0;
- int rc;
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_fid *fid = &info->lti_fid;
+ struct lu_seq_range *range = &info->lti_range;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct seq_server_site *ss = lu_site2seq(dev->dd_lu_dev.ld_site);
+ __u32 idx = lfsck_dev_idx(dev);
+ int depth = 0;
+ int rc = 0;
if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
- RETURN(0);
+ return 0;
+
+ LASSERT(ss != NULL);
+ *fid = *lfsck_dto2fid(obj);
while (1) {
- /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
- * which is the agent directory to manage the objects
- * which name entries reside on remote MDTs. Related
- * consistency verification will be processed in LFSCK
- * phase III. */
- if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
- if (depth > 0)
- lfsck_object_put(env, obj);
+ /* Global /ROOT is visible. */
+ if (unlikely(lu_fid_eq(fid, &lfsck->li_global_root_fid)))
return 1;
- }
- /* No need to check .lustre and its children. */
- if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
- if (depth > 0)
- lfsck_object_put(env, obj);
+ /* Backend root is invisible. */
+ if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
return 0;
+
+ if (!fid_is_norm(fid) && !fid_is_igif(fid))
+ return 0;
+
+ fld_range_set_mdt(range);
+ rc = fld_local_lookup(env, ss->ss_server_fld,
+ fid_seq(fid), range);
+ if (rc != 0 || range->lsr_index != idx) {
+ /* Current FID should NOT be for the input parameter
+ * @obj, because the lfsck_master_oit_engine() has
+ * filtered out agent object. So current FID is for
+ * the ancestor of the original input parameter @obj.
+ * So he ancestor is a remote directory. The input
+ * parameter @obj is local directory, and should be
+ * scanned under such case. */
+ LASSERT(depth > 0);
+
+ return 1;
+ }
+
+ /* normal FID on this target (locally) must be for the
+ * client-side visiable object. */
+ if (fid_is_norm(fid))
+ return 1;
+
+ if (obj == NULL) {
+ obj = lfsck_object_find(env, lfsck, fid);
+ if (IS_ERR(obj))
+ return PTR_ERR(obj);
+
+ depth++;
+ if (!dt_object_exists(obj))
+ GOTO(out, rc = 0);
}
dt_read_lock(env, obj, MOR_TGT_CHILD);
if (unlikely(lfsck_is_dead_obj(obj))) {
dt_read_unlock(env, obj);
- if (depth > 0)
- lfsck_object_put(env, obj);
- return 0;
+
+ GOTO(out, rc = 0);
}
rc = dt_xattr_get(env, obj,
lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
BYPASS_CAPA);
dt_read_unlock(env, obj);
- if (rc >= 0) {
- if (depth > 0)
- lfsck_object_put(env, obj);
- return 1;
- }
+ if (rc >= 0)
+ GOTO(out, rc = 1);
- if (rc < 0 && rc != -ENODATA) {
- if (depth > 0)
- lfsck_object_put(env, obj);
- return rc;
- }
+ if (rc < 0 && rc != -ENODATA)
+ GOTO(out, rc);
rc = lfsck_parent_fid(env, obj, fid);
if (depth > 0)
lfsck_object_put(env, obj);
+
+ obj = NULL;
if (rc != 0)
return rc;
- if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
+ if (!fid_is_sane(fid))
return 0;
+ }
- obj = lfsck_object_find(env, lfsck, fid);
- if (IS_ERR(obj))
- return PTR_ERR(obj);
-
- if (!dt_object_exists(obj)) {
- lfsck_object_put(env, obj);
- return 0;
- }
-
- if (dt_object_remote(obj)) {
- /* .lustre/lost+found/MDTxxx can be remote directory. */
- if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
- rc = 0;
- else
- /* Other remote directory should be client
- * visible and need to be checked. */
- rc = 1;
- lfsck_object_put(env, obj);
- return rc;
- }
+out:
+ if (depth > 0 && obj != NULL)
+ lfsck_object_put(env, obj);
- depth++;
- }
- return 0;
+ return rc;
}
/* LFSCK wrap functions */
lfsck->li_current_oit_processed = 0;
list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
com->lc_new_checked = 0;
- if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
- com->lc_journal = 0;
-
rc = com->lc_ops->lfsck_prep(env, com, lsp);
if (rc != 0)
GOTO(out, rc);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- /* XXX: Currently, skip remote object, the consistency for
- * remote object will be processed in LFSCK phase III. */
+ /* Remote directory will be scanned by the LFSCK instance
+ * on the MDT where the remote object really resides on. */
if (!dt_object_exists(obj) || dt_object_remote(obj) ||
unlikely(!S_ISDIR(lfsck_object_type(obj))))
GOTO(out, rc = 0);
int rc1 = 0;
list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
- if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
- com->lc_journal = 0;
-
rc = com->lc_ops->lfsck_double_scan(env, com);
if (rc != 0)
rc1 = rc;
l_wait_event(thread->t_ctl_waitq,
!thread_is_running(thread),
&lwi);
+
+ if (unlikely(!thread_is_running(thread))) {
+ CDEBUG(D_LFSCK, "%s: scan dir exit for engine "
+ "stop, parent "DFID", cookie "LPX64"\n",
+ lfsck_lfsck2name(lfsck),
+ PFID(lfsck_dto2fid(dir)),
+ lfsck->li_cookie_dir);
+ RETURN(0);
+ }
}
lfsck->li_new_scanned++;
RETURN(rc);
}
+/**
+ * Object-table based iteration engine.
+ *
+ * Object-table based iteration is the basic linear engine to scan all the
+ * objects on current device in turn. For each object, it calls all the
+ * registered LFSCK component(s)' API to perform related consistency
+ * verification.
+ *
+ * It flushes related LFSCK tracing files to disk via making checkpoint
+ * periodically. Then if the server crashed or the LFSCK is paused, the
+ * LFSCK can resume from the latest checkpoint.
+ *
+ * It also controls the whole LFSCK speed via lfsck_control_speed() to
+ * avoid the server to become overload.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ *
+ * \retval positive number if all objects have been scanned
+ * \retval 0 if the iteration is stopped or paused
+ * \retval negative error number on failure
+ */
static int lfsck_master_oit_engine(const struct lu_env *env,
struct lfsck_instance *lfsck)
{
- struct lfsck_thread_info *info = lfsck_env_info(env);
- const struct dt_it_ops *iops =
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ const struct dt_it_ops *iops =
&lfsck->li_obj_oit->do_index_ops->dio_it;
- struct dt_it *di = lfsck->li_di_oit;
- struct lu_fid *fid = &info->lti_fid;
- struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
- struct ptlrpc_thread *thread = &lfsck->li_thread;
- __u32 idx =
- lfsck_dev_idx(lfsck->li_bottom);
- int rc;
+ struct dt_it *di = lfsck->li_di_oit;
+ struct lu_fid *fid = &info->lti_fid;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct seq_server_site *ss = lu_site2seq(dev->dd_lu_dev.ld_site);
+ __u32 idx = lfsck_dev_idx(dev);
+ int rc;
ENTRY;
+ if (unlikely(ss == NULL))
+ RETURN(-EIO);
+
do {
struct dt_object *target;
bool update_lma = false;
l_wait_event(thread->t_ctl_waitq,
!thread_is_running(thread),
&lwi);
+
+ if (unlikely(!thread_is_running(thread))) {
+ CDEBUG(D_LFSCK, "%s: OIT scan exit for engine "
+ "stop, cookie "LPU64"\n",
+ lfsck_lfsck2name(lfsck),
+ iops->store(env, di));
+ RETURN(0);
+ }
}
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
update_lma = true;
}
} else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
- !fid_is_last_id(fid) && !fid_is_root(fid) &&
- !fid_seq_is_dot(fid_seq(fid))) {
+ !fid_is_last_id(fid) &&
+ !lu_fid_eq(fid, &lfsck->li_global_root_fid)) {
+
/* If the FID/object is only used locally and invisible
- * to external nodes, then LFSCK will not handle it. */
+ * to external nodes, then LFSCK will not handle it.
+ *
+ * dot_lustre sequence has been handled specially. */
goto checkpoint;
+ } else {
+ struct lu_seq_range *range = &info->lti_range;
+
+ if (lfsck->li_master)
+ fld_range_set_mdt(range);
+ else
+ fld_range_set_ost(range);
+ rc = fld_local_lookup(env, ss->ss_server_fld,
+ fid_seq(fid), range);
+ if (rc != 0 || range->lsr_index != idx) {
+ /* Remote object will be handled by the LFSCK
+ * instance on the MDT where the remote object
+ * really resides on. */
+ rc = 0;
+ goto checkpoint;
+ }
}
target = lfsck_object_find(env, lfsck, fid);
goto checkpoint;
}
- /* XXX: Currently, skip remote object, the consistency for
- * remote object will be processed in LFSCK phase III. */
- if (dt_object_exists(target) && !dt_object_remote(target)) {
+ if (dt_object_exists(target)) {
if (update_lma) {
rc = lfsck_update_lma(env, lfsck, target);
if (rc != 0)
dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
- dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
dst->ln_objs_repaired_phase2 =
le64_to_cpu(src->ln_objs_repaired_phase2);
&src->ln_fid_latest_scanned_phase2);
dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
+ dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
+ dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
}
static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
- dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
dst->ln_objs_repaired_phase2 =
cpu_to_le64(src->ln_objs_repaired_phase2);
&src->ln_fid_latest_scanned_phase2);
dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
+ dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
+ dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
}
static void lfsck_namespace_record_failure(const struct lu_env *env,
return rc;
}
-static int lfsck_namespace_lookup(const struct lu_env *env,
- struct lfsck_component *com,
- const struct lu_fid *fid, __u8 *flags)
-{
- struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
- int rc;
-
- fid_cpu_to_be(key, fid);
- rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
- (const struct dt_key *)key, BYPASS_CAPA);
- return rc;
-}
-
-static int lfsck_namespace_delete(const struct lu_env *env,
- struct lfsck_component *com,
- const struct lu_fid *fid)
+/**
+ * Update the namespace LFSCK tracing file for the given @fid
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] fid the fid which flags to be updated in the lfsck
+ * tracing file
+ * \param[in] add true if add new flags, otherwise remove flags
+ *
+ * \retval 0 for succeed or nothing to be done
+ * \retval negative error number on failure
+ */
+int lfsck_namespace_trace_update(const struct lu_env *env,
+ struct lfsck_component *com,
+ const struct lu_fid *fid,
+ const __u8 flags, bool add)
{
struct lfsck_instance *lfsck = com->lc_lfsck;
- struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
- struct thandle *handle;
struct dt_object *obj = com->lc_obj;
- int rc;
+ struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct thandle *th = NULL;
+ int rc = 0;
+ __u8 old = 0;
+ __u8 new = 0;
ENTRY;
- handle = dt_trans_create(env, lfsck->li_bottom);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
-
- rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
- if (rc != 0)
- GOTO(out, rc);
-
- rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
- if (rc != 0)
- GOTO(out, rc);
+ LASSERT(flags != 0);
+ down_write(&com->lc_sem);
fid_cpu_to_be(key, fid);
- rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
- BYPASS_CAPA);
-
- GOTO(out, rc);
+ rc = dt_lookup(env, obj, (struct dt_rec *)&old,
+ (const struct dt_key *)key, BYPASS_CAPA);
+ if (rc == -ENOENT) {
+ if (!add)
+ GOTO(unlock, rc = 0);
-out:
- dt_trans_stop(env, lfsck->li_bottom, handle);
- return rc;
-}
+ old = 0;
+ new = flags;
+ } else if (rc == 0) {
+ if (add) {
+ if ((old & flags) == flags)
+ GOTO(unlock, rc = 0);
-static int lfsck_namespace_update(const struct lu_env *env,
- struct lfsck_component *com,
- const struct lu_fid *fid,
- __u8 flags, bool force)
-{
- struct lfsck_instance *lfsck = com->lc_lfsck;
- struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
- struct thandle *handle;
- struct dt_object *obj = com->lc_obj;
- int rc;
- bool exist = false;
- __u8 tf;
- ENTRY;
+ new = old | flags;
+ } else {
+ if ((old & flags) == 0)
+ GOTO(unlock, rc = 0);
- rc = lfsck_namespace_lookup(env, com, fid, &tf);
- if (rc != 0 && rc != -ENOENT)
- RETURN(rc);
+ new = old & ~flags;
+ }
+ } else {
+ GOTO(log, rc);
+ }
- if (rc == 0) {
- if (!force || flags == tf)
- RETURN(0);
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(log, rc = PTR_ERR(th));
- exist = true;
- handle = dt_trans_create(env, lfsck->li_bottom);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ if (old != 0) {
+ rc = dt_declare_delete(env, obj,
+ (const struct dt_key *)key, th);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
- rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
- handle);
+ if (new != 0) {
+ rc = dt_declare_insert(env, obj,
+ (const struct dt_rec *)&new,
+ (const struct dt_key *)key, th);
if (rc != 0)
- GOTO(out, rc);
- } else {
- handle = dt_trans_create(env, lfsck->li_bottom);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(log, rc);
}
- rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
- (const struct dt_key *)fid, handle);
+ rc = dt_trans_start_local(env, dev, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(log, rc);
- rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
- if (rc != 0)
- GOTO(out, rc);
+ if (old != 0) {
+ rc = dt_delete(env, obj, (const struct dt_key *)key,
+ th, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
- fid_cpu_to_be(key, fid);
- if (exist) {
- rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
- BYPASS_CAPA);
+ if (new != 0) {
+ rc = dt_insert(env, obj, (const struct dt_rec *)&new,
+ (const struct dt_key *)key, th, BYPASS_CAPA, 1);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(log, rc);
}
- rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
- (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
+ GOTO(log, rc);
- GOTO(out, rc);
+log:
+ if (th != NULL && !IS_ERR(th))
+ dt_trans_stop(env, dev, th);
+
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
+ "tracing file, flags %x, old %x, new %x: rc = %d\n",
+ lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
+ (__u32)flags, (__u32)old, (__u32)new, rc);
+
+unlock:
+ up_write(&com->lc_sem);
-out:
- dt_trans_stop(env, lfsck->li_bottom, handle);
return rc;
}
return rc;
}
-static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
- struct linkea_data *ldata)
+int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
+ struct linkea_data *ldata)
{
int rc;
- ldata->ld_buf =
- lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
- PAGE_CACHE_SIZE);
if (ldata->ld_buf->lb_buf == NULL)
return -ENOMEM;
if (!dt_object_exists(obj))
- return -ENODATA;
+ return -ENOENT;
rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
if (rc == -ERANGE) {
/* Buf was too small, figure out what we need. */
- lu_buf_free(ldata->ld_buf);
- rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
+ rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
BYPASS_CAPA);
- if (rc < 0)
+ if (rc <= 0)
return rc;
- ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+ lu_buf_realloc(ldata->ld_buf, rc);
if (ldata->ld_buf->lb_buf == NULL)
return -ENOMEM;
rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
BYPASS_CAPA);
}
- if (rc < 0)
- return rc;
- linkea_init(ldata);
+ if (rc > 0)
+ rc = linkea_init(ldata);
- return 0;
+ return rc;
+}
+
+/**
+ * Remove linkEA for the given object.
+ *
+ * The caller should take the ldlm lock before the calling.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] obj pointer to the dt_object to be handled
+ *
+ * \retval 0 for repaired cases
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_links_remove(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *obj)
+{
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct thandle *th = NULL;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(dt_object_remote(obj) == 0);
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(log, rc = PTR_ERR(th));
+
+ rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, obj, 0);
+ if (unlikely(lfsck_is_dead_obj(obj)))
+ GOTO(unlock, rc = -ENOENT);
+
+ if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+ GOTO(unlock, rc = 0);
+
+ rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
+
+ GOTO(unlock, rc);
+
+unlock:
+ dt_write_unlock(env, obj);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+log:
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
+ "for the object "DFID": rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+ if (rc == 0) {
+ struct lfsck_namespace *ns = com->lc_file_ram;
+
+ ns->ln_flags |= LF_INCONSISTENT;
+ }
+
+ return rc;
}
static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
BYPASS_CAPA);
}
-/**
- * \retval ve: removed entries
- */
-static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
- struct linkea_data *ldata,
- struct lu_name *cname,
- struct lu_fid *pfid)
+static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
+ struct lu_name *cname,
+ struct lu_fid *pfid,
+ char *buf)
+{
+ linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
+ /* To guarantee the 'name' is terminated with '0'. */
+ memcpy(buf, cname->ln_name, cname->ln_namelen);
+ buf[cname->ln_namelen] = 0;
+ cname->ln_name = buf;
+}
+
+static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
+ struct lu_name *cname,
+ struct lu_fid *pfid,
+ bool remove)
{
struct link_ea_entry *oldlee;
int oldlen;
- int removed = 0;
+ int repeated = 0;
- linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
oldlee = ldata->ld_lee;
oldlen = ldata->ld_reclen;
linkea_next_entry(ldata);
ldata->ld_lee->lee_reclen[1];
if (unlikely(ldata->ld_reclen == oldlen &&
memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
+ repeated++;
+ if (!remove)
+ break;
+
linkea_del_buf(ldata, cname);
- removed++;
} else {
linkea_next_entry(ldata);
}
}
ldata->ld_lee = oldlee;
ldata->ld_reclen = oldlen;
- return removed;
+
+ return repeated;
+}
+
+static int lfsck_namespace_insert_orphan(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *orphan,
+ const char *infix, const char *type,
+ int *count)
+{
+ /* XXX: TBD */
+ return 0;
+}
+
+static int lfsck_namespace_insert_normal(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *parent,
+ struct dt_object *child,
+ const char *name)
+{
+ /* XXX: TBD */
+ return 0;
+}
+
+static int lfsck_namespace_create_orphan(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *orphan)
+{
+ /* XXX: TBD */
+ return 0;
}
/**
- * \retval +ve repaired
- * \retval 0 no need to repair
- * \retval -ve error cases
+ * Remove the specified entry from the linkEA.
+ *
+ * Locate the linkEA entry with the given @cname and @pfid, then
+ * remove this entry or the other entries those are repeated with
+ * this entry.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] obj pointer to the dt_object to be handled
+ * \param[in,out]ldata pointer to the buffer that holds the linkEA
+ * \param[in] cname the name for the child in the parent directory
+ * \param[in] pfid the parent directory's FID for the linkEA
+ * \param[in] next if true, then remove the first found linkEA
+ * entry, and move the ldata->ld_lee to next entry
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if nothing to be repaired
+ * \retval negative error number on failure
*/
-static int lfsck_namespace_double_scan_one(const struct lu_env *env,
- struct lfsck_component *com,
- struct dt_object *child, __u8 flags)
+static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *obj,
+ struct linkea_data *ldata,
+ struct lu_name *cname,
+ struct lu_fid *pfid,
+ bool next)
{
- struct lfsck_thread_info *info = lfsck_env_info(env);
- struct lu_attr *la = &info->lti_la;
- struct lu_name *cname = &info->lti_name;
- struct lu_fid *pfid = &info->lti_fid;
- struct lu_fid *cfid = &info->lti_fid2;
- struct lfsck_instance *lfsck = com->lc_lfsck;
- struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
- struct lfsck_namespace *ns = com->lc_file_ram;
- struct linkea_data ldata = { 0 };
- struct thandle *handle = NULL;
- bool locked = false;
- bool update = false;
- int rc;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct thandle *th = NULL;
+ struct lustre_handle lh = { 0 };
+ struct linkea_data ldata_new = { 0 };
+ struct lu_buf linkea_buf;
+ int rc = 0;
ENTRY;
- if (com->lc_journal) {
+ rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+ MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_XATTR, LCK_EX);
+ if (rc != 0)
+ GOTO(log, rc);
+
+ if (next)
+ linkea_del_buf(ldata, cname);
+ else
+ lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
+ true);
+ lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
+ ldata->ld_leh->leh_len);
again:
- LASSERT(!locked);
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(unlock1, rc = PTR_ERR(th));
- update = false;
- com->lc_journal = 1;
- handle = dt_trans_create(env, lfsck->li_next);
- if (IS_ERR(handle))
- RETURN(rc = PTR_ERR(handle));
+ rc = dt_declare_xattr_set(env, obj, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc != 0)
+ GOTO(stop, rc);
- rc = dt_declare_xattr_set(env, child,
- lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
- XATTR_NAME_LINK, 0, handle);
- if (rc != 0)
- GOTO(stop, rc);
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
- rc = dt_trans_start(env, lfsck->li_next, handle);
- if (rc != 0)
- GOTO(stop, rc);
+ dt_write_lock(env, obj, 0);
+ if (unlikely(lfsck_is_dead_obj(obj)))
+ GOTO(unlock2, rc = -ENOENT);
+
+ rc = lfsck_links_read2(env, obj, &ldata_new);
+ if (rc != 0)
+ GOTO(unlock2, rc);
+
+ /* The specified linkEA entry has been removed by race. */
+ rc = linkea_links_find(&ldata_new, cname, pfid);
+ if (rc != 0)
+ GOTO(unlock2, rc = 0);
+
+ if (bk->lb_param & LPF_DRYRUN)
+ GOTO(unlock2, rc = 1);
+
+ if (next)
+ linkea_del_buf(&ldata_new, cname);
+ else
+ lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
+ true);
- dt_write_lock(env, child, MOR_TGT_CHILD);
- locked = true;
+ if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
+ dt_write_unlock(env, obj);
+ dt_trans_stop(env, dev, th);
+ lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
+ ldata_new.ld_leh->leh_len);
+ goto again;
}
- if (unlikely(lfsck_is_dead_obj(child)))
- GOTO(stop, rc = 0);
+ lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
+ ldata_new.ld_leh->leh_len);
+ rc = dt_xattr_set(env, obj, &linkea_buf,
+ XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+
+ GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+ dt_write_unlock(env, obj);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+unlock1:
+ lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
+ "for the object: "DFID", parent "DFID", name %.*s\n",
+ lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
+ PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
+ cname->ln_name);
- rc = dt_attr_get(env, child, la, BYPASS_CAPA);
- if (rc == 0)
- rc = lfsck_links_read(env, child, &ldata);
if (rc != 0) {
- if ((bk->lb_param & LPF_DRYRUN) &&
- (rc == -EINVAL || rc == -ENODATA))
- rc = 1;
+ struct lfsck_namespace *ns = com->lc_file_ram;
- GOTO(stop, rc);
+ ns->ln_flags |= LF_INCONSISTENT;
+ }
+
+ return rc;
+}
+
+/**
+ * Conditionally remove the specified entry from the linkEA.
+ *
+ * Take the parent lock firstly, then check whether the specified
+ * name entry exists or not: if yes, do nothing; otherwise, call
+ * lfsck_namespace_shrink_linkea() to remove the linkea entry.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] parent pointer to the parent directory
+ * \param[in] child pointer to the child object that holds the linkEA
+ * \param[in,out]ldata pointer to the buffer that holds the linkEA
+ * \param[in] cname the name for the child in the parent directory
+ * \param[in] pfid the parent directory's FID for the linkEA
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if nothing to be repaired
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *parent,
+ struct dt_object *child,
+ struct linkea_data *ldata,
+ struct lu_name *cname,
+ struct lu_fid *pfid)
+{
+ struct lu_fid *cfid = &lfsck_env_info(env)->lti_fid3;
+ struct lustre_handle lh = { 0 };
+ int rc;
+ ENTRY;
+
+ rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+ MDS_INODELOCK_UPDATE, LCK_EX);
+ if (rc != 0)
+ RETURN(rc);
+
+ dt_read_lock(env, parent, 0);
+ if (unlikely(lfsck_is_dead_obj(parent))) {
+ dt_read_unlock(env, parent);
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
+ cname, pfid, true);
+
+ RETURN(rc);
+ }
+
+ rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+ (const struct dt_key *)cname->ln_name,
+ BYPASS_CAPA);
+ dt_read_unlock(env, parent);
+
+ /* It is safe to release the ldlm lock, because when the logic come
+ * here, we have got all the needed information above whether the
+ * linkEA entry is valid or not. It is not important that others
+ * may add new linkEA entry after the ldlm lock released. If other
+ * has removed the specified linkEA entry by race, then it is OK,
+ * because the subsequent lfsck_namespace_shrink_linkea() can handle
+ * such case. */
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ if (rc == -ENOENT) {
+ rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
+ cname, pfid, true);
+
+ RETURN(rc);
+ }
+
+ if (rc != 0)
+ RETURN(rc);
+
+ /* The LFSCK just found some internal status of cross-MDTs
+ * create operation. That is normal. */
+ if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
+ linkea_next_entry(ldata);
+
+ RETURN(0);
}
+ rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
+ pfid, true);
+
+ RETURN(rc);
+}
+
+/**
+ * Double scan the MDT-object for namespace LFSCK.
+ *
+ * If the MDT-object contains invalid or repeated linkEA entries, then drop
+ * those entries from the linkEA; if the linkEA becomes empty or the object
+ * has no linkEA, then it is an orphan and will be added into the directory
+ * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
+ * the remote parent; if the name entry corresponding to some linkEA entry
+ * is lost, then add the name entry back to the namespace.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] child pointer to the dt_object to be handled
+ * \param[in] flags some hints to indicate how the @child should be handled
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if nothing to be repaired
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_double_scan_one(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *child, __u8 flags)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_attr *la = &info->lti_la;
+ struct lu_name *cname = &info->lti_name;
+ struct lu_fid *pfid = &info->lti_fid;
+ struct lu_fid *cfid = &info->lti_fid2;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct dt_object *parent = NULL;
+ struct linkea_data ldata = { 0 };
+ bool repaired = false;
+ int count = 0;
+ int rc;
+ ENTRY;
+
+ dt_read_lock(env, child, 0);
+ if (unlikely(lfsck_is_dead_obj(child))) {
+ dt_read_unlock(env, child);
+
+ RETURN(0);
+ }
+
+ rc = lfsck_links_read(env, child, &ldata);
+ dt_read_unlock(env, child);
+ if (rc != 0)
+ GOTO(out, rc);
+
linkea_first_entry(&ldata);
while (ldata.ld_lee != NULL) {
- struct dt_object *parent = NULL;
+ lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
+ info->lti_key);
+ rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
+ false);
+ /* Found repeated linkEA entries */
+ if (rc > 0) {
+ rc = lfsck_namespace_shrink_linkea(env, com, child,
+ &ldata, cname, pfid, false);
+ if (rc < 0)
+ GOTO(out, rc);
- rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
- if (rc > 0)
- update = true;
+ if (rc == 0)
+ continue;
+
+ repaired = true;
+
+ /* fall through */
+ }
+
+ /* Invalid PFID in the linkEA entry. */
+ if (!fid_is_sane(pfid)) {
+ rc = lfsck_namespace_shrink_linkea(env, com, child,
+ &ldata, cname, pfid, true);
+ if (rc < 0)
+ GOTO(out, rc);
- if (!fid_is_sane(pfid))
- goto shrink;
+ if (rc > 0)
+ repaired = true;
+
+ continue;
+ }
parent = lfsck_object_find(env, lfsck, pfid);
if (IS_ERR(parent))
- GOTO(stop, rc = PTR_ERR(parent));
+ GOTO(out, rc = PTR_ERR(parent));
+
+ if (!dt_object_exists(parent)) {
+ if (ldata.ld_leh->leh_reccount > 1) {
+ /* If it is NOT the last linkEA entry, then
+ * there is still other chance to make the
+ * child to be visible via other parent, then
+ * remove this linkEA entry. */
+ rc = lfsck_namespace_shrink_linkea(env, com,
+ child, &ldata, cname, pfid, true);
+ } else {
+ /* Create the lost parent as an orphan. */
+ rc = lfsck_namespace_create_orphan(env, com,
+ parent);
+ if (rc < 0) {
+ lfsck_object_put(env, parent);
+
+ GOTO(out, rc);
+ }
- if (!dt_object_exists(parent))
- goto shrink;
+ if (rc > 0)
+ repaired = true;
+
+ /* Add the missed name entry to the parent. */
+ rc = lfsck_namespace_insert_normal(env, com,
+ parent, child, cname->ln_name);
+ linkea_next_entry(&ldata);
+ }
- /* XXX: Currently, skip remote object, the consistency for
- * remote object will be processed in LFSCK phase III. */
- if (dt_object_remote(parent)) {
lfsck_object_put(env, parent);
- linkea_next_entry(&ldata);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0)
+ repaired = true;
+
continue;
}
- if (unlikely(!dt_try_as_dir(env, parent)))
- goto shrink;
+ /* The linkEA entry with bad parent will be removed. */
+ if (unlikely(!dt_try_as_dir(env, parent))) {
+ lfsck_object_put(env, parent);
+ rc = lfsck_namespace_shrink_linkea(env, com, child,
+ &ldata, cname, pfid, true);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0)
+ repaired = true;
+
+ continue;
+ }
- /* To guarantee the 'name' is terminated with '0'. */
- memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
- info->lti_key[cname->ln_namelen] = 0;
- cname->ln_name = info->lti_key;
rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
(const struct dt_key *)cname->ln_name,
BYPASS_CAPA);
if (rc != 0 && rc != -ENOENT) {
lfsck_object_put(env, parent);
- GOTO(stop, rc);
+
+ GOTO(out, rc);
}
if (rc == 0) {
+ lfsck_object_put(env, parent);
if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
- lfsck_object_put(env, parent);
+ /* It is the most common case that we
+ * find the name entry corresponding
+ * to the linkEA entry. */
linkea_next_entry(&ldata);
- continue;
+ } else {
+ /* XXX: The name entry references another
+ * MDT-object that may be created by
+ * the LFSCK for repairing dangling
+ * name entry. There will be another
+ * patch for further processing. */
+ rc = lfsck_namespace_shrink_linkea(env, com,
+ child, &ldata, cname, pfid, true);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0)
+ repaired = true;
}
- goto shrink;
+ continue;
}
+ rc = dt_attr_get(env, child, la, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(out, rc);
+
/* If there is no name entry in the parent dir and the object
* link count is less than the linkea entries count, then the
* linkea entry should be removed. */
- if (ldata.ld_leh->leh_reccount > la->la_nlink)
- goto shrink;
-
- /* XXX: For the case of there is a linkea entry, but without
- * name entry pointing to the object and its hard links
- * count is not less than the object name entries count,
- * then seems we should add the 'missed' name entry back
- * to namespace, but before LFSCK phase III finished, we
- * do not know whether the object has some inconsistency
- * on other MDTs. So now, do NOT add the name entry back
- * to the namespace, but keep the linkEA entry. LU-2914 */
+ if (ldata.ld_leh->leh_reccount > la->la_nlink) {
+ rc = lfsck_namespace_shrink_linkea_cond(env, com,
+ parent, child, &ldata, cname, pfid);
+ lfsck_object_put(env, parent);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0)
+ repaired = true;
+
+ continue;
+ }
+
+ /* Add the missed name entry back to the namespace. */
+ rc = lfsck_namespace_insert_normal(env, com, parent, child,
+ cname->ln_name);
lfsck_object_put(env, parent);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (rc > 0)
+ repaired = true;
+
linkea_next_entry(&ldata);
- continue;
+ }
-shrink:
- if (parent != NULL)
- lfsck_object_put(env, parent);
- if (bk->lb_param & LPF_DRYRUN)
- RETURN(1);
+ GOTO(out, rc = 0);
+
+out:
+ if (rc < 0 && rc != -ENODATA)
+ return rc;
- CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
- "for the object: "DFID", parent "DFID", name %.*s\n",
- lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
- PFID(pfid), cname->ln_namelen, cname->ln_name);
+ if (rc == 0) {
+ LASSERT(ldata.ld_leh != NULL);
- linkea_del_buf(&ldata, cname);
- update = true;
+ count = ldata.ld_leh->leh_reccount;
}
- if (update) {
- if (!com->lc_journal) {
- com->lc_journal = 1;
- goto again;
- }
+ if (count == 0) {
+ /* If the child becomes orphan, then insert it into
+ * the global .lustre/lost+found/MDTxxxx directory. */
+ rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
+ &count);
+ if (rc < 0)
+ return rc;
- rc = lfsck_links_write(env, child, &ldata, handle);
+ if (rc > 0)
+ repaired = true;
}
- GOTO(stop, rc);
+ rc = dt_attr_get(env, child, la, BYPASS_CAPA);
+ if (rc != 0)
+ return rc;
-stop:
- if (locked) {
- /* XXX: For the case linkea entries count does not match the object hard
- * links count, we cannot update the later one simply. Before LFSCK
- * phase III finished, we cannot know whether there are some remote
- * name entries to be repaired or not. LU-2914 */
- if (rc == 0 && !lfsck_is_dead_obj(child) &&
- ldata.ld_leh != NULL &&
- ldata.ld_leh->leh_reccount != la->la_nlink)
- CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
- "count %u may not match its hardlink count %u\n",
- lfsck_lfsck2name(lfsck), PFID(cfid),
- ldata.ld_leh->leh_reccount, la->la_nlink);
-
- dt_write_unlock(env, child);
+ if (la->la_nlink != count) {
+ /* XXX: there will be other patch(es) for MDT-object
+ * hard links verification. */
}
- if (handle != NULL)
- dt_trans_stop(env, lfsck->li_next, handle);
+ if (repaired) {
+ if (la->la_nlink > 1) {
+ down_write(&com->lc_sem);
+ ns->ln_mul_linked_repaired++;
+ up_write(&com->lc_sem);
+ }
- if (rc == 0 && update) {
- ns->ln_objs_nlink_repaired++;
- rc = 1;
+ if (rc == 0)
+ rc = 1;
}
return rc;
}
+static void lfsck_namespace_dump_statistics(struct seq_file *m,
+ struct lfsck_namespace *ns,
+ __u64 checked_phase1,
+ __u64 checked_phase2,
+ __u32 time_phase1,
+ __u32 time_phase2)
+{
+ seq_printf(m, "checked_phase1: "LPU64"\n"
+ "checked_phase2: "LPU64"\n"
+ "updated_phase1: "LPU64"\n"
+ "updated_phase2: "LPU64"\n"
+ "failed_phase1: "LPU64"\n"
+ "failed_phase2: "LPU64"\n"
+ "directories: "LPU64"\n"
+ "dirent_repaired: "LPU64"\n"
+ "linkea_repaired: "LPU64"\n"
+ "nlinks_repaired: "LPU64"\n"
+ "lost_found: "LPU64"\n"
+ "multiple_linked_checked: "LPU64"\n"
+ "multiple_linked_repaired: "LPU64"\n"
+ "success_count: %u\n"
+ "run_time_phase1: %u seconds\n"
+ "run_time_phase2: %u seconds\n",
+ checked_phase1,
+ checked_phase2,
+ ns->ln_items_repaired,
+ ns->ln_objs_repaired_phase2,
+ ns->ln_items_failed,
+ ns->ln_objs_failed_phase2,
+ ns->ln_dirs_checked,
+ ns->ln_dirent_repaired,
+ ns->ln_linkea_repaired,
+ ns->ln_objs_nlink_repaired,
+ ns->ln_objs_lost_found,
+ ns->ln_mul_linked_checked,
+ ns->ln_mul_linked_repaired,
+ ns->ln_success_count,
+ time_phase1,
+ time_phase2);
+}
+
/* namespace APIs */
static int lfsck_namespace_reset(const struct lu_env *env,
ns->ln_items_repaired = 0;
ns->ln_items_failed = 0;
ns->ln_dirs_checked = 0;
- ns->ln_mlinked_checked = 0;
ns->ln_objs_checked_phase2 = 0;
ns->ln_objs_repaired_phase2 = 0;
ns->ln_objs_failed_phase2 = 0;
ns->ln_objs_nlink_repaired = 0;
ns->ln_objs_lost_found = 0;
+ ns->ln_dirent_repaired = 0;
+ ns->ln_linkea_repaired = 0;
+ ns->ln_mul_linked_checked = 0;
+ ns->ln_mul_linked_repaired = 0;
fid_zero(&ns->ln_fid_latest_scanned_phase2);
if (list_empty(&com->lc_link_dir))
list_add_tail(&com->lc_link_dir,
struct lfsck_component *com,
struct dt_object *obj)
{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ const struct lu_fid *fid = lfsck_dto2fid(obj);
+ struct lu_attr *la = &info->lti_la;
+ struct lu_fid *pfid = &info->lti_fid2;
+ struct lu_name *cname = &info->lti_name;
+ struct lu_seq_range *range = &info->lti_range;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct seq_server_site *ss =
+ lu_site2seq(dev->dd_lu_dev.ld_site);
+ struct linkea_data ldata = { 0 };
+ __u32 idx = lfsck_dev_idx(dev);
+ int rc;
+ ENTRY;
+
+ rc = lfsck_links_read(env, obj, &ldata);
+ if (rc == -ENOENT)
+ GOTO(out, rc = 0);
+
+ /* -EINVAL means crashed linkEA, should be verified. */
+ if (rc == -EINVAL) {
+ rc = lfsck_namespace_trace_update(env, com, fid,
+ LNTF_CHECK_LINKEA, true);
+ if (rc == 0) {
+ struct lustre_handle lh = { 0 };
+
+ rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+ MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_XATTR, LCK_EX);
+ if (rc == 0) {
+ rc = lfsck_namespace_links_remove(env, com,
+ obj);
+ lfsck_ibits_unlock(&lh, LCK_EX);
+ }
+ }
+
+ GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
+ }
+
+ /* zero-linkEA object may be orphan, but it also maybe because
+ * of upgrading. Currently, we cannot record it for double scan.
+ * Because it may cause the LFSCK tracing file to be too large. */
+ if (rc == -ENODATA) {
+ if (S_ISDIR(lfsck_object_type(obj)))
+ GOTO(out, rc = 0);
+
+ rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (la->la_nlink > 1)
+ rc = lfsck_namespace_trace_update(env, com, fid,
+ LNTF_CHECK_LINKEA, true);
+
+ GOTO(out, rc);
+ }
+
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* Record multiple-linked object. */
+ if (ldata.ld_leh->leh_reccount > 1) {
+ rc = lfsck_namespace_trace_update(env, com, fid,
+ LNTF_CHECK_LINKEA, true);
+
+ GOTO(out, rc);
+ }
+
+ linkea_first_entry(&ldata);
+ linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
+ if (!fid_is_sane(pfid)) {
+ rc = lfsck_namespace_trace_update(env, com, fid,
+ LNTF_CHECK_PARENT, true);
+ } else {
+ fld_range_set_mdt(range);
+ rc = fld_local_lookup(env, ss->ss_server_fld,
+ fid_seq(pfid), range);
+ if ((rc == -ENOENT) ||
+ (rc == 0 && range->lsr_index != idx)) {
+ rc = lfsck_namespace_trace_update(env, com, fid,
+ LNTF_CHECK_LINKEA, true);
+ } else {
+ if (S_ISDIR(lfsck_object_type(obj)))
+ GOTO(out, rc = 0);
+
+ rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (la->la_nlink > 1)
+ rc = lfsck_namespace_trace_update(env, com,
+ fid, LNTF_CHECK_LINKEA, true);
+ }
+ }
+
+ GOTO(out, rc);
+
+out:
down_write(&com->lc_sem);
com->lc_new_checked++;
if (S_ISDIR(lfsck_object_type(obj)))
- ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
+ ns->ln_dirs_checked++;
+ if (rc != 0)
+ lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
up_write(&com->lc_sem);
- return 0;
+
+ return rc;
}
static int lfsck_namespace_exec_dir(const struct lu_env *env,
do_div(new_checked, duration);
if (rtime != 0)
do_div(speed, rtime);
- seq_printf(m, "checked_phase1: "LPU64"\n"
- "checked_phase2: "LPU64"\n"
- "updated_phase1: "LPU64"\n"
- "updated_phase2: "LPU64"\n"
- "failed_phase1: "LPU64"\n"
- "failed_phase2: "LPU64"\n"
- "directories: "LPU64"\n"
- "multi_linked_files: "LPU64"\n"
- "dirent_repaired: "LPU64"\n"
- "linkea_repaired: "LPU64"\n"
- "nlinks_repaired: "LPU64"\n"
- "lost_found: "LPU64"\n"
- "success_count: %u\n"
- "run_time_phase1: %u seconds\n"
- "run_time_phase2: %u seconds\n"
- "average_speed_phase1: "LPU64" items/sec\n"
+ lfsck_namespace_dump_statistics(m, ns, checked,
+ ns->ln_objs_checked_phase2,
+ rtime, ns->ln_run_time_phase2);
+
+ seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
"average_speed_phase2: N/A\n"
"real_time_speed_phase1: "LPU64" items/sec\n"
"real_time_speed_phase2: N/A\n",
- checked,
- ns->ln_objs_checked_phase2,
- ns->ln_items_repaired,
- ns->ln_objs_repaired_phase2,
- ns->ln_items_failed,
- ns->ln_objs_failed_phase2,
- ns->ln_dirs_checked,
- ns->ln_mlinked_checked,
- ns->ln_dirent_repaired,
- ns->ln_linkea_repaired,
- ns->ln_objs_nlink_repaired,
- ns->ln_objs_lost_found,
- ns->ln_success_count,
- rtime,
- ns->ln_run_time_phase2,
speed,
new_checked);
do_div(speed1, ns->ln_run_time_phase1);
if (rtime != 0)
do_div(speed2, rtime);
- seq_printf(m, "checked_phase1: "LPU64"\n"
- "checked_phase2: "LPU64"\n"
- "updated_phase1: "LPU64"\n"
- "updated_phase2: "LPU64"\n"
- "failed_phase1: "LPU64"\n"
- "failed_phase2: "LPU64"\n"
- "directories: "LPU64"\n"
- "multi_linked_files: "LPU64"\n"
- "dirent_repaired: "LPU64"\n"
- "linkea_repaired: "LPU64"\n"
- "nlinks_repaired: "LPU64"\n"
- "lost_found: "LPU64"\n"
- "success_count: %u\n"
- "run_time_phase1: %u seconds\n"
- "run_time_phase2: %u seconds\n"
- "average_speed_phase1: "LPU64" items/sec\n"
+ lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
+ checked,
+ ns->ln_run_time_phase1, rtime);
+
+ seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
"average_speed_phase2: "LPU64" objs/sec\n"
"real_time_speed_phase1: N/A\n"
"real_time_speed_phase2: "LPU64" objs/sec\n"
"current_position: "DFID"\n",
- ns->ln_items_checked,
- checked,
- ns->ln_items_repaired,
- ns->ln_objs_repaired_phase2,
- ns->ln_items_failed,
- ns->ln_objs_failed_phase2,
- ns->ln_dirs_checked,
- ns->ln_mlinked_checked,
- ns->ln_dirent_repaired,
- ns->ln_linkea_repaired,
- ns->ln_objs_nlink_repaired,
- ns->ln_objs_lost_found,
- ns->ln_success_count,
- ns->ln_run_time_phase1,
- rtime,
speed1,
speed2,
new_checked,
do_div(speed1, ns->ln_run_time_phase1);
if (ns->ln_run_time_phase2 != 0)
do_div(speed2, ns->ln_run_time_phase2);
- seq_printf(m, "checked_phase1: "LPU64"\n"
- "checked_phase2: "LPU64"\n"
- "updated_phase1: "LPU64"\n"
- "updated_phase2: "LPU64"\n"
- "failed_phase1: "LPU64"\n"
- "failed_phase2: "LPU64"\n"
- "directories: "LPU64"\n"
- "multi_linked_files: "LPU64"\n"
- "dirent_repaired: "LPU64"\n"
- "linkea_repaired: "LPU64"\n"
- "nlinks_repaired: "LPU64"\n"
- "lost_found: "LPU64"\n"
- "success_count: %u\n"
- "run_time_phase1: %u seconds\n"
- "run_time_phase2: %u seconds\n"
- "average_speed_phase1: "LPU64" items/sec\n"
+ lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
+ ns->ln_objs_checked_phase2,
+ ns->ln_run_time_phase1,
+ ns->ln_run_time_phase2);
+
+ seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
"average_speed_phase2: "LPU64" objs/sec\n"
"real_time_speed_phase1: N/A\n"
"real_time_speed_phase2: N/A\n"
"current_position: N/A\n",
- ns->ln_items_checked,
- ns->ln_objs_checked_phase2,
- ns->ln_items_repaired,
- ns->ln_objs_repaired_phase2,
- ns->ln_items_failed,
- ns->ln_objs_failed_phase2,
- ns->ln_dirs_checked,
- ns->ln_mlinked_checked,
- ns->ln_dirent_repaired,
- ns->ln_linkea_repaired,
- ns->ln_objs_nlink_repaired,
- ns->ln_objs_lost_found,
- ns->ln_success_count,
- ns->ln_run_time_phase1,
- ns->ln_run_time_phase2,
speed1,
speed2);
}
struct dt_object *dir = lnr->lnr_obj;
struct dt_object *obj = NULL;
const struct lu_fid *pfid = lfsck_dto2fid(dir);
+ struct dt_device *dev;
+ struct lustre_handle lh = { 0 };
bool repaired = false;
- bool locked = false;
+ bool dtlocked = false;
bool remove;
bool newdata;
bool log = false;
+ int idx;
int count = 0;
int rc;
ENTRY;
fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
GOTO(out, rc = 0);
- obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
+ idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
+ if (idx < 0)
+ GOTO(out, rc = idx);
+
+ if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
+ dev = lfsck->li_next;
+ } else {
+ struct lfsck_tgt_desc *ltd;
+
+ ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
+ if (unlikely(ltd == NULL)) {
+ CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
+ "did not join the namespace LFSCK\n",
+ lfsck_lfsck2name(lfsck), idx);
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ GOTO(out, rc = -ENODEV);
+ }
+
+ dev = ltd->ltd_tgt;
+ }
+
+ obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
}
cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
- if (!(bk->lb_param & LPF_DRYRUN) &&
- (com->lc_journal || repaired)) {
+ if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
again:
- LASSERT(!locked);
+ rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+ MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_XATTR, LCK_EX);
+ if (rc != 0)
+ GOTO(out, rc);
- com->lc_journal = 1;
- handle = dt_trans_create(env, lfsck->li_next);
+ handle = dt_trans_create(env, dev);
if (IS_ERR(handle))
GOTO(out, rc = PTR_ERR(handle));
if (rc != 0)
GOTO(stop, rc);
- rc = dt_trans_start(env, lfsck->li_next, handle);
+ rc = dt_trans_start(env, dev, handle);
if (rc != 0)
GOTO(stop, rc);
- dt_write_lock(env, obj, MOR_TGT_CHILD);
- locked = true;
+ dt_write_lock(env, obj, 0);
+ dtlocked = true;
}
rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
nodata:
if (bk->lb_param & LPF_DRYRUN) {
+ down_write(&com->lc_sem);
ns->ln_linkea_repaired++;
+ up_write(&com->lc_sem);
repaired = true;
log = true;
goto record;
}
- if (!com->lc_journal)
+ if (!lustre_handle_is_used(&lh))
goto again;
if (remove) {
GOTO(stop, rc);
count = ldata.ld_leh->leh_reccount;
+ down_write(&com->lc_sem);
ns->ln_linkea_repaired++;
+ up_write(&com->lc_sem);
repaired = true;
log = true;
+ } else if (rc == -ENOENT) {
+ log = false;
+ repaired = false;
+
+ GOTO(stop, rc = 0);
} else {
GOTO(stop, rc);
}
LASSERT(dt_write_locked(env, obj));
dt_write_unlock(env, obj);
- locked = false;
+ dtlocked = false;
- dt_trans_stop(env, lfsck->li_next, handle);
+ dt_trans_stop(env, dev, handle);
handle = NULL;
+
+ lfsck_ibits_unlock(&lh, LCK_EX);
}
- ns->ln_mlinked_checked++;
- rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
- count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
+ down_write(&com->lc_sem);
+ ns->ln_mul_linked_checked++;
+ up_write(&com->lc_sem);
+ rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
+ LNTF_CHECK_LINKEA, true);
GOTO(out, rc);
stop:
- if (locked)
+ if (dtlocked)
dt_write_unlock(env, obj);
- if (handle != NULL)
- dt_trans_stop(env, lfsck->li_next, handle);
+ if (handle != NULL && !IS_ERR(handle))
+ dt_trans_stop(env, dev, handle);
out:
+ lfsck_ibits_unlock(&lh, LCK_EX);
down_write(&com->lc_sem);
if (rc < 0) {
CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
lfsck_pos_fill(env, lfsck,
&ns->ln_pos_first_inconsistent,
false);
- } else {
- com->lc_journal = 0;
}
rc = 0;
}
struct dt_key *key;
struct lu_fid fid;
int rc;
- __u8 flags = 0;
+ __u8 flags = 0;
ENTRY;
CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
l_wait_event(thread->t_ctl_waitq,
!thread_is_running(thread),
&lwi);
+
+ if (unlikely(!thread_is_running(thread)))
+ GOTO(put, rc = 0);
}
key = iops->key(env, di);
fid_be_to_cpu(&fid, (const struct lu_fid *)key);
+ if (!fid_is_sane(&fid)) {
+ rc = 0;
+ goto checkpoint;
+ }
+
target = lfsck_object_find(env, lfsck, &fid);
- down_write(&com->lc_sem);
if (IS_ERR(target)) {
rc = PTR_ERR(target);
goto checkpoint;
}
- /* XXX: Currently, skip remote object, the consistency for
- * remote object will be processed in LFSCK phase III. */
- if (dt_object_exists(target) && !dt_object_remote(target)) {
+ if (dt_object_exists(target)) {
rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
- if (rc == 0)
+ if (rc == 0) {
rc = lfsck_namespace_double_scan_one(env, com,
target, flags);
+ if (rc == -ENOENT)
+ rc = 0;
+ }
}
lfsck_object_put(env, target);
checkpoint:
+ down_write(&com->lc_sem);
com->lc_new_checked++;
com->lc_new_scanned++;
ns->ln_fid_latest_scanned_phase2 = fid;
ns->ln_objs_failed_phase2++;
up_write(&com->lc_sem);
- if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
- lfsck_namespace_delete(env, com, &fid);
- } else if (rc < 0) {
- flags |= LLF_REPAIR_FAILED;
- lfsck_namespace_update(env, com, &fid, flags, true);
- }
-
if (rc < 0 && bk->lb_param & LPF_FAILOUT)
GOTO(put, rc);
fini:
iops->fini(env, di);
+
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
+ lfsck_lfsck2name(lfsck), rc);
+
return rc;
}
com->lc_new_checked = 0;
if (rc > 0) {
- com->lc_journal = 0;
if (ns->ln_flags & LF_INCOMPLETE)
ns->ln_status = LS_PARTIAL;
else
rc = dt_declare_xattr_set(env, obj, &linkea_buf,
XATTR_NAME_LINK, fl, th);
if (rc != 0)
- GOTO(stop, rc = PTR_ERR(th));
+ GOTO(stop, rc);
rc = dt_trans_start_local(env, dev, th);
if (rc != 0)