Whamcloud - gitweb
LU-5767 lfsck: use OUT RPC to create remote orphan parent
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
index 28bc0a2..1603099 100644 (file)
@@ -460,15 +460,84 @@ const char dotdot[] = "..";
 static const char dotlustre[] = ".lustre";
 static const char lostfound[] = "lost+found";
 
+/**
+ * Remove the name entry from the .lustre/lost+found directory.
+ *
+ * No need to care about the object referenced by the name entry,
+ * either the name entry is invalid or redundant, or the referenced
+ * object has been processed or will be handled by others.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] name     the name for the name entry to be removed
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
+                                      struct lfsck_instance *lfsck,
+                                      const char *name)
+{
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
+       struct dt_device        *dev    = lfsck->li_next;
+       struct thandle          *th;
+       struct lustre_handle     lh     = { 0 };
+       int                      rc;
+       ENTRY;
+
+       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               RETURN(rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_ref_del(env, parent, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_delete(env, parent, (const struct dt_key *)name, th,
+                      BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, parent, 0);
+       rc = dt_ref_del(env, parent, th);
+       dt_write_unlock(env, parent);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+unlock:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+
+       CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
+
+       return rc;
+}
+
 static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
-                                 struct dt_object *parent,
                                  struct dt_object *child,
                                  struct lu_attr *la,
                                  struct dt_object_format *dof,
                                  const char *name)
 {
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
@@ -606,13 +675,13 @@ stop:
 
 static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct lfsck_instance *lfsck,
-                                  struct dt_object *parent,
                                   struct dt_object *child,
                                   struct lu_attr *la,
                                   struct dt_object_format *dof,
                                   const char *name)
 {
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
@@ -789,17 +858,29 @@ stop:
        return rc;
 }
 
-/* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
- * because the MDT0 maybe not reaady for sequence allocation yet. We do that
- * only when it is required, such as orphan OST-objects repairing. */
-int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
+/**
+ * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
+ *
+ * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
+ * orphans and other uncertain inconsistent objects found during the
+ * LFSCK. Such directory will be created by the LFSCK engine on the
+ * local MDT before the LFSCK scanning.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lfsck_create_lpf(const struct lu_env *env,
+                           struct lfsck_instance *lfsck)
 {
        struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
        struct lfsck_thread_info *info  = lfsck_env_info(env);
        struct lu_fid            *cfid  = &info->lti_fid2;
        struct lu_attr           *la    = &info->lti_la;
        struct dt_object_format  *dof   = &info->lti_dof;
-       struct dt_object         *parent = NULL;
+       struct dt_object         *parent = lfsck->li_lpf_root_obj;
        struct dt_object         *child = NULL;
        struct lustre_handle      lh    = { 0 };
        char                      name[8];
@@ -808,40 +889,15 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        ENTRY;
 
        LASSERT(lfsck->li_master);
-
-       sprintf(name, "MDT%04x", node);
-       if (node == 0) {
-               parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
-                                                 &LU_LPF_FID);
-       } else {
-               struct lfsck_tgt_desc *ltd;
-
-               ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
-               if (unlikely(ltd == NULL))
-                       RETURN(-ENXIO);
-
-               parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
-                                                 &LU_LPF_FID);
-               lfsck_tgt_put(ltd);
-       }
-       if (IS_ERR(parent))
-               RETURN(PTR_ERR(parent));
-
-       if (lfsck->li_lpf_obj != NULL)
-               GOTO(out, rc = 0);
-
-       if (unlikely(!dt_try_as_dir(env, parent)))
-               GOTO(out, rc = -ENOTDIR);
+       LASSERT(parent != NULL);
+       LASSERT(lfsck->li_lpf_obj == NULL);
 
        rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
                              MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
-               GOTO(out, rc);
-
-       mutex_lock(&lfsck->li_mutex);
-       if (lfsck->li_lpf_obj != NULL)
-               GOTO(unlock, rc = 0);
+               RETURN(rc);
 
+       snprintf(name, 8, "MDT%04x", node);
        if (fid_is_zero(&bk->lb_lpf_fid)) {
                /* There is corner case that: in former LFSCK scanning we have
                 * created the .lustre/lost+found/MDTxxxx but failed to update
@@ -886,24 +942,18 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        dof->dof_type = dt_mode_to_dft(S_IFDIR);
 
        if (node == 0)
-               rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
-                                           dof, name);
+               rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
        else
-               rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
-                                            dof, name);
+               rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
        if (rc == 0)
                lfsck->li_lpf_obj = child;
 
        GOTO(unlock, rc);
 
 unlock:
-       mutex_unlock(&lfsck->li_mutex);
        lfsck_ibits_unlock(&lh, LCK_EX);
        if (rc != 0 && child != NULL && !IS_ERR(child))
                lu_object_put(env, &child->do_lu);
-out:
-       if (parent != NULL && !IS_ERR(parent))
-               lu_object_put(env, &parent->do_lu);
 
        return rc;
 }
@@ -917,15 +967,14 @@ out:
  *
  * \param[in] env      pointer to the thread context
  * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] parent   pointer to the lost+found object
  *
  * \retval             0 for success
  * \retval             negative error number on failure
  */
 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
