Whamcloud - gitweb
LU-3951 lfsck: OST-object inconsistency self detect/repair 67/7667/30
authorFan Yong <fan.yong@intel.com>
Mon, 17 Feb 2014 00:44:00 +0000 (08:44 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 2 Mar 2014 21:53:53 +0000 (21:53 +0000)
When client sends object-based RPC to the OST, the RPC service
thread on the OST needs to verify whether the given parent FID
in the client RPC matches the parent FID information stored in
the OST-object.

When the client given PFID does not match the OST local stored
PFID, then the OST will return "-EINPROGRESS" to the client for
retry later to avoid the RPC service thread to be blocked for
long time. On the other hand, there will be a dedicated thread
to talk with the LFSCK for the PFID verification. If the client
given parent FID information is incorrect, then deny the access;
otherwise, if the OST local stored PFID attribute is invalid,
then the OST local stored PFID xattr will be repaired.

Other fixes:
1) Hold update lock on the .lustre/lost+found/MDTxxxx object
   when add new name entry for handling orphan OST-object.
2) Hold dt_write_lock on the OST-object to be destroyed before
   transaction start, the same as normal ofd_object_destroy does.
3) Simplify the lfsck_layout_recreate_lovea() implementation.
4) Make sanity-lfsck test_18 to be workable under both DNE and
   non-DNE cases.
