+ dt_write_lock(env, child, 0);
+ /* 1b.1. create child */
+ rc = dt_create(env, child, la, NULL, dof, th);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (unlikely(!dt_try_as_dir(env, child)))
+ GOTO(unlock, rc = -ENOTDIR);
+
+ /* 1b.2. insert dot into child dir */
+ rec->rec_fid = cfid;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th, 1);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 1b.3. insert dotdot into child dir */
+ rec->rec_fid = &LU_LPF_FID;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th, 1);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 2b. increase child nlink */
+ rc = dt_ref_add(env, child, th);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 3b. insert linkEA for child. */
+ rc = dt_xattr_set(env, child, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ dt_write_unlock(env, child);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 4b. insert name into parent dir */
+ rec->rec_fid = cfid;
+ rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+ (const struct dt_key *)name, th, 1);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, parent, 0);
+ /* 5b. increase parent nlink */
+ rc = dt_ref_add(env, parent, th);
+ dt_write_unlock(env, parent);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ bk->lb_lpf_fid = *cfid;
+ lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+ /* 6b. update bookmark */
+ rc = dt_record_write(env, bk_obj,
+ lfsck_buf_get(env, bk, len), &pos, th);
+
+ GOTO(stop, rc);
+
+unlock:
+ dt_write_unlock(env, child);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+ return rc;
+}
+
+static int lfsck_create_lpf_remote(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
+ struct dt_object *child,
+ struct lu_attr *la,
+ struct dt_object_format *dof,
+ const char *name)
+{
+ struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
+ struct dt_object *parent = lfsck->li_lpf_root_obj;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct dt_object *bk_obj = lfsck->li_bookmark_obj;
+ const struct lu_fid *cfid = lfsck_dto2fid(child);
+ struct thandle *th = NULL;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+ const struct lu_name *cname;
+ struct dt_device *dev;
+ loff_t pos = 0;
+ int len = sizeof(struct lfsck_bookmark);
+ int rc;
+ ENTRY;
+
+ rc = linkea_data_new(&ldata,
+ &lfsck_env_info(env)->lti_linkea_buf2);
+ if (rc != 0)
+ RETURN(rc);
+
+ cname = lfsck_name_get_const(env, name, strlen(name));
+ rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Create .lustre/lost+found/MDTxxxx. */
+
+ /* XXX: Currently, cross-MDT create operation needs to create the child
+ * object firstly, then insert name into the parent directory. For
+ * this case, the child object resides on current MDT (local), but
+ * the parent ".lustre/lost+found" may be on remote MDT. It is not
+ * easy to contain all the sub-modifications orderly within single
+ * transaction.
+ *
+ * To avoid more inconsistency, we split the create operation into
+ * two transactions:
+ *
+ * 1) create the child and update the lfsck_bookmark::lb_lpf_fid
+ * locally.
+ * 2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
+ * remotely.
+ *
+ * If 1) done, but 2) failed, then go ahead, the LFSCK will try to
+ * repair such inconsistency when LFSCK run next time. */
+
+ /* Transaction I: locally */
+
+ dev = lfsck_obj2dev(child);
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ /* 1a. create child */
+ rc = dt_declare_create(env, child, la, NULL, dof, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 2a. increase child nlink */
+ rc = dt_declare_ref_add(env, child, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 3a. insert linkEA for child */
+ lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+ ldata.ld_leh->leh_len);
+ rc = dt_declare_xattr_set(env, child, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 4a. update bookmark */
+ rc = dt_declare_record_write(env, bk_obj,
+ lfsck_buf_get(env, bk, len), 0, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, child, 0);
+ /* 1b.1. create child */
+ rc = dt_create(env, child, la, NULL, dof, th);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ if (unlikely(!dt_try_as_dir(env, child)))
+ GOTO(unlock, rc = -ENOTDIR);
+
+ /* 1b.2. insert dot into child dir */
+ rec->rec_type = S_IFDIR;
+ rec->rec_fid = cfid;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th, 1);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 1b.3. insert dotdot into child dir */
+ rec->rec_fid = &LU_LPF_FID;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th, 1);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 2b. increase child nlink */
+ rc = dt_ref_add(env, child, th);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ /* 3b. insert linkEA for child */
+ rc = dt_xattr_set(env, child, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc != 0)
+ GOTO(unlock, rc);
+
+ bk->lb_lpf_fid = *cfid;
+ lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+ /* 4b. update bookmark */
+ rc = dt_record_write(env, bk_obj,
+ lfsck_buf_get(env, bk, len), &pos, th);
+
+ dt_write_unlock(env, child);
+ dt_trans_stop(env, dev, th);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Transaction II: remotely */
+
+ dev = lfsck_obj2dev(parent);
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ th->th_sync = 1;
+ /* 5a. insert name into parent dir */
+ rec->rec_fid = cfid;
+ rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+ (const struct dt_key *)name, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 6a. increase parent nlink */
+ rc = dt_declare_ref_add(env, parent, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ /* 5b. insert name into parent dir */
+ rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+ (const struct dt_key *)name, th, 1);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, parent, 0);
+ /* 6b. increase parent nlink */
+ rc = dt_ref_add(env, parent, th);
+ dt_write_unlock(env, parent);
+
+ GOTO(stop, rc);
+
+unlock:
+ dt_write_unlock(env, child);
+stop:
+ dt_trans_stop(env, dev, th);
+
+ if (rc != 0 && dev == lfsck_obj2dev(parent))
+ CDEBUG(D_LFSCK, "%s: partially created the object "DFID
+ "for orphans, but failed to insert the name %s "
+ "to the .lustre/lost+found/. Such inconsistency "
+ "will be repaired when LFSCK run next time: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
+
+ return rc;
+}
+
+/**
+ * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
+ *
+ * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
+ * orphans and other uncertain inconsistent objects found during the
+ * LFSCK. Such directory will be created by the LFSCK engine on the
+ * local MDT before the LFSCK scanning.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int lfsck_create_lpf(const struct lu_env *env,
+ struct lfsck_instance *lfsck)
+{
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_fid *cfid = &info->lti_fid2;
+ struct lu_attr *la = &info->lti_la;
+ struct dt_object_format *dof = &info->lti_dof;
+ struct dt_object *parent = lfsck->li_lpf_root_obj;
+ struct dt_object *child = NULL;
+ struct lfsck_lock_handle *llh = &info->lti_llh;
+ char name[8];
+ int node = lfsck_dev_idx(lfsck);
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(lfsck->li_master);
+ LASSERT(parent != NULL);
+ LASSERT(lfsck->li_lpf_obj == NULL);
+
+ rc = lfsck_lock(env, lfsck, parent, name, llh,
+ MDS_INODELOCK_UPDATE, LCK_PW);
+ if (rc != 0)
+ RETURN(rc);
+
+ snprintf(name, 8, "MDT%04x", node);
+ if (fid_is_zero(&bk->lb_lpf_fid)) {
+ /* There is corner case that: in former LFSCK scanning we have
+ * created the .lustre/lost+found/MDTxxxx but failed to update
+ * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
+ * it from MDT0 firstly. */
+ rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+ (const struct dt_key *)name);
+ if (rc != 0 && rc != -ENOENT)
+ GOTO(unlock, rc);
+
+ if (rc == 0) {
+ bk->lb_lpf_fid = *cfid;
+ rc = lfsck_bookmark_store(env, lfsck);
+ } else {
+ rc = lfsck_fid_alloc(env, lfsck, cfid, true);
+ }
+ if (rc != 0)
+ GOTO(unlock, rc);
+ } else {
+ *cfid = bk->lb_lpf_fid;
+ }
+
+ child = lfsck_object_find_bottom(env, lfsck, cfid);
+ if (IS_ERR(child))
+ GOTO(unlock, rc = PTR_ERR(child));
+
+ if (dt_object_exists(child) != 0) {
+ if (unlikely(!dt_try_as_dir(env, child)))
+ rc = -ENOTDIR;
+ else
+ lfsck->li_lpf_obj = child;
+
+ GOTO(unlock, rc);
+ }
+
+ memset(la, 0, sizeof(*la));
+ la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
+ la->la_mode = S_IFDIR | S_IRWXU;
+ la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
+ LA_UID | LA_GID;
+ memset(dof, 0, sizeof(*dof));
+ dof->dof_type = dt_mode_to_dft(S_IFDIR);
+
+ if (node == 0)
+ rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
+ else
+ rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
+ if (rc == 0)
+ lfsck->li_lpf_obj = child;
+
+ GOTO(unlock, rc);
+
+unlock:
+ lfsck_unlock(llh);
+ if (rc != 0 && child != NULL && !IS_ERR(child))
+ lfsck_object_put(env, child);
+
+ return rc;
+}
+
+/**
+ * Scan .lustre/lost+found for bad name entries and remove them.
+ *
+ * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
+ * index in the system. Any other formatted name is invalid and should be
+ * removed.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
+ struct lfsck_instance *lfsck)
+{
+ struct dt_object *parent = lfsck->li_lpf_root_obj;
+ struct lu_dirent *ent =
+ (struct lu_dirent *)lfsck_env_info(env)->lti_key;
+ const struct dt_it_ops *iops = &parent->do_index_ops->dio_it;
+ struct dt_it *it;
+ int rc;
+ ENTRY;
+
+ it = iops->init(env, parent, LUDA_64BITHASH);
+ if (IS_ERR(it))
+ RETURN(PTR_ERR(it));
+
+ rc = iops->load(env, it, 0);
+ if (rc == 0)
+ rc = iops->next(env, it);
+ else if (rc > 0)
+ rc = 0;
+
+ while (rc == 0) {
+ int off = 3;
+
+ rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+ if (rc != 0)
+ break;
+
+ ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+ if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
+ goto next;
+
+ /* name length must be strlen("MDTxxxx") */
+ if (ent->lde_namelen != 7)
+ goto remove;
+
+ if (memcmp(ent->lde_name, "MDT", off) != 0)
+ goto remove;
+
+ while (off < 7 && isxdigit(ent->lde_name[off]))
+ off++;
+
+ if (off != 7) {
+
+remove:
+ rc = lfsck_lpf_remove_name_entry(env, lfsck,
+ ent->lde_name);
+ if (rc != 0)
+ break;
+ }
+
+next:
+ rc = iops->next(env, it);
+ }
+
+ iops->put(env, it);
+ iops->fini(env, it);
+
+ RETURN(rc > 0 ? 0 : rc);
+}
+
+static int lfsck_update_lpf_entry(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
+ struct dt_object *parent,
+ struct dt_object *child,
+ const char *name,
+ enum lfsck_verify_lpf_types type)
+{
+ int rc;
+
+ if (type == LVLT_BY_BOOKMARK) {
+ rc = lfsck_update_name_entry(env, lfsck, parent, name,
+ lfsck_dto2fid(child), S_IFDIR);
+ } else /* if (type == LVLT_BY_NAMEENTRY) */ {
+ lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
+ rc = lfsck_bookmark_store(env, lfsck);
+
+ CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
+ " in the bookmark file: rc = %d\n",
+ lfsck_lfsck2name(lfsck),
+ PFID(lfsck_dto2fid(child)), rc);
+ }
+
+ return rc;
+}
+
+/**
+ * Check whether the @child back references the @parent.
+ *
+ * Two cases:
+ * 1) The child's FID is stored in the bookmark file. If the child back
+ * references the parent (LU_LPF_FID object) via its ".." entry, then
+ * insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
+ * the child back references another parent2, then:
+ * 1.1) If the parent2 recognizes the child, then update the bookmark file;
+ * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
+ * references the child. So keep them there. As the LFSCK processing,
+ * the parent3 may be found, then when the LFSCK run next time, the
+ * inconsistency can be repaired.
+ *
+ * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
+ * entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
+ * via its ".." entry, then update the bookmark file, otherwise, if the child
+ * back references another parent2, then:
+ * 2.1) If the parent2 recognizes the child, then remove the sub-directory
+ * from .lustre/lost+found/;
+ * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
+ * sub-directory name entry and update the child;
+ * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
+ * or not, then keep them there.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ * \param[in] child pointer to the lost+found sub-directory object
+ * \param[in] name the name for lost+found sub-directory object
+ * \param[out] fid pointer to the buffer to hold the FID of the object
+ * (called it as parent2) that is referenced via the
+ * child's dotdot entry; it also can be the FID that
+ * is referenced by the name entry under the parent2.
+ * \param[in] type to indicate where the child's FID is stored in
+ *
+ * \retval positive number for uncertain inconsistency
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int lfsck_verify_lpf_pairs(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
+ struct dt_object *child, const char *name,
+ struct lu_fid *fid,
+ enum lfsck_verify_lpf_types type)
+{
+ struct dt_object *parent = lfsck->li_lpf_root_obj;
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ char *name2 = info->lti_key;
+ struct lu_fid *fid2 = &info->lti_fid3;
+ struct dt_object *parent2 = NULL;
+ struct lustre_handle lh = { 0 };
+ int rc;
+ ENTRY;
+
+ fid_zero(fid);
+ rc = dt_lookup(env, child, (struct dt_rec *)fid,
+ (const struct dt_key *)dotdot);
+ if (rc != 0)
+ GOTO(linkea, rc);
+
+ if (!fid_is_sane(fid))
+ GOTO(linkea, rc = -EINVAL);
+
+ if (lu_fid_eq(fid, &LU_LPF_FID)) {
+ const struct lu_name *cname;
+
+ if (lfsck->li_lpf_obj == NULL) {
+ lu_object_get(&child->do_lu);
+ lfsck->li_lpf_obj = child;
+ }
+
+ cname = lfsck_name_get_const(env, name, strlen(name));
+ rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
+ if (rc == 0)
+ rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
+ name, type);
+
+ GOTO(out_done, rc);
+ }
+
+ parent2 = lfsck_object_find_bottom(env, lfsck, fid);
+ if (IS_ERR(parent2))
+ GOTO(linkea, parent2);
+
+ if (!dt_object_exists(parent2)) {
+ lfsck_object_put(env, parent2);
+
+ GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
+ }
+
+ if (!dt_try_as_dir(env, parent2)) {
+ lfsck_object_put(env, parent2);
+
+ GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
+ }
+
+linkea:
+ /* To prevent rename/unlink race */
+ rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+ MDS_INODELOCK_UPDATE, LCK_PR);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+ dt_read_lock(env, child, 0);
+ rc = lfsck_links_get_first(env, child, name2, fid2);
+ if (rc != 0) {
+ dt_read_unlock(env, child);
+ lfsck_ibits_unlock(&lh, LCK_PR);
+
+ GOTO(out_put, rc = 1);
+ }
+
+ /* It is almost impossible that the bookmark file (or the name entry)
+ * and the linkEA hit the same data corruption. Trust the linkEA. */
+ if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
+ dt_read_unlock(env, child);
+ lfsck_ibits_unlock(&lh, LCK_PR);
+
+ *fid = *fid2;
+ if (lfsck->li_lpf_obj == NULL) {
+ lu_object_get(&child->do_lu);
+ lfsck->li_lpf_obj = child;
+ }
+
+ /* Update the child's dotdot entry */
+ rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
+ &LU_LPF_FID, S_IFDIR);
+ if (rc == 0)
+ rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
+ name, type);
+
+ GOTO(out_put, rc);
+ }
+
+ if (parent2 == NULL || IS_ERR(parent2)) {
+ dt_read_unlock(env, child);
+ lfsck_ibits_unlock(&lh, LCK_PR);
+
+ GOTO(out_done, rc = 1);
+ }
+
+ rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
+ (const struct dt_key *)name2);
+ dt_read_unlock(env, child);
+ lfsck_ibits_unlock(&lh, LCK_PR);
+ if (rc != 0 && rc != -ENOENT)
+ GOTO(out_put, rc);
+
+ if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
+ if (type == LVLT_BY_BOOKMARK)
+ GOTO(out_put, rc = 1);
+
+ /* Trust the name entry, update the child's dotdot entry. */
+ rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
+ &LU_LPF_FID, S_IFDIR);
+
+ GOTO(out_put, rc);
+ }
+
+ if (type == LVLT_BY_BOOKMARK) {
+ /* Invalid FID record in the bookmark file, reset it. */
+ fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
+ rc = lfsck_bookmark_store(env, lfsck);
+
+ CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
+ " in the bookmark file: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
+ } else /* if (type == LVLT_BY_NAMEENTRY) */ {
+ /* The name entry is wrong, remove it. */
+ rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
+ }
+
+ GOTO(out_put, rc);
+
+out_put:
+ if (parent2 != NULL && !IS_ERR(parent2))
+ lfsck_object_put(env, parent2);
+
+out_done:
+ return rc;
+}
+
+/**
+ * Verify the /ROOT/.lustre/lost+found/ directory.
+ *
+ * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
+ * the LFSCK does not exactly know how to handle, such as orphans. So before
+ * the LFSCK scanning the system, the consistency of such directory needs to
+ * be verified firstly to allow the users to use it during the LFSCK.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ *
+ * \retval positive number for uncertain inconsistency
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_fid *pfid = &info->lti_fid;
+ struct lu_fid *cfid = &info->lti_fid2;
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct dt_object *parent;
+ /* child1's FID is in the bookmark file. */
+ struct dt_object *child1 = NULL;
+ /* child2's FID is in the name entry MDTxxxx. */
+ struct dt_object *child2 = NULL;
+ const struct lu_name *cname;
+ char name[8];
+ int node = lfsck_dev_idx(lfsck);
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(lfsck->li_master);
+
+ if (lfsck->li_lpf_root_obj != NULL)
+ RETURN(0);
+
+ if (node == 0) {
+ parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
+ &LU_LPF_FID);
+ } else {
+ struct lfsck_tgt_desc *ltd;
+
+ ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
+ if (unlikely(ltd == NULL))
+ RETURN(-ENXIO);
+
+ parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
+ &LU_LPF_FID);
+ lfsck_tgt_put(ltd);
+ }
+
+ if (IS_ERR(parent))
+ RETURN(PTR_ERR(parent));
+
+ LASSERT(dt_object_exists(parent));
+
+ if (unlikely(!dt_try_as_dir(env, parent))) {
+ lfsck_object_put(env, parent);
+
+ GOTO(put, rc = -ENOTDIR);
+ }
+
+ lfsck->li_lpf_root_obj = parent;
+ if (node == 0) {
+ rc = lfsck_scan_lpf_bad_entries(env, lfsck);
+ if (rc != 0)
+ CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
+ "for bad sub-directories: rc = %d\n",
+ lfsck_lfsck2name(lfsck), rc);
+ }
+
+ if (!fid_is_zero(&bk->lb_lpf_fid)) {
+ if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
+ struct lu_fid tfid = bk->lb_lpf_fid;
+
+ /* Invalid FID record in the bookmark file, reset it. */
+ fid_zero(&bk->lb_lpf_fid);
+ rc = lfsck_bookmark_store(env, lfsck);
+
+ CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
+ " in the bookmark file: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
+
+ if (rc != 0)
+ GOTO(put, rc);
+ } else {
+ child1 = lfsck_object_find_bottom(env, lfsck,
+ &bk->lb_lpf_fid);
+ if (IS_ERR(child1)) {
+ child1 = NULL;
+ goto find_child2;
+ }
+
+ if (unlikely(!dt_object_exists(child1) ||
+ dt_object_remote(child1)) ||
+ !S_ISDIR(lfsck_object_type(child1))) {
+ /* Invalid FID record in the bookmark file,
+ * reset it. */
+ fid_zero(&bk->lb_lpf_fid);
+ rc = lfsck_bookmark_store(env, lfsck);
+
+ CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
+ " in the bookmark file: rc = %d\n",
+ lfsck_lfsck2name(lfsck),
+ PFID(lfsck_dto2fid(child1)), rc);
+
+ if (rc != 0)
+ GOTO(put, rc);
+
+ lfsck_object_put(env, child1);
+ child1 = NULL;
+ } else if (unlikely(!dt_try_as_dir(env, child1))) {
+ GOTO(put, rc = -ENOTDIR);
+ }
+ }
+ }
+
+find_child2:
+ snprintf(name, 8, "MDT%04x", node);
+ rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+ (const struct dt_key *)name);
+ if (rc == -ENOENT) {
+ if (!fid_is_zero(&bk->lb_lpf_fid))
+ goto check_child1;
+
+ GOTO(put, rc = 0);
+ }
+
+ if (rc != 0)
+ GOTO(put, rc);
+
+ /* Invalid FID in the name entry, remove the name entry. */
+ if (!fid_is_norm(cfid)) {
+ rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
+ if (rc != 0)
+ GOTO(put, rc);
+
+ goto check_child1;
+ }
+
+ child2 = lfsck_object_find_bottom(env, lfsck, cfid);
+ if (IS_ERR(child2))
+ GOTO(put, rc = PTR_ERR(child2));
+
+ if (unlikely(!dt_object_exists(child2) ||
+ dt_object_remote(child2)) ||
+ !S_ISDIR(lfsck_object_type(child2))) {
+ rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
+ if (rc != 0)
+ GOTO(put, rc);
+
+ goto check_child1;
+ }
+
+ if (unlikely(!dt_try_as_dir(env, child2)))
+ GOTO(put, rc = -ENOTDIR);
+
+ if (child1 == NULL) {
+ rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
+ pfid, LVLT_BY_NAMEENTRY);
+ } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
+ rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
+ pfid, LVLT_BY_BOOKMARK);
+ if (!lu_fid_eq(pfid, &LU_LPF_FID))
+ rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
+ name, pfid,
+ LVLT_BY_NAMEENTRY);
+ } else {
+ if (lfsck->li_lpf_obj == NULL) {
+ lu_object_get(&child2->do_lu);
+ lfsck->li_lpf_obj = child2;
+ }
+
+ cname = lfsck_name_get_const(env, name, strlen(name));
+ rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
+ }
+
+ GOTO(put, rc);
+
+check_child1:
+ if (child1 != NULL)
+ rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
+ pfid, LVLT_BY_BOOKMARK);
+
+ GOTO(put, rc);
+
+put:
+ if (lfsck->li_lpf_obj != NULL) {
+ if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
+ lfsck_object_put(env, lfsck->li_lpf_obj);
+ lfsck->li_lpf_obj = NULL;
+ rc = -ENOTDIR;
+ }
+ } else if (rc == 0) {
+ rc = lfsck_create_lpf(env, lfsck);
+ }
+
+ if (child2 != NULL && !IS_ERR(child2))
+ lfsck_object_put(env, child2);
+ if (child1 != NULL && !IS_ERR(child1))
+ lfsck_object_put(env, child1);
+
+ return rc;
+}
+
+static int lfsck_fid_init(struct lfsck_instance *lfsck)
+{
+ struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+ struct seq_server_site *ss = lfsck_dev_site(lfsck);
+ char *prefix;
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(ss == NULL))
+ RETURN(-ENXIO);
+
+ OBD_ALLOC_PTR(lfsck->li_seq);
+ if (lfsck->li_seq == NULL)
+ RETURN(-ENOMEM);
+
+ OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
+ if (prefix == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
+ rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
+ ss->ss_server_seq);
+ OBD_FREE(prefix, MAX_OBD_NAME + 7);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ if (fid_is_sane(&bk->lb_last_fid))
+ lfsck->li_seq->lcs_fid = bk->lb_last_fid;
+
+ RETURN(0);
+
+out:
+ OBD_FREE_PTR(lfsck->li_seq);
+ lfsck->li_seq = NULL;
+
+ return rc;
+}
+
+static void lfsck_fid_fini(struct lfsck_instance *lfsck)
+{
+ if (lfsck->li_seq != NULL) {
+ seq_client_fini(lfsck->li_seq);
+ OBD_FREE_PTR(lfsck->li_seq);
+ lfsck->li_seq = NULL;
+ }
+}
+
+void lfsck_instance_cleanup(const struct lu_env *env,
+ struct lfsck_instance *lfsck)
+{
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+ struct lfsck_component *com;
+ struct lfsck_component *next;
+ struct lfsck_lmv_unit *llu;
+ struct lfsck_lmv_unit *llu_next;
+ struct lfsck_lmv *llmv;
+ ENTRY;
+
+ LASSERT(list_empty(&lfsck->li_link));
+ LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
+
+ if (lfsck->li_obj_oit != NULL) {
+ lfsck_object_put(env, lfsck->li_obj_oit);
+ lfsck->li_obj_oit = NULL;
+ }
+
+ LASSERT(lfsck->li_obj_dir == NULL);
+ LASSERT(lfsck->li_lmv == NULL);
+
+ list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
+ llmv = &llu->llu_lmv;
+
+ LASSERTF(atomic_read(&llmv->ll_ref) == 1,
+ "still in using: %u\n",
+ atomic_read(&llmv->ll_ref));
+
+ lfsck_lmv_put(env, llmv);
+ }
+
+ list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
+ lfsck_component_cleanup(env, com);
+ }
+
+ LASSERT(list_empty(&lfsck->li_list_dir));
+
+ list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
+ lc_link) {
+ lfsck_component_cleanup(env, com);
+ }
+
+ list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
+ lfsck_component_cleanup(env, com);
+ }
+
+ lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
+ lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
+
+ if (lfsck->li_lfsck_dir != NULL) {
+ lfsck_object_put(env, lfsck->li_lfsck_dir);
+ lfsck->li_lfsck_dir = NULL;
+ }
+
+ if (lfsck->li_bookmark_obj != NULL) {
+ lfsck_object_put(env, lfsck->li_bookmark_obj);
+ lfsck->li_bookmark_obj = NULL;
+ }
+
+ if (lfsck->li_lpf_obj != NULL) {
+ lfsck_object_put(env, lfsck->li_lpf_obj);
+ lfsck->li_lpf_obj = NULL;
+ }
+
+ if (lfsck->li_lpf_root_obj != NULL) {
+ lfsck_object_put(env, lfsck->li_lpf_root_obj);
+ lfsck->li_lpf_root_obj = NULL;
+ }
+
+ if (lfsck->li_los != NULL) {
+ local_oid_storage_fini(env, lfsck->li_los);
+ lfsck->li_los = NULL;
+ }
+
+ lfsck_fid_fini(lfsck);
+
+ OBD_FREE_PTR(lfsck);
+}
+
+static inline struct lfsck_instance *
+__lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
+{
+ struct lfsck_instance *lfsck;
+
+ list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
+ if (lfsck->li_bottom == key) {
+ if (ref)
+ lfsck_instance_get(lfsck);
+ if (unlink)
+ list_del_init(&lfsck->li_link);
+
+ return lfsck;
+ }
+ }
+
+ return NULL;
+}
+
+struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
+ bool unlink)
+{
+ struct lfsck_instance *lfsck;
+
+ spin_lock(&lfsck_instance_lock);
+ lfsck = __lfsck_instance_find(key, ref, unlink);
+ spin_unlock(&lfsck_instance_lock);
+
+ return lfsck;
+}
+
+static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
+{
+ struct lfsck_instance *tmp;
+
+ spin_lock(&lfsck_instance_lock);
+ list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
+ if (lfsck->li_bottom == tmp->li_bottom) {
+ spin_unlock(&lfsck_instance_lock);
+ return -EEXIST;
+ }
+ }
+
+ list_add_tail(&lfsck->li_link, &lfsck_instance_list);
+ spin_unlock(&lfsck_instance_lock);
+ return 0;
+}
+
+int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
+ const char *prefix)
+{
+ int flag;
+ int i;
+ bool newline = (bits != 0 ? false : true);
+ int rc;
+
+ rc = seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
+ if (rc < 0)
+ return rc;
+
+ for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
+ if (flag & bits) {
+ bits &= ~flag;
+ if (names[i] != NULL) {
+ if (bits == 0)
+ newline = true;
+
+ rc = seq_printf(m, "%s%c", names[i],
+ newline ? '\n' : ',');
+ if (rc < 0)
+ return rc;
+ }
+ }
+ }
+
+ if (!newline)
+ rc = seq_printf(m, "\n");
+
+ return rc;
+}
+
+int lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
+{
+ int rc;
+
+ if (time == 0) {
+ rc = seq_printf(m, "%s_time: N/A\n", name);
+ if (rc == 0)
+ rc = seq_printf(m, "time_since_%s: N/A\n", name);
+
+ return rc;
+ }
+
+ rc = seq_printf(m, "%s_time: "LPU64"\n", name, time);
+ if (rc == 0)
+ rc = seq_printf(m, "time_since_%s: "LPU64" seconds\n",
+ name, cfs_time_current_sec() - time);
+
+ return rc;
+}
+
+int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
+ const char *prefix)
+{
+ if (fid_is_zero(&pos->lp_dir_parent)) {
+ if (pos->lp_oit_cookie == 0)
+ return seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
+
+ return seq_printf(m, "%s: "LPU64", N/A, N/A\n",
+ prefix, pos->lp_oit_cookie);
+ }
+
+ return seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
+ prefix, pos->lp_oit_cookie,
+ PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
+}
+
+void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
+ struct lfsck_position *pos, bool init)
+{
+ const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
+
+ if (unlikely(lfsck->li_di_oit == NULL)) {
+ memset(pos, 0, sizeof(*pos));
+ return;
+ }
+
+ pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
+ if (!lfsck->li_current_oit_processed && !init)
+ pos->lp_oit_cookie--;
+
+ LASSERT(pos->lp_oit_cookie > 0);
+
+ if (lfsck->li_di_dir != NULL) {
+ struct dt_object *dto = lfsck->li_obj_dir;
+
+ pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
+ lfsck->li_di_dir);
+
+ if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
+ fid_zero(&pos->lp_dir_parent);
+ pos->lp_dir_cookie = 0;
+ } else {
+ pos->lp_dir_parent = *lfsck_dto2fid(dto);
+ }
+ } else {
+ fid_zero(&pos->lp_dir_parent);
+ pos->lp_dir_cookie = 0;
+ }
+}
+
+bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
+{
+ bool dirty = false;
+
+ if (limit != LFSCK_SPEED_NO_LIMIT) {
+ if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
+ lfsck->li_sleep_rate = limit /
+ msecs_to_jiffies(MSEC_PER_SEC);
+ lfsck->li_sleep_jif = 1;
+ } else {
+ lfsck->li_sleep_rate = 1;
+ lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
+ limit;
+ }
+ } else {
+ lfsck->li_sleep_jif = 0;
+ lfsck->li_sleep_rate = 0;
+ }
+
+ if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
+ lfsck->li_bookmark_ram.lb_speed_limit = limit;
+ dirty = true;
+ }
+
+ return dirty;
+}
+
+void lfsck_control_speed(struct lfsck_instance *lfsck)
+{
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+ struct l_wait_info lwi;
+
+ if (lfsck->li_sleep_jif > 0 &&
+ lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
+ lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
+ LWI_ON_SIGNAL_NOOP, NULL);
+
+ l_wait_event(thread->t_ctl_waitq,
+ !thread_is_running(thread),
+ &lwi);
+ lfsck->li_new_scanned = 0;
+ }
+}
+
+void lfsck_control_speed_by_self(struct lfsck_component *com)
+{
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+ struct l_wait_info lwi;
+
+ if (lfsck->li_sleep_jif > 0 &&
+ com->lc_new_scanned >= lfsck->li_sleep_rate) {
+ lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
+ LWI_ON_SIGNAL_NOOP, NULL);
+
+ l_wait_event(thread->t_ctl_waitq,
+ !thread_is_running(thread),
+ &lwi);
+ com->lc_new_scanned = 0;
+ }
+}
+
+static struct lfsck_thread_args *
+lfsck_thread_args_init(struct lfsck_instance *lfsck,
+ struct lfsck_component *com,
+ struct lfsck_start_param *lsp)
+{
+ struct lfsck_thread_args *lta;
+ int rc;
+
+ OBD_ALLOC_PTR(lta);
+ if (lta == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ rc = lu_env_init(<a->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
+ if (rc != 0) {
+ OBD_FREE_PTR(lta);
+ return ERR_PTR(rc);
+ }
+
+ lta->lta_lfsck = lfsck_instance_get(lfsck);
+ if (com != NULL)
+ lta->lta_com = lfsck_component_get(com);
+
+ lta->lta_lsp = lsp;
+
+ return lta;
+}
+
+void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
+{
+ if (lta->lta_com != NULL)
+ lfsck_component_put(<a->lta_env, lta->lta_com);
+ lfsck_instance_put(<a->lta_env, lta->lta_lfsck);
+ lu_env_fini(<a->lta_env);
+ OBD_FREE_PTR(lta);
+}
+
+struct lfsck_assistant_data *
+lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
+ const char *name)
+{
+ struct lfsck_assistant_data *lad;
+
+ OBD_ALLOC_PTR(lad);
+ if (lad != NULL) {
+ lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
+ if (lad->lad_bitmap == NULL) {
+ OBD_FREE_PTR(lad);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&lad->lad_req_list);
+ spin_lock_init(&lad->lad_lock);
+ INIT_LIST_HEAD(&lad->lad_ost_list);
+ INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
+ INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
+ INIT_LIST_HEAD(&lad->lad_mdt_list);
+ INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
+ INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
+ init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
+ lad->lad_ops = lao;
+ lad->lad_name = name;
+ }
+
+ return lad;
+}
+
+struct lfsck_assistant_object *
+lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
+ const struct lu_attr *attr, __u64 cookie,
+ bool is_dir)
+{
+ struct lfsck_assistant_object *lso;
+
+ OBD_ALLOC_PTR(lso);
+ if (lso == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ lso->lso_fid = *fid;
+ if (attr != NULL)
+ lso->lso_attr = *attr;
+
+ atomic_set(&lso->lso_ref, 1);
+ lso->lso_oit_cookie = cookie;
+ if (is_dir)
+ lso->lso_is_dir = 1;
+
+ return lso;
+}
+
+struct dt_object *
+lfsck_assistant_object_load(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
+ struct lfsck_assistant_object *lso)
+{
+ struct dt_object *obj;
+
+ obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
+ if (IS_ERR(obj))
+ return obj;
+
+ if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
+ lso->lso_dead = 1;
+ lfsck_object_put(env, obj);
+
+ return ERR_PTR(-ENOENT);
+ }
+
+ if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
+ lfsck_object_put(env, obj);
+
+ return ERR_PTR(-ENOTDIR);
+ }
+
+ return obj;
+}
+
+/**
+ * Generic LFSCK asynchronous communication interpretor function.
+ * The LFSCK RPC reply for both the event notification and status
+ * querying will be handled here.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] req pointer to the LFSCK request
+ * \param[in] args pointer to the lfsck_async_interpret_args
+ * \param[in] rc the result for handling the LFSCK request
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+int lfsck_async_interpret_common(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *args, int rc)
+{
+ struct lfsck_async_interpret_args *laia = args;
+ struct lfsck_component *com = laia->laia_com;
+ struct lfsck_assistant_data *lad = com->lc_data;
+ struct lfsck_tgt_descs *ltds = laia->laia_ltds;
+ struct lfsck_tgt_desc *ltd = laia->laia_ltd;
+ struct lfsck_request *lr = laia->laia_lr;
+
+ LASSERT(com->lc_lfsck->li_master);
+
+ switch (lr->lr_event) {
+ case LE_START:
+ if (rc != 0) {
+ CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
+ "start: rc = %d\n",
+ lfsck_lfsck2name(com->lc_lfsck),
+ (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+ ltd->ltd_index, lad->lad_name, rc);
+
+ if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+ struct lfsck_layout *lo = com->lc_file_ram;
+
+ if (lr->lr_flags & LEF_TO_OST)
+ lfsck_lad_set_bitmap(env, com,
+ ltd->ltd_index);
+ else
+ lo->ll_flags |= LF_INCOMPLETE;
+ } else {
+ struct lfsck_namespace *ns = com->lc_file_ram;
+
+ /* If some MDT does not join the namespace
+ * LFSCK, then we cannot know whether there
+ * is some name entry on such MDT that with
+ * the referenced MDT-object on this MDT or
+ * not. So the namespace LFSCK on this MDT
+ * cannot handle orphan MDT-objects properly.
+ * So we mark the LFSCK as LF_INCOMPLETE and
+ * skip orphan MDT-objects handling. */
+ ns->ln_flags |= LF_INCOMPLETE;
+ }
+ break;
+ }