-                                     struct lfsck_instance *lfsck,
-                                     struct dt_object *parent)
+                                     struct lfsck_instance *lfsck)
 {
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct lu_dirent        *ent    =
                        (struct lu_dirent *)lfsck_env_info(env)->lti_key;
        const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
@@ -967,8 +1016,8 @@ static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
                if (off != 7) {
 
 remove:
-                       rc = lfsck_remove_name_entry(env, lfsck, parent,
-                                                    ent->lde_name, S_IFDIR);
+                       rc = lfsck_lpf_remove_name_entry(env, lfsck,
+                                                        ent->lde_name);
                        if (rc != 0)
                                break;
                }
@@ -1035,7 +1084,6 @@ static int lfsck_update_lpf_entry(const struct lu_env *env,
  *
  * \param[in] env      pointer to the thread context
  * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] parent   pointer to the lost+found object
  * \param[in] child    pointer to the lost+found sub-directory object
  * \param[in] name     the name for lost+found sub-directory object
  * \param[out] fid     pointer to the buffer to hold the FID of the object
@@ -1050,11 +1098,11 @@ static int lfsck_update_lpf_entry(const struct lu_env *env,
  */
 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
-                                 struct dt_object *parent,
                                  struct dt_object *child, const char *name,
                                  struct lu_fid *fid,
                                  enum lfsck_verify_lpf_types type)
 {
+       struct dt_object         *parent  = lfsck->li_lpf_root_obj;
        struct lfsck_thread_info *info    = lfsck_env_info(env);
        char                     *name2   = info->lti_key;
        struct lu_fid            *fid2    = &info->lti_fid3;
@@ -1179,7 +1227,7 @@ linkea:
                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
        } else /* if (type == LVLT_BY_NAMEENTRY) */ {
                /* The name entry is wrong, remove it. */
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
        }
 
        GOTO(out_put, rc);
@@ -1213,7 +1261,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        struct lu_fid            *pfid   = &info->lti_fid;
        struct lu_fid            *cfid   = &info->lti_fid2;
        struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
-       struct dt_object         *parent = NULL;
+       struct dt_object         *parent;
        /* child1's FID is in the bookmark file. */
        struct dt_object         *child1 = NULL;
        /* child2's FID is in the name entry MDTxxxx. */
@@ -1227,6 +1275,9 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        LASSERT(lfsck->li_master);
 
+       if (lfsck->li_lpf_root_obj != NULL)
+               RETURN(0);
+
        if (node == 0) {
                parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
        } else {
@@ -1246,11 +1297,15 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        LASSERT(dt_object_exists(parent));
 
-       if (unlikely(!dt_try_as_dir(env, parent)))
+       if (unlikely(!dt_try_as_dir(env, parent))) {
+               lu_object_put(env, &parent->do_lu);
+
                GOTO(put, rc = -ENOTDIR);
+       }
 
+       lfsck->li_lpf_root_obj = parent;
        if (node == 0) {
-               rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
+               rc = lfsck_scan_lpf_bad_entries(env, lfsck);
                if (rc != 0)
                        CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
                               "for bad sub-directories: rc = %d\n",
@@ -1316,7 +1371,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        /* Invalid FID in the name entry, remove the name entry. */
        if (!fid_is_norm(cfid)) {
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
                if (rc != 0)
                        GOTO(put, rc);
 
@@ -1330,8 +1385,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        if (unlikely(!dt_object_exists(child2) ||
                     dt_object_remote(child2)) ||
                     !S_ISDIR(lfsck_object_type(child2))) {
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name,
-                                            S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
                if (rc != 0)
                        GOTO(put, rc);
 
@@ -1342,13 +1396,13 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
                GOTO(put, rc = -ENOTDIR);
 
        if (child1 == NULL) {
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
                                            pfid, LVLT_BY_NAMEENTRY);
        } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
                if (!lu_fid_eq(pfid, &LU_LPF_FID))
-                       rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
+                       rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
                                                    name, pfid,
                                                    LVLT_BY_NAMEENTRY);
        } else {
@@ -1365,22 +1419,26 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
 check_child1:
        if (child1 != NULL)
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
 
        GOTO(put, rc);
 
 put:
-       if (lfsck->li_lpf_obj != NULL &&
-           unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
-               rc = -ENOTDIR;
+       if (lfsck->li_lpf_obj != NULL) {
+               if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
+                       lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
+                       lfsck->li_lpf_obj = NULL;
+                       rc = -ENOTDIR;
+               }
+       } else if (rc == 0) {
+               rc = lfsck_create_lpf(env, lfsck);
+       }
 
        if (child2 != NULL && !IS_ERR(child2))
                lu_object_put(env, &child2->do_lu);
        if (child1 != NULL && !IS_ERR(child1))
                lu_object_put(env, &child1->do_lu);
-       if (parent != NULL && !IS_ERR(parent))
-               lu_object_put(env, &parent->do_lu);
 
        return rc;
 }
@@ -1493,6 +1551,11 @@ void lfsck_instance_cleanup(const struct lu_env *env,
                lfsck->li_lpf_obj = NULL;
        }
 
+       if (lfsck->li_lpf_root_obj != NULL) {
+               lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu);
+               lfsck->li_lpf_root_obj = NULL;
+       }
+
        if (lfsck->li_los != NULL) {
                local_oid_storage_fini(env, lfsck->li_los);
                lfsck->li_los = NULL;
@@ -2941,7 +3004,6 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_FID_ACCESSED:
        case LE_PEER_EXIT:
        case LE_CONDITIONAL_DESTROY:
-       case LE_CREATE_ORPHAN:
        case LE_SKIP_NLINK_DECLARE:
        case LE_SKIP_NLINK:
        case LE_SET_LMV_MASTER: