+/**
+ * Create the specified orphan MDT-object on remote MDT.
+ *
+ * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
+ * ask the remote LFSCK instance to create the specified orphan object
+ * under .lustre/lost+found/MDTxxxx/ directory with the name:
+ * ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] orphan pointer to the orphan MDT-object
+ * \param[in] type the orphan's type to be created
+ *
+ * type "P": The orphan object to be created was a parent directory
+ * of some DMT-object which linkEA shows that the @orphan
+ * object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if needs to repair nothing
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *orphan,
+ __u32 type)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lfsck_request *lr = &info->lti_lr;
+ struct lu_seq_range *range = &info->lti_range;
+ const struct lu_fid *fid = lfsck_dto2fid(orphan);
+ struct lfsck_namespace *ns = com->lc_file_ram;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct seq_server_site *ss =
+ lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
+ struct lfsck_tgt_desc *ltd = NULL;
+ struct ptlrpc_request *req = NULL;
+ int rc;
+ ENTRY;
+
+ if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+ GOTO(out, rc = 1);
+
+ fld_range_set_mdt(range);
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
+ if (ltd == NULL) {
+ ns->ln_flags |= LF_INCOMPLETE;
+
+ GOTO(out, rc = -ENODEV);
+ }
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
+ &RQF_LFSCK_NOTIFY);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
+ if (rc != 0) {
+ ptlrpc_request_free(req);
+
+ GOTO(out, rc);
+ }
+
+ lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
+ memset(lr, 0, sizeof(*lr));
+ lr->lr_event = LE_CREATE_ORPHAN;
+ lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+ lr->lr_active = LFSCK_TYPE_NAMESPACE;
+ lr->lr_fid = *fid;
+ lr->lr_type = type;
+
+ ptlrpc_request_set_replen(req);
+ rc = ptlrpc_queue_wait(req);
+ ptlrpc_req_finished(req);
+
+ if (rc == 0)
+ rc = 1;
+ else if (rc == -EEXIST)
+ rc = 0;
+
+ GOTO(out, rc);
+
+out:
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
+ DFID" on the MDT %x remotely: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(fid),
+ ltd != NULL ? ltd->ltd_index : -1, rc);
+
+ if (ltd != NULL)
+ lfsck_tgt_put(ltd);
+
+ return rc;
+}
+
+/**
+ * Create the specified orphan MDT-object locally.
+ *
+ * For the case that the parent MDT-object stored in some MDT-object's
+ * linkEA entry is lost, the LFSCK will re-create the parent object as
+ * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
+ * with the name ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] orphan pointer to the orphan MDT-object to be created
+ * \param[in] type the orphan's type to be created
+ *
+ * type "P": The orphan object to be created was a parent directory
+ * of some DMT-object which linkEA shows that the @orphan
+ * object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval positive number for repaired cases
+ * \retval negative error number on failure
+ */
+static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
+ struct lfsck_component *com,
+ struct dt_object *orphan,
+ __u32 type)
+{
+ struct lfsck_thread_info *info = lfsck_env_info(env);
+ struct lu_attr *la = &info->lti_la;
+ struct dt_allocation_hint *hint = &info->lti_hint;
+ struct dt_object_format *dof = &info->lti_dof;
+ struct lu_name *cname = &info->lti_name2;
+ struct dt_insert_rec *rec = &info->lti_dt_rec;
+ struct lu_fid *tfid = &info->lti_fid;
+ const struct lu_fid *cfid = lfsck_dto2fid(orphan);
+ const struct lu_fid *pfid;
+ struct lfsck_instance *lfsck = com->lc_lfsck;
+ struct dt_device *dev = lfsck->li_bottom;
+ struct dt_object *parent = NULL;
+ struct dt_object *child = NULL;
+ struct thandle *th = NULL;
+ struct lustre_handle lh = { 0 };
+ struct linkea_data ldata = { 0 };
+ struct lu_buf linkea_buf;
+ char name[32];
+ int namelen;
+ int idx = 0;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(!dt_object_exists(orphan));
+ LASSERT(!dt_object_remote(orphan));
+
+ /* @orphan maybe not attached to lfsck->li_bottom */
+ child = lfsck_object_find_by_dev(env, dev, cfid);
+ if (IS_ERR(child))
+ GOTO(log, rc = PTR_ERR(child));
+
+ cname->ln_name = NULL;
+ if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+ GOTO(log, rc = 1);
+
+ /* Create .lustre/lost+found/MDTxxxx when needed. */
+ if (unlikely(lfsck->li_lpf_obj == NULL)) {
+ rc = lfsck_create_lpf(env, lfsck);
+ if (rc != 0)
+ GOTO(log, rc);
+ }
+
+ parent = lfsck->li_lpf_obj;
+ pfid = lfsck_dto2fid(parent);
+
+ /* Hold update lock on the parent to prevent others to access. */
+ rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+ MDS_INODELOCK_UPDATE, LCK_EX);
+ if (rc != 0)
+ GOTO(log, rc);
+
+ do {
+ namelen = snprintf(name, 31, DFID"-P-%d",
+ PFID(cfid), idx++);
+ rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+ (const struct dt_key *)name, BYPASS_CAPA);
+ if (rc != 0 && rc != -ENOENT)
+ GOTO(unlock1, rc);
+ } while (rc == 0);
+
+ cname->ln_name = name;
+ cname->ln_namelen = namelen;
+
+ memset(la, 0, sizeof(*la));
+ la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
+ la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
+ LA_ATIME | LA_MTIME | LA_CTIME;
+
+ child->do_ops->do_ah_init(env, hint, parent, child,
+ la->la_mode & S_IFMT);
+
+ memset(dof, 0, sizeof(*dof));
+ dof->dof_type = dt_mode_to_dft(type);
+
+ rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+ if (rc != 0)
+ GOTO(unlock1, rc);
+
+ rc = linkea_add_buf(&ldata, cname, pfid);
+ if (rc != 0)
+ GOTO(unlock1, rc);
+
+ th = dt_trans_create(env, dev);
+ if (IS_ERR(th))
+ GOTO(unlock1, rc = PTR_ERR(th));
+
+ rc = dt_declare_create(env, child, la, hint, dof, th);
+ if (rc == 0 && S_ISDIR(type))
+ rc = dt_declare_ref_add(env, child, th);
+
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+ ldata.ld_leh->leh_len);
+ rc = dt_declare_xattr_set(env, child, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rec->rec_type = type;
+ rec->rec_fid = cfid;
+ rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+ (const struct dt_key *)name, th);
+ if (rc == 0 && S_ISDIR(type))
+ rc = dt_declare_ref_add(env, parent, th);
+
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, dev, th);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, child, 0);
+ rc = dt_create(env, child, la, hint, dof, th);
+ if (rc != 0)
+ GOTO(unlock2, rc);
+
+ if (S_ISDIR(type)) {
+ if (unlikely(!dt_try_as_dir(env, child)))
+ GOTO(unlock2, rc = -ENOTDIR);
+
+ rec->rec_type = S_IFDIR;
+ rec->rec_fid = cfid;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+ if (rc != 0)
+ GOTO(unlock2, rc);
+
+ rec->rec_fid = pfid;
+ rc = dt_insert(env, child, (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th,
+ BYPASS_CAPA, 1);
+ if (rc != 0)
+ GOTO(unlock2, rc);
+
+ rc = dt_ref_add(env, child, th);
+ if (rc != 0)
+ GOTO(unlock2, rc);
+ }
+
+ rc = dt_xattr_set(env, child, &linkea_buf,
+ XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+ dt_write_unlock(env, child);
+ if (rc != 0)
+ GOTO(stop, rc);
+
+ rec->rec_type = type;
+ rec->rec_fid = cfid;
+ rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+ (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+ if (rc == 0 && S_ISDIR(type)) {
+ dt_write_lock(env, parent, 0);
+ rc = dt_ref_add(env, parent, th);
+ dt_write_unlock(env, parent);
+ }
+
+ GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+ dt_write_unlock(env, child);
+
+stop:
+ dt_trans_stop(env, dev, th);
+
+unlock1:
+ lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+ CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
+ "the object "DFID", name = %s, type %o: rc = %d\n",
+ lfsck_lfsck2name(lfsck), PFID(cfid),
+ cname->ln_name != NULL ? cname->ln_name : "<NULL>", type, rc);
+
+ if (child != NULL && !IS_ERR(child))
+ lfsck_object_put(env, child);
+
+ return rc;
+}
+
+/**
+ * Create the specified orphan MDT-object.
+ *
+ * For the case that the parent MDT-object stored in some MDT-object's
+ * linkEA entry is lost, the LFSCK will re-create the parent object as
+ * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
+ * with the name: ${FID}-P-${conflict_version}.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] com pointer to the lfsck component
+ * \param[in] orphan pointer to the orphan MDT-object
+ *
+ * type "P": The orphan object to be created was a parent directory
+ * of some DMT-object which linkEA shows that the @orphan
+ * object is missing.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \retval positive number for repaired cases
+ * \retval 0 if needs to repair nothing
+ * \retval negative error number on failure
+ */