Whamcloud - gitweb
LU-5180 lfsck: linkea for orphan 33/10733/15
authorFan Yong <fan.yong@intel.com>
Fri, 30 May 2014 03:59:19 +0000 (11:59 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 9 Jul 2014 14:17:24 +0000 (14:17 +0000)
To make the fid2path tools can work against any client visible
object, the LFSCK will generate linkEA not only for the orphan
under the directory /ROOT/.lustre/lost+found/MDTxxxx/ but also
for the objects ".lustre" and ".lustre/lost+found".

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ibf957e8e2ec93d0f8f8a7cf1ac45706fb79a41db
Reviewed-on: http://review.whamcloud.com/10733
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c

index 1deb99c..e3b6ea3 100644 (file)
@@ -45,6 +45,7 @@
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
 #include <md_object.h>
+#include <lustre_linkea.h>
 
 #define HALF_SEC                       (HZ >> 1)
 #define LFSCK_CHECKPOINT_INTERVAL      60
@@ -637,6 +638,9 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                    struct lfsck_start *start, bool reset);
 
 /* lfsck_namespace.c */
+int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
+                       struct dt_object *obj, const struct lu_name *cname,
+                       const struct lu_fid *pfid);
 int lfsck_namespace_setup(const struct lu_env *env,
                          struct lfsck_instance *lfsck);
 
index 2345dc5..f59dca7 100644 (file)
@@ -39,7 +39,6 @@
 #include <lustre/lustre_idl.h>
 #include <lu_object.h>
 #include <dt_object.h>
-#include <lustre_linkea.h>
 #include <lustre_fid.h>
 #include <lustre_lib.h>
 #include <lustre_net.h>
@@ -1993,6 +1992,9 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        struct lu_buf                   *pbuf   = NULL;
        struct lu_buf                   *ea_buf = &info->lti_big_buf;
        struct lustre_handle             lh     = { 0 };
+       struct linkea_data               ldata  = { 0 };
+       struct lu_buf                    linkea_buf;
+       const struct lu_name            *pname;
        int                              buflen = ea_buf->lb_len;
        int                              idx    = 0;
        int                              rc     = 0;
@@ -2040,6 +2042,16 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                        GOTO(put, rc);
        } while (rc == 0);
 
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               GOTO(put, rc);
+
+       pname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
+       if (rc != 0)
+               GOTO(put, rc);
+
        memset(la, 0, sizeof(*la));
        la->la_uid = rec->lor_uid;
        la->la_gid = rec->lor_gid;
@@ -2102,6 +2114,14 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
+       /* 5a. insert linkEA for parent. */
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
        rc = dt_trans_start(env, next, th);
        if (rc != 0)
                GOTO(stop, rc);