5) Other code cleanup.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Iefd26fe1782761ab16954a00aea0788c39534580
Reviewed-on: http://review.whamcloud.com/7667
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
23 files changed:
lustre/fld/fld_handler.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/include/lustre_lfsck.h
lustre/include/obd_support.h
lustre/lclient/lcommon_cl.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/obdclass/dt_object.c
lustre/obdclass/obd_mount_server.c
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_objects.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/target/out_handler.c
lustre/tests/sanity-lfsck.sh
lustre/utils/ll_decode_filter_fid.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 8bef3d1..fbf0369 100644 (file)
@@ -255,8 +255,8 @@ int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
                /* On server side, all entries should be in cache.
                 * If we can not find it in cache, just return error */
                CERROR("%s: Cannot find sequence "LPX64": rc = %d\n",
-                      fld->lsf_name, seq, -EIO);
-               RETURN(-EIO);
+                      fld->lsf_name, seq, -ENOENT);
+               RETURN(-ENOENT);
        } else {
                if (fld->lsf_control_exp == NULL) {
                        CERROR("%s: lookup "LPX64", but not connects to MDT0"
index 3620fa6..1700a3f 100644 (file)
@@ -3530,6 +3530,8 @@ struct lfsck_request {
        __u16           lr_async_windows;
        __u32           lr_padding_1;
        struct lu_fid   lr_fid;
+       struct lu_fid   lr_fid2;
+       struct lu_fid   lr_fid3;
        __u64           lr_padding_2;
        __u64           lr_padding_3;
 };
@@ -3544,6 +3546,25 @@ struct lfsck_reply {
 
 void lustre_swab_lfsck_reply(struct lfsck_reply *lr);
 
+enum lfsck_events {
+       LE_LASTID_REBUILDING    = 1,
+       LE_LASTID_REBUILT       = 2,
+       LE_PHASE1_DONE          = 3,
+       LE_PHASE2_DONE          = 4,
+       LE_START                = 5,
+       LE_STOP                 = 6,
+       LE_QUERY                = 7,
+       LE_FID_ACCESSED         = 8,
+       LE_PEER_EXIT            = 9,
+       LE_CONDITIONAL_DESTROY  = 10,
+       LE_PAIRS_VERIFY         = 11,
+};
+
+enum lfsck_event_flags {
+       LEF_TO_OST              = 0x00000001,
+       LEF_FROM_OST            = 0x00000002,
+};
+
 static inline void lustre_set_wire_obdo(struct obd_connect_data *ocd,
                                        struct obdo *wobdo,
                                        const struct obdo *lobdo)
index 2cc6c46..f0d9c62 100644 (file)
@@ -141,6 +141,11 @@ struct lu_fid {
        __u32 f_ver;
 };
 
+/* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+ * MDT-object's FID::f_ver, instead it is the OST-object index in its
+ * parent MDT-object's layout EA. */
+#define f_stripe_idx f_ver
+
 struct filter_fid {
        struct lu_fid   ff_parent;  /* ff_parent.f_ver == file stripe number */
 };
index b23540e..0d6f666 100644 (file)
@@ -116,22 +116,11 @@ struct lfsck_start_param {
        unsigned int             lsp_index_valid:1;
 };
 
-enum lfsck_events {
-       LE_LASTID_REBUILDING    = 1,
-       LE_LASTID_REBUILT       = 2,
-       LE_PHASE1_DONE          = 3,
-       LE_PHASE2_DONE          = 4,
-       LE_START                = 5,
-       LE_STOP                 = 6,
-       LE_QUERY                = 7,
-       LE_FID_ACCESSED         = 8,
-       LE_PEER_EXIT            = 9,
-       LE_CONDITIONAL_DESTROY  = 10,
-};
-
-enum lfsck_event_flags {
-       LEF_TO_OST              = 0x00000001,
-       LEF_FROM_OST            = 0x00000002,
+/* For LE_PAIRS_VERIFY returned status */
+enum lfsck_pv_status {
+       LPVS_INIT               = 0,
+       LPVS_INCONSISTENT       = 1,
+       LPVS_INCONSISTENT_TOFIX = 2,
 };
 
 typedef int (*lfsck_out_notify)(const struct lu_env *env, void *data,
index 80be071..6d4a050 100644 (file)
@@ -510,6 +510,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_LOST_MDTOBJ     0x1616
 #define OBD_FAIL_LFSCK_NOPFID          0x1617
 #define OBD_FAIL_LFSCK_CHANGE_STRIPE   0x1618
+#define OBD_FAIL_LFSCK_INVALID_PFID    0x1619
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index 641ab18..79ecc03 100644 (file)
@@ -953,6 +953,8 @@ void ccc_req_attr_set(const struct lu_env *env,
        }
        obdo_from_inode(oa, inode, valid_flags & flags);
        obdo_set_parent_fid(oa, &cl_i2info(inode)->lli_fid);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_INVALID_PFID))
+               oa->o_parent_oid++;
 #ifdef __KERNEL__
        memcpy(attr->cra_jobid, cl_i2info(inode)->lli_jobid,
               JOBSTATS_JOBID_SIZE);
index b9cdd69..4f84c3b 100644 (file)
@@ -534,6 +534,8 @@ struct lfsck_thread_args {
        struct lfsck_start_param        *lta_lsp;
 };
 
+#define LFSCK_TMPBUF_LEN       64
+
 struct lfsck_thread_info {
        struct lu_name          lti_name;
        struct lu_buf           lti_buf;
@@ -556,6 +558,7 @@ struct lfsck_thread_info {
         * then lti_ent::lde_name will be lti_key. */
        struct lu_dirent        lti_ent;
        char                    lti_key[NAME_MAX + 16];
+       char                    lti_tmpbuf[LFSCK_TMPBUF_LEN];
        struct lfsck_request    lti_lr;
        struct lfsck_async_interpret_args lti_laia;
        struct lfsck_start      lti_start;
index d2f7ae7..d658c60 100644 (file)
@@ -1726,8 +1726,8 @@ static int lfsck_layout_extend_lovea(const struct lu_env *env,
                lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
                /* XXX: We cannot know the stripe size,
                 *      then use the default value (1 MB). */
-               lmm->lmm_stripe_size = cpu_to_le32(1024 * 1024);
-               lmm->lmm_layout_gen = cpu_to_le16(0);
+               lmm->lmm_stripe_size =
+                       cpu_to_le32(LOV_DESC_STRIPE_SIZE_DEFAULT);
                objs = &(lmm->lmm_objects[ea_off]);
        } else {
                __u16   count = le16_to_cpu(lmm->lmm_stripe_count);
@@ -1791,10 +1791,10 @@ static int lfsck_layout_update_pfid(const struct lu_env *env,
 
        pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
        pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
-       /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
-        * instead, it is the OST-object index in its parent MDT-object
-        * layout EA. */
-       pfid->ff_parent.f_ver = cpu_to_le32(ea_off);
+       /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+        * MDT-object's FID::f_ver, instead it is the OST-object index in its
+        * parent MDT-object's layout EA. */
+       pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
        buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
 
        rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
@@ -1846,7 +1846,9 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        struct thandle                  *th     = NULL;
        struct lu_buf                   *pbuf   = NULL;
        struct lu_buf                   *ea_buf = &info->lti_big_buf;
+       struct lustre_handle             lh     = { 0 };
        int                              buflen = ea_buf->lb_len;
+       int                              idx    = 0;
        int                              rc     = 0;
        ENTRY;
 
@@ -1866,10 +1868,10 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
 
                ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
                ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
-               /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
-                * instead, it is the OST-object index in its parent MDT-object
-                * layout EA. */
-               ff->ff_parent.f_ver = cpu_to_le32(ea_off);
+               /* Currently, the filter_fid::ff_parent::f_ver is not the
+                * real parent MDT-object's FID::f_ver, instead it is the
+                * OST-object index in its parent MDT-object's layout EA. */
+               ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
                pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
                cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
                if (IS_ERR(cobj))
@@ -1892,7 +1894,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
         *
         *  1. Use the MDT-object's FID as the name with prefix and postfix.
         *
-        *  1.1 prefix "C-":    More than one OST-objects cliam the same
+        *  1.1 prefix "C-":    More than one OST-objects claim the same
         *                      MDT-object and the same slot in the layout EA.
         *                      It may be created for dangling referenced MDT
         *                      object or may be not.
@@ -1902,7 +1904,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
         *  1.3 prefix "R-":    The orphan OST-object know its parent FID but
         *                      does not know the position in the namespace.
         *
-        *  2. If there is name conflict, increase FID::f_ver for new name. */
+        *  2. If there is name conflict, append more index for new name. */
        sprintf(name, "%s"DFID"%s", prefix, PFID(pfid), postfix);
        do {
                rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
@@ -1915,9 +1917,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                              "by the "DFID". Try to increase the FID version "
                              "for the new file name.\n",
                              lfsck_lfsck2name(lfsck), name, PFID(tfid));
-                       *tfid = *pfid;
-                       tfid->f_ver++;
-                       sprintf(name, "%s"DFID"%s", prefix, PFID(tfid), postfix);
+                       sprintf(name, "%s"DFID"%s-%d", prefix, PFID(pfid),
+                               postfix, ++idx);
                }
        } while (rc == 0);
 
@@ -1940,9 +1941,20 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                ea_buf->lb_len = rc;
        }
 
+       /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
+        *
+        * XXX: Currently, we do not grab the PDO lock as normal create cases,
+        *      because creating MDT-object for orphan OST-object is rare, we
+        *      do not much care about the performance. It can be improved in
+        *      the future when needed. */
+       rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
+                              MDS_INODELOCK_UPDATE);
+       if (rc != 0)
+               GOTO(put, rc);
+
        th = dt_trans_create(env, next);
        if (IS_ERR(th))
-               GOTO(put, rc = PTR_ERR(th));
+               GOTO(unlock, rc = PTR_ERR(th));
 
        /* 1a. Update OST-object's parent information remotely.
         *
@@ -2005,6 +2017,10 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
 
 stop:
        dt_trans_stop(env, next, th);
+
+unlock:
+       lfsck_layout_unlock(&lh);
+
 put:
        if (cobj != NULL && !IS_ERR(cobj))
                lu_object_put(env, &cobj->do_lu);
@@ -2120,9 +2136,18 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
        if (rc != ELDLM_OK)
                GOTO(put, rc = -EIO);
 
+       dt_write_lock(env, obj, 0);
+       /* Get obj's attr within lock again. */
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (la->la_ctime != 0)
+               GOTO(unlock, rc = -ETXTBSY);
+
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
-               GOTO(unlock1, rc = PTR_ERR(th));
+               GOTO(unlock, rc = PTR_ERR(th));
 
        rc = dt_declare_ref_del(env, obj, th);
        if (rc != 0)
@@ -2136,18 +2161,9 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       dt_write_lock(env, obj, 0);
-       /* Get obj's attr within lock again. */
-       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(unlock2, rc);
-
-       if (la->la_ctime != 0)
-               GOTO(unlock2, rc = -ETXTBSY);
-
        rc = dt_ref_del(env, obj, th);
        if (rc != 0)
-               GOTO(unlock2, rc);
+               GOTO(stop, rc);
 
        rc = dt_destroy(env, obj, th);
        if (rc == 0)
@@ -2156,15 +2172,13 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
                       "But the original missed OST-object is found now.\n",
                       PFID(fid));
 
-       GOTO(unlock2, rc);
-
-unlock2:
-       dt_write_unlock(env, obj);
+       GOTO(stop, rc);
 
 stop:
        dt_trans_stop(env, dev, th);
 
-unlock1:
+unlock:
+       dt_write_unlock(env, obj);
        ldlm_lock_decref(&lh, LCK_EX);
 
 put:
@@ -2199,11 +2213,11 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_fid            *cfid2         = &info->lti_fid2;
        struct ost_id            *oi            = &info->lti_oi;
+       char                     *postfix       = info->lti_tmpbuf;
        struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
        struct dt_device         *dev           = com->lc_lfsck->li_bottom;
        struct thandle           *th            = NULL;
        struct lustre_handle      lh            = { 0 };
-       char                      postfix[64];
        __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
        int                       rc            = 0;
        ENTRY;
@@ -2235,7 +2249,7 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
                ea_buf->lb_len = ori_len;
 
                fid_zero(&rec->lor_fid);
-               snprintf(postfix, 64, "-"DFID"-%x",
+               snprintf(postfix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
                         PFID(lu_object_fid(&parent->do_lu)), ea_off);
                rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
                                                  "C-", postfix, ea_off);
@@ -2306,10 +2320,11 @@ static int lfsck_layout_recreate_lovea(const struct lu_env *env,
        struct lustre_handle      lh            = { 0 };
        __u32                     magic;
        int                       fl            = 0;
-       int                       rc;
+       int                       rc            = 0;
        int                       rc1;
        int                       i;
        __u16                     count;
+       bool                      locked        = false;
        ENTRY;
 
        CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent "
@@ -2322,6 +2337,26 @@ static int lfsck_layout_recreate_lovea(const struct lu_env *env,
                RETURN(rc);
 
 again:
+       if (locked) {
+               dt_write_unlock(env, parent);
+               locked = false;
+       }
+
+       if (handle != NULL) {
+               dt_trans_stop(env, dt, handle);
+               handle = NULL;
+       }
+
+       if (rc < 0)
+               GOTO(unlock_layout, rc);
+
+       if (buf->lb_len < rc) {
+               lu_buf_realloc(buf, rc);
+               buflen = buf->lb_len;
+               if (buf->lb_buf == NULL)
+                       GOTO(unlock_layout, rc = -ENOMEM);
+       }
+
        if (!(bk->lb_param & LPF_DRYRUN)) {
                handle = dt_trans_create(env, dt);
                if (IS_ERR(handle))
@@ -2338,45 +2373,23 @@ again:
        }
 
        dt_write_lock(env, parent, 0);
+       locked = true;
        rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
        if (rc == -ERANGE) {
                rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
                                  BYPASS_CAPA);
                LASSERT(rc != 0);
-
-               dt_write_unlock(env, parent);
-               if (handle != NULL) {
-                       dt_trans_stop(env, dt, handle);
-                       handle = NULL;
-               }
-
-               if (rc < 0)
-                       GOTO(unlock_layout, rc);
-
-               lu_buf_realloc(buf, rc);
-               buflen = buf->lb_len;
-               if (buf->lb_buf == NULL)
-                       GOTO(unlock_layout, rc = -ENOMEM);
-
-               fl = LU_XATTR_REPLACE;
                goto again;
        } else if (rc == -ENODATA || rc == 0) {
+               rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
+               /* If the declared is not big enough, re-try. */
+               if (buf->lb_len < rc)
+                       goto again;
+
                fl = LU_XATTR_CREATE;
        } else if (rc < 0) {
                GOTO(unlock_parent, rc);
        } else if (unlikely(buf->lb_len == 0)) {
-               dt_write_unlock(env, parent);
-               if (handle != NULL) {
-                       dt_trans_stop(env, dt, handle);
-                       handle = NULL;
-               }
-
-               lu_buf_alloc(buf, rc);
-               buflen = buf->lb_len;
-               if (buf->lb_buf == NULL)
-                       GOTO(unlock_layout, rc = -ENOMEM);
-
-               fl = LU_XATTR_REPLACE;
                goto again;
        } else {
                fl = LU_XATTR_REPLACE;
@@ -2386,22 +2399,7 @@ again:
                if (bk->lb_param & LPF_DRYRUN)
                        GOTO(unlock_parent, rc = 1);
 
-               rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
-               /* If the declared is not big enough, re-try. */
-               if (buf->lb_len < rc) {
-                       dt_write_unlock(env, parent);
-                       if (handle != NULL) {
-                               dt_trans_stop(env, dt, handle);
-                               handle = NULL;
-                       }
-
-                       lu_buf_realloc(buf, rc);
-                       buflen = buf->lb_len;
-                       if (buf->lb_buf == NULL)
-                               GOTO(unlock_layout, rc = -ENOMEM);
-
-                       goto again;
-               }
+               LASSERT(buf->lb_len >= rc);
 
                buf->lb_len = rc;
                rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
@@ -2437,22 +2435,10 @@ again:
                if (bk->lb_param & LPF_DRYRUN)
                        GOTO(unlock_parent, rc = 1);
 
-               rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
+               rc = lov_mds_md_size(ea_off + 1, magic);
                /* If the declared is not big enough, re-try. */
-               if (buf->lb_len < rc) {
-                       dt_write_unlock(env, parent);
-                       if (handle != NULL) {
-                               dt_trans_stop(env, dt, handle);
-                               handle = NULL;
-                       }
-
-                       lu_buf_realloc(buf, rc);
-                       buflen = buf->lb_len;
-                       if (buf->lb_buf == NULL)
-                               GOTO(unlock_layout, rc = -ENOMEM);
-
+               if (buf->lb_len < rc)
                        goto again;
-               }
 
                buf->lb_len = rc;
                rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
@@ -2527,7 +2513,8 @@ again:
        RETURN(rc);
 
 unlock_parent:
-       dt_write_unlock(env, parent);
+       if (locked)
+               dt_write_unlock(env, parent);
 
 stop:
        if (handle != NULL)
@@ -2549,7 +2536,7 @@ static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
        struct lfsck_layout     *lo     = com->lc_file_ram;
        struct lu_fid           *pfid   = &rec->lor_fid;
        struct dt_object        *parent = NULL;
-       __u32                    ea_off = pfid->f_ver;
+       __u32                    ea_off = pfid->f_stripe_idx;
        int                      rc     = 0;
        ENTRY;
 
@@ -2747,7 +2734,10 @@ static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
        hint->dah_mode = 0;
        pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
        pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
-       pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
+       /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+        * MDT-object's FID::f_ver, instead it is the OST-object index in its
+        * parent MDT-object's layout EA. */
+       pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
        buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
 
        rc = dt_declare_create(env, child, la, hint, NULL, handle);
@@ -2825,9 +2815,10 @@ static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
 
        pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
        pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
-       /* The ff_parent->f_ver is not the real parent fid->f_ver. Instead,
-        * it is the OST-object index in the parent MDT-object layout. */
-       pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
+       /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+        * MDT-object's FID::f_ver, instead it is the OST-object index in its
+        * parent MDT-object's layout EA. */
+       pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
        buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
 
        rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
@@ -3239,10 +3230,10 @@ static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
                fid_zero(pfid);
        } else {
                fid_le_to_cpu(pfid, &pea->ff_parent);
-               /* OST-object does not save parent FID::f_ver, instead,
-                * the OST-object index in the parent MDT-object layout
-                * EA reuses the pfid->f_ver. */
-               idx = pfid->f_ver;
+               /* Currently, the filter_fid::ff_parent::f_ver is not the
+                * real parent MDT-object's FID::f_ver, instead it is the
+                * OST-object index in its parent MDT-object's layout EA. */
+               idx = pfid->f_stripe_idx;
                pfid->f_ver = 0;
        }
 
@@ -3391,7 +3382,7 @@ static int lfsck_layout_assistant(void *args)
                         * handled to avoid too frequent thread schedule. */
                        if (llmd->llmd_prefetched == 0 ||
                            (bk->lb_async_windows != 0 &&
-                            (bk->lb_async_windows >> 1) ==
+                            bk->lb_async_windows / 2 ==
                             llmd->llmd_prefetched))
                                wakeup = true;
                        spin_unlock(&llmd->llmd_lock);
@@ -3833,6 +3824,209 @@ lfsck_layout_slave_notify_master(const struct lu_env *env,
        RETURN_EXIT;
 }
 
+/*
+ * \ret -ENODATA: unrecognized stripe
+ * \ret = 0     : recognized stripe
+ * \ret < 0     : other failures
+ */
+static int lfsck_layout_master_check_pairs(const struct lu_env *env,
+                                          struct lfsck_component *com,
+                                          struct lu_fid *cfid,
+                                          struct lu_fid *pfid)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_buf                   *buf    = &info->lti_big_buf;
+       struct ost_id                   *oi     = &info->lti_oi;
+       struct dt_object                *obj;
+       struct lov_mds_md_v1            *lmm;
+       struct lov_ost_data_v1          *objs;
+       __u32                            idx    = pfid->f_stripe_idx;
+       __u32                            magic;
+       int                              rc     = 0;
+       int                              i;
+       __u16                            count;
+       ENTRY;
+
+       pfid->f_ver = 0;
+       obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       dt_read_lock(env, obj, 0);
+       if (unlikely(!dt_object_exists(obj)))
+               GOTO(unlock, rc = -ENOENT);
+
+       rc = lfsck_layout_get_lovea(env, obj, buf, NULL);
+       if (rc < 0)
+               GOTO(unlock, rc);
+
+       if (rc == 0)
+               GOTO(unlock, rc = -ENODATA);
+
+       lmm = buf->lb_buf;
+       rc = lfsck_layout_verify_header(lmm);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
+        * been verified in lfsck_layout_verify_header() already. If some
+        * new magic introduced in the future, then layout LFSCK needs to
+        * be updated also. */
+       magic = le32_to_cpu(lmm->lmm_magic);
+       if (magic == LOV_MAGIC_V1) {
+               objs = &(lmm->lmm_objects[0]);
+       } else {
+               LASSERT(magic == LOV_MAGIC_V3);
+               objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
+       }
+
+       fid_to_ostid(cfid, oi);
+       count = le16_to_cpu(lmm->lmm_stripe_count);
+       for (i = 0; i < count; i++, objs++) {
+               struct ost_id oi2;
+
+               ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
+               if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
+                       GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
+       }
+
+       GOTO(unlock, rc = -ENODATA);
+
+unlock:
+       dt_read_unlock(env, obj);
+       lu_object_put(env, &obj->do_lu);
+
+       return rc;
+}
+
+/*
+ * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
+ * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
+ * layout EA from MDT to OST. On one hand, the OST no need to understand
+ * the layout EA structure; on the other hand, it may cause trouble when
+ * transfer large layout EA from MDT to OST via normal OUT RPC.
+ *
+ * \ret > 0: unrecognized stripe
+ * \ret = 0: recognized stripe
+ * \ret < 0: other failures
+ */
+static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
+                                         struct lfsck_component *com,
+                                         struct lu_fid *cfid,
+                                         struct lu_fid *pfid)
+{
+       struct lfsck_instance    *lfsck  = com->lc_lfsck;
+       struct obd_device        *obd    = lfsck->li_obd;
+       struct seq_server_site   *ss     =
+                       lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
+       struct obd_export        *exp    = NULL;
+       struct ptlrpc_request    *req    = NULL;
+       struct lfsck_request     *lr;
+       struct lu_seq_range       range  = { 0 };
+       int                       rc     = 0;
+       ENTRY;
+
+       if (unlikely(fid_is_idif(pfid)))
+               RETURN(1);
+
+       fld_range_set_any(&range);
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
+       if (rc != 0)
+               RETURN(rc == -ENOENT ? 1 : rc);
+
+       if (unlikely(!fld_range_is_mdt(&range)))
+               RETURN(1);
+
+       exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
+       if (unlikely(exp == NULL))
+               RETURN(1);
+
+       if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
+               GOTO(out, rc = -EOPNOTSUPP);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
+       if (rc != 0) {
+               ptlrpc_request_free(req);
+
+               GOTO(out, rc);
+       }
+
+       lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_event = LE_PAIRS_VERIFY;
+       lr->lr_active = LT_LAYOUT;
+       lr->lr_fid = *cfid; /* OST-object itself FID. */
+       lr->lr_fid2 = *pfid; /* The claimed parent FID. */
+
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       ptlrpc_req_finished(req);
+
+       if (rc == -ENOENT || rc == -ENODATA)
+               rc = 1;
+
+       GOTO(out, rc);
+
+out:
+       if (exp != NULL)
+               class_export_put(exp);
+
+       return rc;
+}
+
+static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
+                                         struct lfsck_component *com,
+                                         struct lfsck_request *lr)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct filter_fid               *ff     = &info->lti_new_pfid;
+       struct lu_buf                   *buf;
+       struct dt_device                *dev    = com->lc_lfsck->li_bottom;
+       struct dt_object                *obj;
+       struct thandle                  *th     = NULL;
+       int                              rc     = 0;
+       ENTRY;
+
+       obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
+       buf = lfsck_buf_get(env, ff, sizeof(*ff));
+       dt_write_lock(env, obj, 0);
+       if (unlikely(!dt_object_exists(obj)))
+               GOTO(unlock, rc = 0);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock, rc = PTR_ERR(th));
+
+       rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+unlock:
+       dt_write_unlock(env, obj);
+       lu_object_put(env, &obj->do_lu);
+
+       return rc;
+}
+
 /* layout APIs */
 
 static int lfsck_layout_reset(const struct lu_env *env,
@@ -5019,6 +5213,15 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
        bool                             fail  = false;
        ENTRY;
 
+       if (lr->lr_event == LE_PAIRS_VERIFY) {
+               int rc;
+
+               rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
+                                                    &lr->lr_fid2);
+
+               RETURN(rc);
+       }
+
        if (lr->lr_event != LE_PHASE1_DONE &&
            lr->lr_event != LE_PHASE2_DONE &&
            lr->lr_event != LE_PEER_EXIT)
@@ -5107,24 +5310,55 @@ static int lfsck_layout_slave_in_notify(const struct lu_env *env,
        struct lfsck_instance            *lfsck = com->lc_lfsck;
        struct lfsck_layout_slave_data   *llsd  = com->lc_data;
        struct lfsck_layout_slave_target *llst;
+       int                               rc;
        ENTRY;
 
-       if (lr->lr_event == LE_FID_ACCESSED) {
+       switch (lr->lr_event) {
+       case LE_FID_ACCESSED:
                lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
-
                RETURN(0);
-       }
+       case LE_CONDITIONAL_DESTROY:
+               rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
+               RETURN(rc);
+       case LE_PAIRS_VERIFY: {
+               lr->lr_status = LPVS_INIT;
+               /* Firstly, if the MDT-object which is claimed via OST-object
+                * local stored PFID xattr recognizes the OST-object, then it
+                * must be that the client given PFID is wrong. */
+               rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
+                                                   &lr->lr_fid3);
+               if (rc <= 0)
+                       RETURN(0);
 
-       if (lr->lr_event == LE_CONDITIONAL_DESTROY) {
-               int rc;
+               lr->lr_status = LPVS_INCONSISTENT;
+               /* The OST-object local stored PFID xattr is stale. We need to
+                * check whether the MDT-object that is claimed via the client
+                * given PFID information recognizes the OST-object or not. If
+                * matches, then need to update the OST-object's PFID xattr. */
+               rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
+                                                   &lr->lr_fid2);
+               /* For rc < 0 case:
+                * We are not sure whether the client given PFID information
+                * is correct or not, do nothing to avoid improper fixing.
+                *
+                * For rc > 0 case:
+                * The client given PFID information is also invalid, we can
+                * NOT fix the OST-object inconsistency.
+                */
+               if (rc != 0)
+                       RETURN(rc);
 
-               rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
+               lr->lr_status = LPVS_INCONSISTENT_TOFIX;
+               rc = lfsck_layout_slave_repair_pfid(env, com, lr);
 
                RETURN(rc);
        }
-
-       if (lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT)
+       case LE_PHASE2_DONE:
+       case LE_PEER_EXIT:
+               break;
+       default:
                RETURN(-EINVAL);
+       }
 
        llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
        if (llst == NULL)
@@ -5820,10 +6054,10 @@ again1:
                GOTO(out, rc = -EINVAL);
 
        fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
-       /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
-        * instead, it is the OST-object index in its parent MDT-object
-        * layout EA. */
-       save = rec->lor_fid.f_ver;
+       /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+        * MDT-object's FID::f_ver, instead it is the OST-object index in its
+        * parent MDT-object's layout EA. */
+       save = rec->lor_fid.f_stripe_idx;
        rec->lor_fid.f_ver = 0;
        rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
        /* If the orphan OST-object does not claim the MDT, then next.
@@ -5837,7 +6071,7 @@ again1:
                goto again1;
        }
 
-       rec->lor_fid.f_ver = save;
+       rec->lor_fid.f_stripe_idx = save;
        rec->lor_uid = la->la_uid;
        rec->lor_gid = la->la_gid;
 
index 5b5491a..6f1465c 100644 (file)
@@ -2341,7 +2341,8 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_PHASE2_DONE:
        case LE_FID_ACCESSED:
        case LE_PEER_EXIT:
-       case LE_CONDITIONAL_DESTROY: {
+       case LE_CONDITIONAL_DESTROY:
+       case LE_PAIRS_VERIFY: {
                struct lfsck_instance  *lfsck;
                struct lfsck_component *com;
 
index b1eba33..16312eb 100644 (file)
@@ -885,7 +885,7 @@ int dt_index_read(const struct lu_env *env, struct dt_device *dev,
 
        if (!fid_is_quota(&ii->ii_fid) && !fid_is_layout_rbtree(&ii->ii_fid))
                /* Block access to all local files except quota files and
-                * layout brtree. */
+                * layout rbtree. */
                RETURN(-EPERM);
 
        /* lookup index object subject to the transfer */
index 0567556..8c3e8f2 100644 (file)
@@ -542,7 +542,7 @@ static int lustre_lwp_connect(struct obd_device *lwp)
        data->ocd_connect_flags |= OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID |
                OBD_CONNECT_AT | OBD_CONNECT_LRU_RESIZE |
                OBD_CONNECT_FULL20 | OBD_CONNECT_LVB_TYPE |
-               OBD_CONNECT_LIGHTWEIGHT;
+               OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LFSCK;
        OBD_ALLOC_PTR(uuid);
        if (uuid == NULL)
                GOTO(out, rc = -ENOMEM);
index e303fd6..000d4ae 100644 (file)
@@ -528,6 +528,38 @@ static int lprocfs_rd_lfsck_layout(char *page, char **start, off_t off,
        return lfsck_dump(ofd->ofd_osd, page, count, LT_LAYOUT);
 }
 
+static int lprocfs_rd_lfsck_verify_pfid(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+
+       *eof = 1;
+
+       return snprintf(page, count,
+                       "switch: %s\ndetected: "LPU64"\nrepaired: "LPU64"\n",
+                       ofd->ofd_lfsck_verify_pfid ? "on" : "off",
+                       ofd->ofd_inconsistency_self_detected,
+                       ofd->ofd_inconsistency_self_repaired);
+}
+
+static int lprocfs_wr_lfsck_verify_pfid(struct file *file, const char *buffer,
+                                       unsigned long count, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       __u32                    val;
+       int                      rc;
+
+       rc = lprocfs_write_helper(buffer, count, &val);
+       if (rc != 0)
+               return rc;
+
+       ofd->ofd_lfsck_verify_pfid = !!val;
+
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "uuid",                lprocfs_rd_uuid, 0, 0 },
        { "blocksize",           lprocfs_rd_blksize, 0, 0 },
@@ -580,6 +612,8 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "lfsck_speed_limit",  lprocfs_rd_lfsck_speed_limit,
                                lprocfs_wr_lfsck_speed_limit, 0 },
        { "lfsck_layout",       lprocfs_rd_lfsck_layout, 0, 0 },
+       { "lfsck_verify_pfid",  lprocfs_rd_lfsck_verify_pfid,
+                               lprocfs_wr_lfsck_verify_pfid, 0 },
        { 0 }
 };
 
index a8f193b..4cafa48 100644 (file)
@@ -1656,7 +1656,7 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi)
        }
 
        rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
-                             ff);
+                             ff, (struct obdo *)oa);
        if (rc)
                GOTO(out_put, rc);
 
@@ -2128,6 +2128,9 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_tot_granted = 0;
        m->ofd_tot_pending = 0;
        m->ofd_seq_count = 0;
+       init_waitqueue_head(&m->ofd_inconsistency_thread.t_ctl_waitq);
+       INIT_LIST_HEAD(&m->ofd_inconsistency_list);
+       spin_lock_init(&m->ofd_inconsistency_lock);
 
        spin_lock_init(&m->ofd_batch_lock);
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
@@ -2232,7 +2235,14 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        if (rc)
                GOTO(err_fini_lut, rc);
 
+       rc = ofd_start_inconsistency_verification_thread(m);
+       if (rc != 0)
+               GOTO(err_fini_fs, rc);
+
        RETURN(0);
+
+err_fini_fs:
+       ofd_fs_cleanup(env, m);
 err_fini_lut:
        tgt_fini(env, &m->ofd_lut);
 err_free_ns:
@@ -2254,12 +2264,13 @@ static void ofd_fini(const struct lu_env *env, struct ofd_device *m)
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        lfsck_stop(env, m->ofd_osd, &stop);
-       lfsck_degister(env, m->ofd_osd);
        target_recovery_fini(obd);
        obd_exports_barrier(obd);
        obd_zombie_barrier();
 
        tgt_fini(env, &m->ofd_lut);
+       ofd_stop_inconsistency_verification_thread(m);
+       lfsck_degister(env, m->ofd_osd);
        ofd_fs_cleanup(env, m);
 
        ofd_free_capa_keys(m);
index ab041ce..5195dbc 100644 (file)
@@ -131,6 +131,8 @@ struct ofd_device {
        struct dt_object        *ofd_health_check_file;
 
        int                      ofd_subdir_count;
+       __u64                    ofd_inconsistency_self_detected;
+       __u64                    ofd_inconsistency_self_repaired;
 
        cfs_list_t              ofd_seq_list;
        rwlock_t                ofd_seq_list_lock;
@@ -184,13 +186,17 @@ struct ofd_device {
                                 ofd_grant_compat_disable:1,
                                 /* Protected by ofd_lastid_rwsem. */
                                 ofd_lastid_rebuilding:1,
-                                ofd_record_fid_accessed:1;
+                                ofd_record_fid_accessed:1,
+                                ofd_lfsck_verify_pfid:1;
        struct seq_server_site   ofd_seq_site;
        /* the limit of SOFT_SYNC RPCs that will trigger a soft sync */
        unsigned int             ofd_soft_sync_limit;
        /* Protect ::ofd_lastid_rebuilding */
        struct rw_semaphore      ofd_lastid_rwsem;
        __u64                    ofd_lastid_gen;
+       struct ptlrpc_thread     ofd_inconsistency_thread;
+       struct list_head         ofd_inconsistency_list;
+       spinlock_t               ofd_inconsistency_lock;
 };
 
 static inline struct ofd_device *ofd_dev(struct lu_device *d)
@@ -216,7 +222,9 @@ static inline char *ofd_name(struct ofd_device *ofd)
 struct ofd_object {
        struct lu_object_header ofo_header;
        struct dt_object        ofo_obj;
-       int                     ofo_ff_exists;
+       struct lu_fid           ofo_pfid;
+       unsigned int            ofo_pfid_checking:1,
+                               ofo_pfid_verified:1;
 };
 
 static inline struct ofd_object *ofd_obj(struct lu_object *o)
@@ -303,7 +311,10 @@ struct ofd_thread_info {
        struct lu_attr                   fti_attr;
        struct lu_attr                   fti_attr2;
        struct ldlm_res_id               fti_resid;
-       struct filter_fid                fti_mds_fid;
+       union {
+               struct filter_fid        fti_mds_fid;
+               struct filter_fid_old    fti_mds_fid_old;
+       };
        struct ost_id                    fti_ostid;
        struct ofd_object               *fti_obj;
        union {
@@ -368,6 +379,10 @@ void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd);
 void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd);
 
 /* ofd_io.c */
+int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd);
+int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd);
+int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
+                 struct obdo *oa);
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
               struct obdo *oa, int objcount, struct obd_ioobj *obj,
               struct niobuf_remote *rnb, int *nr_local,
@@ -410,7 +425,7 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env,
                                             struct ofd_device *ofd,
                                             const struct lu_fid *fid,
                                             struct lu_attr *attr);
-int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo);
+int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo);
 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                          obd_id id, struct ofd_seq *oseq, int nr, int sync);
 
@@ -419,7 +434,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
                 struct lu_attr *la, struct filter_fid *ff);
 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                     __u64 start, __u64 end, struct lu_attr *la,
-                    struct filter_fid *ff);
+                    struct filter_fid *ff, struct obdo *oa);
 int ofd_object_destroy(const struct lu_env *, struct ofd_object *, int);
 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
                 struct lu_attr *la);
index 7d33fc1..6a81d87 100644 (file)
 
 #include "ofd_internal.h"
 
+struct ofd_inconsistency_item {
+       struct list_head         oii_list;
+       struct ofd_object       *oii_obj;
+       struct lu_fid            oii_pfid;
+};
+
+static void ofd_inconsistency_verify_one(const struct lu_env *env,
+                                        struct ofd_device *ofd,
+                                        struct ofd_inconsistency_item *oii,
+                                        struct lfsck_request *lr)
+{
+       struct ofd_object       *fo     = oii->oii_obj;
+       struct lu_fid           *pfid   = &fo->ofo_pfid;
+       int                      rc;
+
+       LASSERT(fo->ofo_pfid_checking);
+       LASSERT(!fo->ofo_pfid_verified);
+
+       lr->lr_fid = fo->ofo_header.loh_fid; /* OST-object itself FID. */
+       lr->lr_fid2 = oii->oii_pfid; /* client given PFID. */
+       lr->lr_fid3 = *pfid; /* OST local stored PFID. */
+
+       rc = lfsck_in_notify(env, ofd->ofd_osd, lr);
+       ofd_write_lock(env, fo);
+       switch (lr->lr_status) {
+       case LPVS_INIT:
+               LASSERT(rc <= 0);
+
+               if (rc < 0)
+                       CDEBUG(D_LFSCK, "%s: fail to verify OST local stored "
+                              "PFID xattr for "DFID", the client given PFID "
+                              DFID", OST local stored PFID "DFID": rc = %d\n",
+                              ofd_obd(ofd)->obd_name,
+                              PFID(&fo->ofo_header.loh_fid),
+                              PFID(&oii->oii_pfid), PFID(pfid), rc);
+               else
+                       fo->ofo_pfid_verified = 1;
+               break;
+       case LPVS_INCONSISTENT:
+               LASSERT(rc != 0);
+
+               ofd->ofd_inconsistency_self_detected++;
+               if (rc < 0)
+                       CDEBUG(D_LFSCK, "%s: fail to verify the client given "
+                              "PFID for "DFID", the client given PFID "DFID
+                              ", local stored PFID "DFID": rc = %d\n",
+                              ofd_obd(ofd)->obd_name,
+                              PFID(&fo->ofo_header.loh_fid),
+                              PFID(&oii->oii_pfid), PFID(pfid), rc);
+               else
+                       CDEBUG(D_LFSCK, "%s: both the client given PFID and "
+                              "the OST local stored PFID are stale for the "
+                              "OST-object "DFID", client given PFID is "DFID
+                              ", local stored PFID is "DFID"\n",
+                              ofd_obd(ofd)->obd_name,
+                              PFID(&fo->ofo_header.loh_fid),
+                              PFID(&oii->oii_pfid), PFID(pfid));
+               break;
+       case LPVS_INCONSISTENT_TOFIX:
+               ofd->ofd_inconsistency_self_detected++;
+               if (rc == 0) {
+                       ofd->ofd_inconsistency_self_repaired++;
+                       CDEBUG(D_LFSCK, "%s: fixed the staled OST PFID xattr "
+                              "for "DFID", with the client given PFID "DFID
+                              ", the old stored PFID "DFID"\n",
+                              ofd_obd(ofd)->obd_name,
+                              PFID(&fo->ofo_header.loh_fid),
+                              PFID(&oii->oii_pfid), PFID(pfid));
+               } else {
+                       CDEBUG(D_LFSCK, "%s: fail to fix the OST PFID xattr "
+                              "for "DFID", client given PFID "DFID", local "
+                              "stored PFID "DFID": rc = %d\n",
+                              ofd_obd(ofd)->obd_name,
+                              PFID(&fo->ofo_header.loh_fid),
+                              PFID(&oii->oii_pfid), PFID(pfid), rc);
+               }
+               *pfid = oii->oii_pfid;
+               fo->ofo_pfid_verified = 1;
+               break;
+       default:
+               break;
+       }
+       fo->ofo_pfid_checking = 0;
+       ofd_write_unlock(env, fo);
+
+       lu_object_put(env, &fo->ofo_obj.do_lu);
+       OBD_FREE_PTR(oii);
+}
+
+static int ofd_inconsistency_verification_main(void *args)
+{
+       struct lu_env                  env;
+       struct ofd_device             *ofd    = args;
+       struct ptlrpc_thread          *thread = &ofd->ofd_inconsistency_thread;
+       struct ofd_inconsistency_item *oii;
+       struct lfsck_request          *lr     = NULL;
+       struct l_wait_info             lwi    = { 0 };
+       int                            rc;
+       ENTRY;
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       spin_lock(&ofd->ofd_inconsistency_lock);
+       thread_set_flags(thread, rc != 0 ? SVC_STOPPED : SVC_RUNNING);
+       wake_up_all(&thread->t_ctl_waitq);
+       spin_unlock(&ofd->ofd_inconsistency_lock);
+       if (rc != 0)
+               RETURN(rc);
+
+       OBD_ALLOC_PTR(lr);
+       if (unlikely(lr == NULL))
+               GOTO(out, rc = -ENOMEM);
+
+       lr->lr_event = LE_PAIRS_VERIFY;
+       lr->lr_active = LT_LAYOUT;
+
+       spin_lock(&ofd->ofd_inconsistency_lock);
+       while (1) {
+               if (unlikely(!thread_is_running(thread)))
+                       break;
+
+               while (!list_empty(&ofd->ofd_inconsistency_list)) {
+                       oii = list_entry(ofd->ofd_inconsistency_list.next,
+                                        struct ofd_inconsistency_item,
+                                        oii_list);
+                       list_del_init(&oii->oii_list);
+                       spin_unlock(&ofd->ofd_inconsistency_lock);
+                       ofd_inconsistency_verify_one(&env, ofd, oii, lr);
+                       spin_lock(&ofd->ofd_inconsistency_lock);
+               }
+
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+               l_wait_event(thread->t_ctl_waitq,
+                            !list_empty(&ofd->ofd_inconsistency_list) ||
+                            !thread_is_running(thread),
+                            &lwi);
+               spin_lock(&ofd->ofd_inconsistency_lock);
+       }
+
+       while (!list_empty(&ofd->ofd_inconsistency_list)) {
+               struct ofd_object *fo;
+
+               oii = list_entry(ofd->ofd_inconsistency_list.next,
+                                struct ofd_inconsistency_item,
+                                oii_list);
+               list_del_init(&oii->oii_list);
+               fo = oii->oii_obj;
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               ofd_write_lock(&env, fo);
+               fo->ofo_pfid_checking = 0;
+               ofd_write_unlock(&env, fo);
+
+               lu_object_put(&env, &fo->ofo_obj.do_lu);
+               OBD_FREE_PTR(oii);
+               spin_lock(&ofd->ofd_inconsistency_lock);
+       }
+
+       OBD_FREE_PTR(lr);
+
+       GOTO(out, rc = 0);
+
+out:
+       thread_set_flags(thread, SVC_STOPPED);
+       wake_up_all(&thread->t_ctl_waitq);
+       spin_unlock(&ofd->ofd_inconsistency_lock);
+       lu_env_fini(&env);
+
+       return rc;
+}
+
+int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
+       struct l_wait_info       lwi    = { 0 };
+       long                     rc;
+
+       spin_lock(&ofd->ofd_inconsistency_lock);
+       if (unlikely(thread_is_running(thread))) {
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               return -EALREADY;
+       }
+
+       thread_set_flags(thread, 0);
+       spin_unlock(&ofd->ofd_inconsistency_lock);
+       rc = PTR_ERR(kthread_run(ofd_inconsistency_verification_main, ofd,
+                                "inconsistency_verification"));
+       if (IS_ERR_VALUE(rc)) {
+               CERROR("%s: cannot start self_repair thread: rc = %ld\n",
+                      ofd_obd(ofd)->obd_name, rc);
+       } else {
+               rc = 0;
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_running(thread) ||
+                            thread_is_stopped(thread),
+                            &lwi);
+       }
+
+       return rc;
+}
+
+int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
+{
+       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
+       struct l_wait_info       lwi    = { 0 };
+
+       spin_lock(&ofd->ofd_inconsistency_lock);
+       if (thread_is_init(thread) || thread_is_stopped(thread)) {
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               return -EALREADY;
+       }
+
+       thread_set_flags(thread, SVC_STOPPING);
+       spin_unlock(&ofd->ofd_inconsistency_lock);
+       wake_up_all(&thread->t_ctl_waitq);
+       l_wait_event(thread->t_ctl_waitq,
+                    thread_is_stopped(thread),
+                    &lwi);
+
+       return 0;
+}
+
+static void ofd_add_inconsistency_item(const struct lu_env *env,
+                                      struct ofd_object *fo, struct obdo *oa)
+{
+       struct ofd_device               *ofd    = ofd_obj2dev(fo);
+       struct ofd_inconsistency_item   *oii;
+       bool                             wakeup = false;
+
+       OBD_ALLOC_PTR(oii);
+       if (oii == NULL) {
+               CERROR("%s: cannot alloc memory for verify OST-object "
+                      "consistency for "DFID", client given PFID "DFID
+                      ", local stored PFID "DFID"\n",
+                      ofd_obd(ofd)->obd_name, PFID(&fo->ofo_header.loh_fid),
+                      oa->o_parent_seq, oa->o_parent_oid, oa->o_stripe_idx,
+                      PFID(&fo->ofo_pfid));
+
+               return;
+       }
+
+       INIT_LIST_HEAD(&oii->oii_list);
+       lu_object_get(&fo->ofo_obj.do_lu);
+       oii->oii_obj = fo;
+       oii->oii_pfid.f_seq = oa->o_parent_seq;
+       oii->oii_pfid.f_oid = oa->o_parent_oid;
+       oii->oii_pfid.f_stripe_idx = oa->o_stripe_idx;
+
+       spin_lock(&ofd->ofd_inconsistency_lock);
+       if (fo->ofo_pfid_checking || fo->ofo_pfid_verified) {
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+               OBD_FREE_PTR(oii);
+
+               return;
+       }
+
+       fo->ofo_pfid_checking = 1;
+       if (list_empty(&ofd->ofd_inconsistency_list))
+               wakeup = true;
+       list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
+       spin_unlock(&ofd->ofd_inconsistency_lock);
+       if (wakeup)
+               wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
+
+       /* XXX: When the found inconsistency exceeds some threshold,
+        *      we can trigger the LFSCK to scan part of the system
+        *      or the whole system, which depends on how to define
+        *      the threshold, a simple way maybe like that: define
+        *      the absolute value of how many inconsisteny allowed
+        *      to be repaired via self detect/repair mechanism, if
+        *      exceeded, then trigger the LFSCK to scan the layout
+        *      inconsistency within the whole system. */
+}
+
+int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
+                 struct obdo *oa)
+{
+       struct lu_fid   *pfid   = &fo->ofo_pfid;
+       int              rc     = 0;
+       ENTRY;
+
+       if (fid_is_sane(pfid)) {
+               if (likely(oa->o_parent_seq == pfid->f_seq &&
+                          oa->o_parent_oid == pfid->f_oid &&
+                          oa->o_stripe_idx == pfid->f_stripe_idx))
+                       RETURN(0);
+
+               if (fo->ofo_pfid_verified)
+                       RETURN(-EPERM);
+       }
+
+       /* The OST-object may be inconsistent, and we need further verification.
+        * To avoid block the RPC service thread, return -EINPROGRESS to client
+        * and make it retry later. */
+       if (fo->ofo_pfid_checking)
+               RETURN(-EINPROGRESS);
+
+       rc = ofd_object_ff_load(env, fo);
+       if (rc == -ENODATA)
+               RETURN(0);
+
+       if (rc < 0)
+               RETURN(rc);
+
+       if (likely(oa->o_parent_seq == pfid->f_seq &&
+                  oa->o_parent_oid == pfid->f_oid &&
+                  oa->o_stripe_idx == pfid->f_stripe_idx))
+               RETURN(0);
+
+       /* Push it to the dedicated thread for further verification. */
+       ofd_add_inconsistency_item(env, fo, oa);
+
+       RETURN(-EINPROGRESS);
+}
+
 static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                           struct ofd_device *ofd, const struct lu_fid *fid,
-                          struct lu_attr *la, int niocount,
+                          struct lu_attr *la, struct obdo *oa, int niocount,
                           struct niobuf_remote *rnb, int *nr_local,
                           struct niobuf_local *lnb, char *jobid)
 {
@@ -63,6 +379,12 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        if (!ofd_object_exists(fo))
                GOTO(unlock, rc = -ENOENT);
 
+       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
+               rc = ofd_verify_ff(env, fo, oa);
+               if (rc != 0)
+                       GOTO(unlock, rc);
+       }
+
        /* parse remote buffers to local buffers and prepare the latter */
        *nr_local = 0;
        for (i = 0, j = 0; i < niocount; i++) {
@@ -148,6 +470,15 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                GOTO(out, rc = -ENOENT);
        }
 
+       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
+               rc = ofd_verify_ff(env, fo, oa);
+               if (rc != 0) {
+                       ofd_read_unlock(env, fo);
+                       ofd_object_put(env, fo);
+                       GOTO(out, rc);
+               }
+       }
+
        /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
         * space back if possible */
        ofd_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
@@ -268,8 +599,9 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
                if (rc == 0) {
                        ofd_grant_prepare_read(env, exp, oa);
                        rc = ofd_preprw_read(env, exp, ofd, fid,
-                                            &info->fti_attr, obj->ioo_bufcnt,
-                                            rnb, nr_local, lnb, jobid);
+                                            &info->fti_attr, oa,
+                                            obj->ioo_bufcnt, rnb, nr_local,
+                                            lnb, jobid);
                        obdo_from_la(oa, &info->fti_attr, LA_ATIME);
                }
        } else {
@@ -332,7 +664,7 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                GOTO(out, rc);
 
        if (ff != NULL) {
-               rc = ofd_object_ff_check(env, ofd_obj);
+               rc = ofd_object_ff_load(env, ofd_obj);
                if (rc == -ENODATA)
                        ff_needed = 1;
                else if (rc < 0)
@@ -389,11 +721,20 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
 
                rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
                                  0, th, BYPASS_CAPA);
-               if (rc)
-                       GOTO(out_tx, rc);
+               if (rc == 0) {
+                       ofd_obj->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq);
+                       ofd_obj->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid);
+                       /* Currently, the filter_fid::ff_parent::f_ver is not
+                        * the real parent MDT-object's FID::f_ver, instead it
+                        * is the OST-object index in its parent MDT-object's
+                        * layout EA. */
+                       ofd_obj->ofo_pfid.f_stripe_idx =
+                                       le32_to_cpu(ff->ff_parent.f_stripe_idx);
+               }
        }
 
-       EXIT;
+       GOTO(out_tx, rc);
+
 out_tx:
        dt_trans_stop(env, ofd->ofd_osd, th);
 out:
index 3917969..2f19478 100644 (file)
@@ -53,7 +53,9 @@ int ofd_version_get_check(struct ofd_thread_info *info,
        dt_obj_version_t curr_version;
 
        LASSERT(ofd_object_exists(fo));
-       LASSERT(info->fti_exp);
+
+       if (info->fti_exp)
+               RETURN(0);
 
        curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
        if ((__s64)curr_version == -EOPNOTSUPP)
@@ -112,32 +114,38 @@ struct ofd_object *ofd_object_find_or_create(const struct lu_env *env,
        RETURN(ofd_obj(fo_obj));
 }
 
-int ofd_object_ff_check(const struct lu_env *env, struct ofd_object *fo)
+int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
 {
-       int rc = 0;
+       struct ofd_thread_info  *info = ofd_info(env);
+       struct filter_fid_old   *ff   = &info->fti_mds_fid_old;
+       struct lu_buf           *buf  = &info->fti_buf;
+       struct lu_fid           *pfid = &fo->ofo_pfid;
+       int                      rc   = 0;
 
-       ENTRY;
+       if (fid_is_sane(pfid))
+               return 0;
 
-       if (!fo->ofo_ff_exists) {
-               /*
-                * This actually means that we don't know whether the object
-                * has the "fid" EA or not.
-                */
-               rc = dt_xattr_get(env, ofd_object_child(fo), &LU_BUF_NULL,
-                                 XATTR_NAME_FID, BYPASS_CAPA);
-               if (rc >= 0 || rc == -ENODATA) {
-                       /*
-                        * Here we assume that, if the object doesn't have the
-                        * "fid" EA, the caller will add one, unless a fatal
-                        * error (e.g., a memory or disk failure) prevents it
-                        * from doing so.
-                        */
-                       fo->ofo_ff_exists = 1;
-               }
-               if (rc > 0)
-                       rc = 0;
+       buf->lb_buf = ff;
+       buf->lb_len = sizeof(*ff);
+       rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID,
+                         BYPASS_CAPA);
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(struct lu_fid)) {
+               fid_zero(pfid);
+
+               return -ENODATA;
        }
-       RETURN(rc);
+
+       pfid->f_seq = le64_to_cpu(ff->ff_parent.f_seq);
+       pfid->f_oid = le32_to_cpu(ff->ff_parent.f_oid);
+       /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
+        * MDT-object's FID::f_ver, instead it is the OST-object index in its
+        * parent MDT-object's layout EA. */
+       pfid->f_stripe_idx = le32_to_cpu(ff->ff_parent.f_stripe_idx);
+
+       return 0;
 }
 
 void ofd_object_put(const struct lu_env *env, struct ofd_object *fo)
@@ -435,7 +443,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
                GOTO(unlock, rc);
 
        if (ff != NULL) {
-               rc = ofd_object_ff_check(env, fo);
+               rc = ofd_object_ff_load(env, fo);
                if (rc == -ENODATA)
                        ff_needed = 1;
                else if (rc < 0)
@@ -469,20 +477,34 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed)
+       if (ff_needed) {
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
                                  XATTR_NAME_FID, 0, th, BYPASS_CAPA);
+               if (rc == 0) {
+                       fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq);
+                       fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid);
+                       /* Currently, the filter_fid::ff_parent::f_ver is not
+                        * the real parent MDT-object's FID::f_ver, instead it
+                        * is the OST-object index in its parent MDT-object's
+                        * layout EA. */
+                       fo->ofo_pfid.f_stripe_idx =
+                                       le32_to_cpu(ff->ff_parent.f_stripe_idx);
+               }
+       }
+
+       GOTO(stop, rc);
 
 stop:
        ofd_trans_stop(env, ofd, th, rc);
 unlock:
        ofd_write_unlock(env, fo);
-       RETURN(rc);
+
+       return rc;
 }
 
 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                     __u64 start, __u64 end, struct lu_attr *la,
-                    struct filter_fid *ff)
+                    struct filter_fid *ff, struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_device       *ofd = ofd_obj2dev(fo);
@@ -506,6 +528,12 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (!ofd_object_exists(fo))
                GOTO(unlock, rc = -ENOENT);
 
+       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
+               rc = ofd_verify_ff(env, fo, oa);
+               if (rc != 0)
+                       GOTO(unlock, rc);
+       }
+
        /* VBR: version recovery check */
        rc = ofd_version_get_check(info, fo);
        if (rc)
@@ -516,7 +544,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                GOTO(unlock, rc);
 
        if (ff != NULL) {
-               rc = ofd_object_ff_check(env, fo);
+               rc = ofd_object_ff_load(env, fo);
                if (rc == -ENODATA)
                        ff_needed = 1;
                else if (rc < 0)
@@ -558,15 +586,29 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed)
+       if (ff_needed) {
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
                                  XATTR_NAME_FID, 0, th, BYPASS_CAPA);
+               if (rc == 0) {
+                       fo->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq);
+                       fo->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid);
+                       /* Currently, the filter_fid::ff_parent::f_ver is not
+                        * the real parent MDT-object's FID::f_ver, instead it
+                        * is the OST-object index in its parent MDT-object's
+                        * layout EA. */
+                       fo->ofo_pfid.f_stripe_idx =
+                                       le32_to_cpu(ff->ff_parent.f_stripe_idx);
+               }
+       }
+
+       GOTO(stop, rc);
 
 stop:
        ofd_trans_stop(env, ofd, th, rc);
 unlock:
        ofd_write_unlock(env, fo);
-       RETURN(rc);
+
+       return rc;
 }
 
 int ofd_object_destroy(const struct lu_env *env, struct ofd_object *fo,
index dd0a253..d19f618 100644 (file)
@@ -2666,6 +2666,8 @@ void lustre_swab_lfsck_request(struct lfsck_request *lr)
        __swab16s(&lr->lr_async_windows);
        CLASSERT(offsetof(typeof(*lr), lr_padding_1) != 0);
        lustre_swab_lu_fid(&lr->lr_fid);
+       lustre_swab_lu_fid(&lr->lr_fid2);
+       lustre_swab_lu_fid(&lr->lr_fid3);
        CLASSERT(offsetof(typeof(*lr), lr_padding_2) != 0);
        CLASSERT(offsetof(typeof(*lr), lr_padding_3) != 0);
 }
index 42d24d5..8c91793 100644 (file)
@@ -4582,7 +4582,7 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)sizeof(((struct object_update_reply *)0)->ourp_lens));
 
        /* Checks for struct lfsck_request */
-       LASSERTF((int)sizeof(struct lfsck_request) == 64, "found %lld\n",
+       LASSERTF((int)sizeof(struct lfsck_request) == 96, "found %lld\n",
                 (long long)(int)sizeof(struct lfsck_request));
        LASSERTF((int)offsetof(struct lfsck_request, lr_event) == 0, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_event));
