Whamcloud - gitweb
LU-5180 lfsck: linkea for orphan
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
index 03986d2..247a608 100644 (file)
@@ -353,6 +353,8 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
 
 static const char dot[] = ".";
 static const char dotdot[] = "..";
+static const char dotlustre[] = ".lustre";
+static const char lostfound[] = "lost+found";
 
 static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
@@ -365,13 +367,26 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
-       const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
+       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
+       struct linkea_data       ldata  = { 0 };
+       struct lu_buf            linkea_buf;
+       const struct lu_name    *cname;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
-       int                      rc     = 0;
+       int                      rc;
        ENTRY;
 
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       cname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       if (rc != 0)
+               RETURN(rc);
+
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -386,18 +401,26 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3a. insert name into parent dir */
+       /* 3a. insert linkEA for child */
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       rc = dt_declare_xattr_set(env, child, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       /* 4a. insert name into parent dir */
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 4a. increase parent nlink */
+       /* 5a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 5a. update bookmark */
+       /* 6a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
@@ -408,7 +431,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        dt_write_lock(env, child, 0);
-       /* 1b.1 create child */
+       /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -416,13 +439,13 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);
 
-       /* 1b.2 insert dot into child dir */
+       /* 1b.2. insert dot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)cfid,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 1b.3 insert dotdot into child dir */
+       /* 1b.3. insert dotdot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
@@ -430,18 +453,24 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
 
        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       /* 3b. insert linkEA for child. */
+       rc = dt_xattr_set(env, child, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        dt_write_unlock(env, child);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3b. insert name into parent dir */
+       /* 4b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);
 
        dt_write_lock(env, parent, 0);
-       /* 4b. increase parent nlink */
+       /* 5b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);
        if (rc != 0)
@@ -450,7 +479,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
 
-       /* 5b. update bookmark */
+       /* 6b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);
 
@@ -475,14 +504,27 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
 {
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
-       const struct lu_fid     *cfid   = lu_object_fid(&child->do_lu);
+       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
+       struct linkea_data       ldata  = { 0 };
+       struct lu_buf            linkea_buf;
+       const struct lu_name    *cname;
        struct dt_device        *dev;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
-       int                      rc     = 0;
+       int                      rc;
        ENTRY;
 
+       rc = linkea_data_new(&ldata,
+                            &lfsck_env_info(env)->lti_linkea_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       cname = lfsck_name_get_const(env, name, strlen(name));
+       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       if (rc != 0)
+               RETURN(rc);
+
        /* Create .lustre/lost+found/MDTxxxx. */
 
        /* XXX: Currently, cross-MDT create operation needs to create the child
@@ -495,37 +537,51 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
         *      To avoid more inconsistency, we split the create operation into
         *      two transactions:
         *
-        *      1) create the child locally.
+        *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
+        *         locally.
         *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
-        *         remotely and update the lfsck_bookmark::lb_lpf_fid locally.
+        *         remotely.
         *
-        *      If 1) done but 2) failed, then the worst case is that we lose
-        *      one object locally, which is not a big issue. (can be repaird
-        *      by LFSCK phase III) */
+        *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
+        *      repair such inconsistency when LFSCK run next time. */
 
-       /* Transaction I: */
+       /* Transaction I: locally */
 
        dev = lfsck->li_bottom;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       /* 1a. create child locally. */
+       /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 2a. increase child nlink locally. */
+       /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);
 
+       /* 3a. insert linkEA for child */
+       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       rc = dt_declare_xattr_set(env, child, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       /* 4a. update bookmark */
+       rc = dt_declare_record_write(env, bk_obj,
+                                    lfsck_buf_get(env, bk, len), 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);
 
        dt_write_lock(env, child, 0);
-       /* 1b. create child locally. */
+       /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);
@@ -533,95 +589,73 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);
 
-       /* 2b.1 insert dot into child dir locally. */
+       /* 1b.2. insert dot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)cfid,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 2b.2 insert dotdot into child dir locally. */
+       /* 1b.3. insert dotdot into child dir */
        rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);
 