@@ -2130,6 +2150,12 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        rc = dt_insert(env, lfsck->li_lpf_obj,
                       (const struct dt_rec *)pfid,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       /* 5b. insert linkEA for parent. */
+       rc = dt_xattr_set(env, pobj, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
 
        GOTO(stop, rc);
 
index 03986d2..247a608 100644 (file)
@@ -353,6 +353,8 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
 
 static const char dot[] = ".";
 static const char dotdot[] = "..";
+static const char dotlustre[] = ".lustre";
+static const char lostfound[] = "lost+found";
 
 static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
@@ -365,13 +367,26 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
-       const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
+       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
+       struct linkea_data       ldata  = { 0 };
+       struct lu_buf            linkea_buf;
+       const struct lu_name    *cname;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
-       int                      rc     = 0;
+       int                      rc;
        ENTRY;
 
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       cname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       if (rc != 0)
+               RETURN(rc);
+
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -386,18 +401,26 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3a. insert name into parent dir */
+       /* 3a. insert linkEA for child */
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       rc = dt_declare_xattr_set(env, child, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       /* 4a. insert name into parent dir */
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 4a. increase parent nlink */
+       /* 5a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 5a. update bookmark */
+       /* 6a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
@@ -408,7 +431,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        dt_write_lock(env, child, 0);
-       /* 1b.1 create child */
+       /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -416,13 +439,13 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);
 
-       /* 1b.2 insert dot into child dir */
+       /* 1b.2. insert dot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)cfid,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 1b.3 insert dotdot into child dir */
+       /* 1b.3. insert dotdot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
@@ -430,18 +453,24 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
 
        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       /* 3b. insert linkEA for child. */
+       rc = dt_xattr_set(env, child, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        dt_write_unlock(env, child);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3b. insert name into parent dir */
+       /* 4b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
 
        dt_write_lock(env, parent, 0);
-       /* 4b. increase parent nlink */
+       /* 5b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);
        if (rc != 0)
@@ -450,7 +479,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
 
-       /* 5b. update bookmark */
+       /* 6b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);
 
@@ -475,14 +504,27 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
 {
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
-       const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
+       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
+       struct linkea_data       ldata  = { 0 };
+       struct lu_buf            linkea_buf;
+       const struct lu_name    *cname;
        struct dt_device        *dev;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
-       int                      rc     = 0;
+       int                      rc;
        ENTRY;
 
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       cname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       if (rc != 0)
+               RETURN(rc);
+
        /* Create .lustre/lost+found/MDTxxxx. */
 
        /* XXX: Currently, cross-MDT create operation needs to create the child
@@ -495,37 +537,51 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
         *      To avoid more inconsistency, we split the create operation into
         *      two transactions:
         *
-        *      1) create the child locally.
+        *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
+        *         locally.
         *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
-        *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
+        *         remotely.
         *
-        *      If 1) done but 2) failed, then the worst case is that we lose
-        *      one object locally, which is not a big issue. (can be repaird
-        *      by LFSCK phase III) */
+        *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
+        *      repair such inconsistency when LFSCK run next time. */
 
-       /* Transaction I: */
+       /* Transaction I: locally */
 
        dev = lfsck->li_bottom;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       /* 1a. create child locally. */
+       /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 2a. increase child nlink locally. */
+       /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);
 
+       /* 3a. insert linkEA for child */
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       rc = dt_declare_xattr_set(env, child, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       /* 4a. update bookmark */
+       rc = dt_declare_record_write(env, bk_obj,
+                                    lfsck_buf_get(env, bk, len), 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);
 
        dt_write_lock(env, child, 0);
-       /* 1b. create child locally. */
+       /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -533,95 +589,73 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);
 
-       /* 2b.1 insert dot into child dir locally. */
+       /* 1b.2. insert dot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)cfid,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 2b.2 insert dotdot into child dir locally. */
+       /* 1b.3. insert dotdot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 2b.3 increase child nlink locally. */
+       /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       /* 3b. insert linkEA for child */
+       rc = dt_xattr_set(env, child, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       bk->lb_lpf_fid = *cfid;
+       lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+       /* 4b. update bookmark */
+       rc = dt_record_write(env, bk_obj,
+                            lfsck_buf_get(env, bk, len), &pos, th);
+
        dt_write_unlock(env, child);
        dt_trans_stop(env, dev, th);
        if (rc != 0)
                RETURN(rc);
 
-       /* Transaction II: */
+       /* Transaction II: remotely */
 
        dev = lfsck->li_next;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       /* 3a. insert name into parent dir remotely. */
+       /* 5a. insert name into parent dir */
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 4a. increase parent nlink remotely. */
+       /* 6a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 5a. decrease child nlink for dotdot locally if former remote
-        *     update failed. */
-       rc = dt_declare_ref_del(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 6a. decrease child nlink for dot locally if former remote
-        *     update failed. */
-       rc = dt_declare_ref_del(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 7a. destroy child locally if former remote update failed. */
-       rc = dt_declare_destroy(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 8a. update bookmark locally. */
-       rc = dt_declare_record_write(env, bk_obj,
-                                    lfsck_buf_get(env, bk, len), 0, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3b. insert name into parent dir remotely. */
+       /* 5b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
-       if (rc == 0) {
-               dt_write_lock(env, parent, 0);
-               /* 4b. increase parent nlink remotely. */
-               rc = dt_ref_add(env, parent, th);
-               dt_write_unlock(env, parent);
-       }
-       if (rc != 0) {
-               /* 5b. decrease child nlink for dotdot locally. */
-               dt_ref_del(env, child, th);
-               /* 6b. decrease child nlink for dot locally. */
-               dt_ref_del(env, child, th);
-               /* 7b. destroy child locally. */
-               dt_destroy(env, child, th);
+       if (rc != 0)
                GOTO(stop, rc);
-       }
-
-       bk->lb_lpf_fid = *cfid;
-       lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
 
-       /* 8b. update bookmark locally. */
-       rc = dt_record_write(env, bk_obj,
-                            lfsck_buf_get(env, bk, len), &pos, th);
+       dt_write_lock(env, parent, 0);
+       /* 6b. increase parent nlink */
+       rc = dt_ref_add(env, parent, th);
+       dt_write_unlock(env, parent);
 
        GOTO(stop, rc);
 
@@ -630,6 +664,13 @@ unlock:
 stop:
        dt_trans_stop(env, dev, th);
 
+       if (rc != 0 && dev == lfsck->li_next)
+               CDEBUG(D_LFSCK, "%s: partially created the object "DFID
+                      "for orphans, but failed to insert the name %s "
+                      "to the .lustre/lost+found/. Such inconsistency "
+                      "will be repaired when LFSCK run next time: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
+
        return rc;
 }
 
@@ -705,10 +746,11 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        if (dt_object_exists(child) != 0) {
                if (unlikely(!dt_try_as_dir(env, child)))
-                       GOTO(unlock, rc = -ENOTDIR);
+                       rc = -ENOTDIR;
+               else
+                       lfsck->li_lpf_obj = child;
 
-               lfsck->li_lpf_obj = child;
-               GOTO(unlock, rc = 0);
+               GOTO(unlock, rc);
        }
 
        memset(la, 0, sizeof(*la));
@@ -2362,7 +2404,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
 {
        struct lfsck_instance   *lfsck;
        struct dt_object        *root  = NULL;
-       struct dt_object        *obj;
+       struct dt_object        *obj   = NULL;
        struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
        int                      rc;
        ENTRY;
@@ -2402,7 +2444,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        fid->f_seq = FID_SEQ_LOCAL_NAME;
        fid->f_oid = 1;
        fid->f_ver = 0;
-       rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
+       rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -2410,7 +2452,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        if (rc != 0)
                GOTO(out, rc);
 
-       root = dt_locate(env, lfsck->li_bottom, fid);
+       root = dt_locate(env, key, fid);
        if (IS_ERR(root))
                GOTO(out, rc = PTR_ERR(root));
 
@@ -2420,30 +2462,72 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        lfsck->li_local_root_fid = *fid;
        if (master) {
                lfsck->li_master = 1;
-               if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
+               if (lfsck_dev_idx(key) == 0) {
+                       struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
+                       const struct lu_name *cname;
+
                        rc = dt_lookup(env, root,
                                (struct dt_rec *)(&lfsck->li_global_root_fid),
                                (const struct dt_key *)"ROOT", BYPASS_CAPA);
                        if (rc != 0)
                                GOTO(out, rc);
+
+                       obj = dt_locate(env, key, &lfsck->li_global_root_fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       rc = dt_lookup(env, obj, (struct dt_rec *)fid,
+                               (const struct dt_key *)dotlustre, BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = dt_locate(env, key, fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       cname = lfsck_name_get_const(env, dotlustre,
+                                                    strlen(dotlustre));
+                       rc = lfsck_verify_linkea(env, key, obj, cname,
+                                                &lfsck->li_global_root_fid);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       *pfid = *fid;
+                       rc = dt_lookup(env, obj, (struct dt_rec *)fid,
+                                      (const struct dt_key *)lostfound,
+                                      BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = dt_locate(env, key, fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       cname = lfsck_name_get_const(env, lostfound,
+                                                    strlen(lostfound));
+                       rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = NULL;
                }
        }
 
        fid->f_seq = FID_SEQ_LOCAL_FILE;
        fid->f_oid = OTABLE_IT_OID;
        fid->f_ver = 0;
-       obj = dt_locate(env, lfsck->li_bottom, fid);
+       obj = dt_locate(env, key, fid);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
+       lu_object_get(&obj->do_lu);
        lfsck->li_obj_oit = obj;
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
-       if (rc != 0) {
-               if (rc == -ENOTSUPP)
-                       GOTO(add, rc = 0);
-
+       if (rc != 0)
                GOTO(out, rc);
-       }
 
        rc = lfsck_bookmark_setup(env, lfsck);
        if (rc != 0)
@@ -2465,11 +2549,12 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
 
        /* XXX: more LFSCK components initialization to be added here. */
 
-add:
        rc = lfsck_instance_add(lfsck);
        if (rc == 0)
                rc = lfsck_add_target_from_orphan(env, lfsck);
 out:
+       if (obj != NULL && !IS_ERR(obj))
+               lu_object_put(env, &obj->do_lu);
        if (root != NULL && !IS_ERR(root))
                lu_object_put(env, &root->do_lu);
        if (rc != 0)
index 0016a85..a2c07aa 100644 (file)
@@ -34,7 +34,6 @@
 #include <lu_object.h>
 #include <dt_object.h>
 #include <md_object.h>
-#include <lustre_linkea.h>
 #include <lustre_fid.h>
 #include <lustre_lib.h>
 #include <lustre_net.h>
@@ -1607,6 +1606,86 @@ static struct lfsck_operations lfsck_namespace_ops = {
        .lfsck_query            = lfsck_namespace_query,
 };
 
+/**
+ * Verify the specified linkEA entry for the given directory object.
+ * If the object has no such linkEA entry or it has more other linkEA
+ * entries, then re-generate the linkEA with the given information.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] dev      pointer to the dt_device
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in] cname    the name for the child in the parent directory
+ * \param[in] pfid     the parent directory's FID for the linkEA
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
+                       struct dt_object *obj, const struct lu_name *cname,
+                       const struct lu_fid *pfid)
+{
+       struct linkea_data       ldata  = { 0 };
+       struct lu_buf            linkea_buf;
+       struct thandle          *th;
+       int                      rc;
+       int                      fl     = LU_XATTR_CREATE;
+       bool                     dirty  = false;
+       ENTRY;
+
+       LASSERT(S_ISDIR(lfsck_object_type(obj)));
+
+       rc = lfsck_links_read(env, obj, &ldata);
+       if (rc == -ENODATA) {
+               dirty = true;
+       } else if (rc == 0) {
+               fl = LU_XATTR_REPLACE;
+               if (ldata.ld_leh->leh_reccount != 1) {
+                       dirty = true;
+               } else {
+                       rc = linkea_links_find(&ldata, cname, pfid);
+                       if (rc != 0)
+                               dirty = true;
+               }
+       }
+
+       if (!dirty)
+               RETURN(rc);
+
+       rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       rc = linkea_add_buf(&ldata, cname, pfid);
+       if (rc != 0)
+               RETURN(rc);
+
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_xattr_set(env, obj, &linkea_buf,
+                                 XATTR_NAME_LINK, fl, th);
+       if (rc != 0)
+               GOTO(stop, rc = PTR_ERR(th));
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       rc = dt_xattr_set(env, obj, &linkea_buf,
+                         XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
+       dt_write_unlock(env, obj);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
 int lfsck_namespace_setup(const struct lu_env *env,
                          struct lfsck_instance *lfsck)
 {