@@ -4628,14 +4628,48 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lfsck_request, lr_fid));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid) == 16, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 48, "found %lld\n",
+       LASSERTF((int)offsetof(struct lfsck_request, lr_fid2) == 48, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_fid2));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid2) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid2));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_fid3) == 64, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_fid3));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid3) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid3));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 80, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_padding_2));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_2) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_2));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 56, "found %lld\n",
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 88, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_padding_3));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_3) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_3));
+       LASSERTF(LE_LASTID_REBUILDING == 1, "found %lld\n",
+                (long long)LE_LASTID_REBUILDING);
+       LASSERTF(LE_LASTID_REBUILT == 2, "found %lld\n",
+                (long long)LE_LASTID_REBUILT);
+       LASSERTF(LE_PHASE1_DONE == 3, "found %lld\n",
+                (long long)LE_PHASE1_DONE);
+       LASSERTF(LE_PHASE2_DONE == 4, "found %lld\n",
+                (long long)LE_PHASE2_DONE);
+       LASSERTF(LE_START == 5, "found %lld\n",
+                (long long)LE_START);
+       LASSERTF(LE_STOP == 6, "found %lld\n",
+                (long long)LE_STOP);
+       LASSERTF(LE_QUERY == 7, "found %lld\n",
+                (long long)LE_QUERY);
+       LASSERTF(LE_FID_ACCESSED == 8, "found %lld\n",
+                (long long)LE_FID_ACCESSED);
+       LASSERTF(LE_PEER_EXIT == 9, "found %lld\n",
+                (long long)LE_PEER_EXIT);
+       LASSERTF(LE_CONDITIONAL_DESTROY == 10, "found %lld\n",
+                (long long)LE_CONDITIONAL_DESTROY);
+       LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
+                (long long)LE_PAIRS_VERIFY);
+       LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_TO_OST);
+       LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_FROM_OST);
 
        /* Checks for struct lfsck_reply */
        LASSERTF((int)sizeof(struct lfsck_reply) == 16, "found %lld\n",
index 74e8f47..efc8b41 100644 (file)
@@ -430,6 +430,12 @@ static int out_xattr_get(struct tgt_session_info *tsi)
 
        ENTRY;
 
+       if (!lu_object_exists(&obj->do_lu)) {
+               set_bit(LU_OBJECT_HEARD_BANSHEE,
+                       &obj->do_lu.lo_header->loh_flags);
+               RETURN(-ENOENT);
+       }
+
        name = object_update_param_get(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for xattr get: rc = %d\n",
index f980246..55b562c 100644 (file)
@@ -43,7 +43,7 @@ check_and_setup_lustre
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2c"
 
 [[ $(lustre_version_code ost1) -lt $(version_code 2.5.55) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19"
 
 build_test_filter
 
@@ -2036,6 +2036,10 @@ test_18e() {
                awk '/^status/ { print \\\$2 }'" "scanning-phase2" 6 ||
                error "(3) MDS1 is not the expected 'scanning-phase2'"
 
+       # to guarantee all updates are synced.
+       sync
+       sleep 2
+
        echo "Write new data to f2 to modify the new created OST-object."
        echo "dummy" >> $DIR/$tdir/a1/f2
 
@@ -2087,6 +2091,80 @@ test_18e() {
 }
 run_test 18e "Find out orphan OST-object and repair it (5)"
 
+test_19a() {
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+
+       echo "foo" > $DIR/$tdir/a0
+       echo "guard" > $DIR/$tdir/a1
+
+       cancel_lru_locks osc
+       umount_client $MOUNT || error "(1) Fail to stop client!"
+       mount_client $MOUNT || error "(2) Fail to start client!"
+
+       echo "Inject failure, then client will offer wrong parent FID when read"
+       do_facet ost1 $LCTL set_param -n \
+               obdfilter.${FSNAME}-OST0000.lfsck_verify_pfid 1
+       #define OBD_FAIL_LFSCK_INVALID_PFID     0x1619
+       $LCTL set_param fail_loc=0x1619
+
+       echo "Read RPC with wrong parent FID should be denied"
+       cat $DIR/$tdir/a0 && error "(3) Read should be denied!"
+       $LCTL set_param fail_loc=0
+}
+run_test 19a "OST-object inconsistency self detect"
+
+test_19b() {
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       mkdir -p $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+
+       echo "Inject failure stub to make the OST-object to back point to"
+       echo "non-exist MDT-object"
+
+       #define OBD_FAIL_LFSCK_UNMATCHED_PAIR1  0x1611
+       do_facet ost1 $LCTL set_param fail_loc=0x1611
+       echo "foo" > $DIR/$tdir/f0
+       cancel_lru_locks osc
+       sync
+       sleep 2
+       do_facet ost1 $LCTL set_param fail_loc=0
+
+       echo "Nothing should be fixed since self detect and repair is disabled"
+       local repaired=$(do_facet ost1 $LCTL get_param -n \
+                       obdfilter.${FSNAME}-OST0000.lfsck_verify_pfid |
+                       awk '/^repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(1) Expected 0 repaired, but got $repaired"
+
+       echo "Read RPC with right parent FID should be accepted,"
+       echo "and cause parent FID on OST to be fixed"
+
+       do_facet ost1 $LCTL set_param -n \
+               obdfilter.${FSNAME}-OST0000.lfsck_verify_pfid 1
+       cat $DIR/$tdir/f0 || error "(2) Read should not be denied!"
+
+       repaired=$(do_facet ost1 $LCTL get_param -n \
+               obdfilter.${FSNAME}-OST0000.lfsck_verify_pfid |
+               awk '/^repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(3) Expected 1 repaired, but got $repaired"
+}
+run_test 19b "OST-object inconsistency self repair"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
index c4db9fb..a28b3b9 100644 (file)
@@ -82,13 +82,13 @@ int main(int argc, char *argv[])
                               le64_to_cpu(ffo->ff_parent.f_seq),
                               le32_to_cpu(ffo->ff_parent.f_oid), 0 /* ver */,
                               /* this is stripe_nr actually */
-                              le32_to_cpu(ffo->ff_parent.f_ver));
+                              le32_to_cpu(ffo->ff_parent.f_stripe_idx));
                } else {
                        printf("%s: parent="DFID" stripe=%u\n", argv[i],
                               le64_to_cpu(ff->ff_parent.f_seq),
                               le32_to_cpu(ff->ff_parent.f_oid), 0, /* ver */
                               /* this is stripe_nr actually */
-                              le32_to_cpu(ff->ff_parent.f_ver));
+                              le32_to_cpu(ff->ff_parent.f_stripe_idx));
                }
        }
 
index 88f35b1..ae85ce6 100644 (file)
@@ -2069,8 +2069,25 @@ static void check_lfsck_request(void)
        CHECK_MEMBER(lfsck_request, lr_async_windows);
        CHECK_MEMBER(lfsck_request, lr_padding_1);
        CHECK_MEMBER(lfsck_request, lr_fid);
+       CHECK_MEMBER(lfsck_request, lr_fid2);
+       CHECK_MEMBER(lfsck_request, lr_fid3);
        CHECK_MEMBER(lfsck_request, lr_padding_2);
        CHECK_MEMBER(lfsck_request, lr_padding_3);
+
+       CHECK_VALUE(LE_LASTID_REBUILDING);
+       CHECK_VALUE(LE_LASTID_REBUILT);
+       CHECK_VALUE(LE_PHASE1_DONE);
+       CHECK_VALUE(LE_PHASE2_DONE);
+       CHECK_VALUE(LE_START);
+       CHECK_VALUE(LE_STOP);
+       CHECK_VALUE(LE_QUERY);
+       CHECK_VALUE(LE_FID_ACCESSED);
+       CHECK_VALUE(LE_PEER_EXIT);
+       CHECK_VALUE(LE_CONDITIONAL_DESTROY);
+       CHECK_VALUE(LE_PAIRS_VERIFY);
+
+       CHECK_VALUE_X(LEF_TO_OST);
+       CHECK_VALUE_X(LEF_FROM_OST);
 }
 
 static void check_lfsck_reply(void)
index 4dd9c9d..927034f 100644 (file)
@@ -4591,7 +4591,7 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)sizeof(((struct object_update_reply *)0)->ourp_lens));
 
        /* Checks for struct lfsck_request */
-       LASSERTF((int)sizeof(struct lfsck_request) == 64, "found %lld\n",
+       LASSERTF((int)sizeof(struct lfsck_request) == 96, "found %lld\n",
                 (long long)(int)sizeof(struct lfsck_request));
        LASSERTF((int)offsetof(struct lfsck_request, lr_event) == 0, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_event));
@@ -4637,14 +4637,48 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lfsck_request, lr_fid));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid) == 16, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 48, "found %lld\n",
+       LASSERTF((int)offsetof(struct lfsck_request, lr_fid2) == 48, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_fid2));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid2) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid2));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_fid3) == 64, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_fid3));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid3) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid3));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 80, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_padding_2));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_2) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_2));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 56, "found %lld\n",
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 88, "found %lld\n",
                 (long long)(int)offsetof(struct lfsck_request, lr_padding_3));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_3) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_3));
+       LASSERTF(LE_LASTID_REBUILDING == 1, "found %lld\n",
+                (long long)LE_LASTID_REBUILDING);
+       LASSERTF(LE_LASTID_REBUILT == 2, "found %lld\n",
+                (long long)LE_LASTID_REBUILT);
+       LASSERTF(LE_PHASE1_DONE == 3, "found %lld\n",
+                (long long)LE_PHASE1_DONE);
+       LASSERTF(LE_PHASE2_DONE == 4, "found %lld\n",
+                (long long)LE_PHASE2_DONE);
+       LASSERTF(LE_START == 5, "found %lld\n",
+                (long long)LE_START);
+       LASSERTF(LE_STOP == 6, "found %lld\n",
+                (long long)LE_STOP);
+       LASSERTF(LE_QUERY == 7, "found %lld\n",
+                (long long)LE_QUERY);
+       LASSERTF(LE_FID_ACCESSED == 8, "found %lld\n",
+                (long long)LE_FID_ACCESSED);
+       LASSERTF(LE_PEER_EXIT == 9, "found %lld\n",
+                (long long)LE_PEER_EXIT);
+       LASSERTF(LE_CONDITIONAL_DESTROY == 10, "found %lld\n",
+                (long long)LE_CONDITIONAL_DESTROY);
+       LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
+                (long long)LE_PAIRS_VERIFY);
+       LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_TO_OST);
+       LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_FROM_OST);
 
        /* Checks for struct lfsck_reply */
        LASSERTF((int)sizeof(struct lfsck_reply) == 16, "found %lld\n",
@@ -4662,4 +4696,3 @@ void lustre_assert_wire_constants(void)
        LASSERTF((int)sizeof(((struct lfsck_reply *)0)->lr_padding_2) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_reply *)0)->lr_padding_2));
 }
-