-       /* 2b.3 increase child nlink locally. */
+       /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       /* 3b. insert linkEA for child */
+       rc = dt_xattr_set(env, child, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       bk->lb_lpf_fid = *cfid;
+       lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+       /* 4b. update bookmark */
+       rc = dt_record_write(env, bk_obj,
+                            lfsck_buf_get(env, bk, len), &pos, th);
+
        dt_write_unlock(env, child);
        dt_trans_stop(env, dev, th);
        if (rc != 0)
                RETURN(rc);
 
-       /* Transaction II: */
+       /* Transaction II: remotely */
 
        dev = lfsck->li_next;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       /* 3a. insert name into parent dir remotely. */
+       /* 5a. insert name into parent dir */
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 4a. increase parent nlink remotely. */
+       /* 6a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 5a. decrease child nlink for dotdot locally if former remote
-        *     update failed. */
-       rc = dt_declare_ref_del(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 6a. decrease child nlink for dot locally if former remote
-        *     update failed. */
-       rc = dt_declare_ref_del(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 7a. destroy child locally if former remote update failed. */
-       rc = dt_declare_destroy(env, child, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       /* 8a. update bookmark locally. */
-       rc = dt_declare_record_write(env, bk_obj,
-                                    lfsck_buf_get(env, bk, len), 0, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);
 
-       /* 3b. insert name into parent dir remotely. */
+       /* 5b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
-       if (rc == 0) {
-               dt_write_lock(env, parent, 0);
-               /* 4b. increase parent nlink remotely. */
-               rc = dt_ref_add(env, parent, th);
-               dt_write_unlock(env, parent);
-       }
-       if (rc != 0) {
-               /* 5b. decrease child nlink for dotdot locally. */
-               dt_ref_del(env, child, th);
-               /* 6b. decrease child nlink for dot locally. */
-               dt_ref_del(env, child, th);
-               /* 7b. destroy child locally. */
-               dt_destroy(env, child, th);
+       if (rc != 0)
                GOTO(stop, rc);
-       }
-
-       bk->lb_lpf_fid = *cfid;
-       lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
 
-       /* 8b. update bookmark locally. */
-       rc = dt_record_write(env, bk_obj,
-                            lfsck_buf_get(env, bk, len), &pos, th);
+       dt_write_lock(env, parent, 0);
+       /* 6b. increase parent nlink */
+       rc = dt_ref_add(env, parent, th);
+       dt_write_unlock(env, parent);
 
        GOTO(stop, rc);
 
@@ -630,6 +664,13 @@ unlock:
 stop:
        dt_trans_stop(env, dev, th);
 
+       if (rc != 0 && dev == lfsck->li_next)
+               CDEBUG(D_LFSCK, "%s: partially created the object "DFID
+                      "for orphans, but failed to insert the name %s "
+                      "to the .lustre/lost+found/. Such inconsistency "
+                      "will be repaired when LFSCK run next time: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
+
        return rc;
 }
 
@@ -705,10 +746,11 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        if (dt_object_exists(child) != 0) {
                if (unlikely(!dt_try_as_dir(env, child)))
-                       GOTO(unlock, rc = -ENOTDIR);
+                       rc = -ENOTDIR;
+               else
+                       lfsck->li_lpf_obj = child;
 
-               lfsck->li_lpf_obj = child;
-               GOTO(unlock, rc = 0);
+               GOTO(unlock, rc);
        }
 
        memset(la, 0, sizeof(*la));
@@ -2362,7 +2404,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
 {
        struct lfsck_instance   *lfsck;
        struct dt_object        *root  = NULL;
-       struct dt_object        *obj;
+       struct dt_object        *obj   = NULL;
        struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
        int                      rc;
        ENTRY;
@@ -2402,7 +2444,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        fid->f_seq = FID_SEQ_LOCAL_NAME;
        fid->f_oid = 1;
        fid->f_ver = 0;
-       rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
+       rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -2410,7 +2452,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        if (rc != 0)
                GOTO(out, rc);
 
-       root = dt_locate(env, lfsck->li_bottom, fid);
+       root = dt_locate(env, key, fid);
        if (IS_ERR(root))
                GOTO(out, rc = PTR_ERR(root));
 
@@ -2420,30 +2462,72 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        lfsck->li_local_root_fid = *fid;
        if (master) {
                lfsck->li_master = 1;
-               if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
+               if (lfsck_dev_idx(key) == 0) {
+                       struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
+                       const struct lu_name *cname;
+
                        rc = dt_lookup(env, root,
                                (struct dt_rec *)(&lfsck->li_global_root_fid),
                                (const struct dt_key *)"ROOT", BYPASS_CAPA);
                        if (rc != 0)
                                GOTO(out, rc);
+
+                       obj = dt_locate(env, key, &lfsck->li_global_root_fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       rc = dt_lookup(env, obj, (struct dt_rec *)fid,
+                               (const struct dt_key *)dotlustre, BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = dt_locate(env, key, fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       cname = lfsck_name_get_const(env, dotlustre,
+                                                    strlen(dotlustre));
+                       rc = lfsck_verify_linkea(env, key, obj, cname,
+                                                &lfsck->li_global_root_fid);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       *pfid = *fid;
+                       rc = dt_lookup(env, obj, (struct dt_rec *)fid,
+                                      (const struct dt_key *)lostfound,
+                                      BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = dt_locate(env, key, fid);
+                       if (IS_ERR(obj))
+                               GOTO(out, rc = PTR_ERR(obj));
+
+                       cname = lfsck_name_get_const(env, lostfound,
+                                                    strlen(lostfound));
+                       rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       lu_object_put(env, &obj->do_lu);
+                       obj = NULL;
                }
        }
 
        fid->f_seq = FID_SEQ_LOCAL_FILE;
        fid->f_oid = OTABLE_IT_OID;
        fid->f_ver = 0;
-       obj = dt_locate(env, lfsck->li_bottom, fid);
+       obj = dt_locate(env, key, fid);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
+       lu_object_get(&obj->do_lu);
        lfsck->li_obj_oit = obj;
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
-       if (rc != 0) {
-               if (rc == -ENOTSUPP)
-                       GOTO(add, rc = 0);
-
+       if (rc != 0)
                GOTO(out, rc);
-       }
 
        rc = lfsck_bookmark_setup(env, lfsck);
        if (rc != 0)
@@ -2465,11 +2549,12 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
 
        /* XXX: more LFSCK components initialization to be added here. */
 
-add:
        rc = lfsck_instance_add(lfsck);
        if (rc == 0)
                rc = lfsck_add_target_from_orphan(env, lfsck);
 out:
+       if (obj != NULL && !IS_ERR(obj))
+               lu_object_put(env, &obj->do_lu);
        if (root != NULL && !IS_ERR(root))
                lu_object_put(env, &root->do_lu);
        if (rc != 0)