Whamcloud - gitweb
LU-957 scrub: trigger OI scrub if found bad OI entry
authorFan Yong <yong.fan@whamcloud.com>
Thu, 14 Jun 2012 07:41:22 +0000 (15:41 +0800)
committerAndreas Dilger <adilger@whamcloud.com>
Fri, 22 Jun 2012 16:25:50 +0000 (12:25 -0400)
If an RPC involves an OI lookup and finds an inconsistent OI mapping,
it should trigger OI scrub to check and repair the inconsistency.

Known issues:
When the fid is returned to the client, the OI mapping corresponding
to that fid may not have been updated, or may not yet be committed to
disk. If the server crashes before OI scrub completes, recovery using
a fid whose OI mapping is inconsistent may fail or be blocked.

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: I9709386aa6d42954b619f6b1342adae59a2ec5a9
Reviewed-on: http://review.whamcloud.com/2554
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/lu_object.h
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_orphans.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/obdecho/echo_client.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_scrub.c

index d99e974..4d93160 100644 (file)
@@ -172,8 +172,9 @@ struct lu_device_operations {
  * For lu_object_conf flags
  */
 typedef enum {
  * For lu_object_conf flags
  */
 typedef enum {
-        /* Currently, only used for client-side object initialization. */
-        LOC_F_NEW = 0x1,
+       /* This is a new object to be allocated, or the file
+        * corresponding to the object does not exist. */
+       LOC_F_NEW       = 0x00000001,
 } loc_flags_t;
 
 /**
 } loc_flags_t;
 
 /**
index bdeb805..18e3f55 100644 (file)
@@ -153,6 +153,9 @@ struct mdd_thread_info {
         struct lu_attr            mti_la_for_fix;
         struct md_attr            mti_ma;
         struct obd_info           mti_oi;
         struct lu_attr            mti_la_for_fix;
         struct md_attr            mti_ma;
         struct obd_info           mti_oi;
+       /* mti_orph_ent and mti_orph_key must be adjacent, so that
+        * mti_orph_ent::lde_name will be mti_orph_key. */
+       struct lu_dirent          mti_orph_ent;
         char                      mti_orph_key[NAME_MAX + 1];
         struct obd_trans_info     mti_oti;
         struct lu_buf             mti_buf;
         char                      mti_orph_key[NAME_MAX + 1];
         struct obd_trans_info     mti_oti;
         struct lu_buf             mti_buf;
index 963493d..68f633b 100644 (file)
@@ -92,29 +92,6 @@ static struct dt_key* orph_key_fill_18(const struct lu_env *env,
                 return ERR_PTR(rc);
 }
 
                 return ERR_PTR(rc);
 }
 
-static int orphan_key_to_fid(char *key, struct lu_fid *lf)
-{
-        int rc = 0;
-        unsigned int op;
-
-        rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT,
-                    (long long unsigned int *)&lf->f_seq, &lf->f_oid,
-                    &lf->f_ver, &op);
-        if (rc == 4)
-                return 0;
-
-        /* build igif */
-        rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
-                    (long long unsigned int *)&lf->f_seq, &lf->f_oid);
-        if (rc == 2) {
-                lf->f_ver = 0;
-                return 0;
-        }
-
-        CERROR("can not parse orphan file name %s\n", key);
-        return -EINVAL;
-}
-
 static inline void mdd_orphan_write_lock(const struct lu_env *env,
                                     struct mdd_device *mdd)
 {
 static inline void mdd_orphan_write_lock(const struct lu_env *env,
                                     struct mdd_device *mdd)
 {
@@ -481,15 +458,13 @@ static int orph_key_test_and_del(const struct lu_env *env,
  * \retval -ve error
  */
 static int orph_index_iterate(const struct lu_env *env,
  * \retval -ve error
  */
 static int orph_index_iterate(const struct lu_env *env,
-                              struct mdd_device *mdd)
+                             struct mdd_device *mdd)
 {
 {
-        struct dt_object *dor = mdd->mdd_orphans;
-        char             *mti_key = mdd_env_info(env)->mti_orph_key;
-        const struct dt_it_ops *iops;
-        struct dt_it     *it;
-        char             *key;
-        struct lu_fid     fid;
-        int               result = 0;
+       struct dt_object *dor = mdd->mdd_orphans;
+       struct lu_dirent *ent = &mdd_env_info(env)->mti_orph_ent;
+       const struct dt_it_ops *iops;
+       struct dt_it     *it;
+       struct lu_fid     fid;
         int               key_sz = 0;
         int               rc;
         __u64             cookie;
         int               key_sz = 0;
         int               rc;
         __u64             cookie;
@@ -515,52 +490,48 @@ static int orph_index_iterate(const struct lu_env *env,
                 GOTO(out_put, rc = -EIO);
         }
 
                 GOTO(out_put, rc = -EIO);
         }
 
-        do {
-                key = (void *)iops->key(env, it);
-                if (IS_ERR(key)) {
-                        CERROR("%s: key failed when clean PENDING: rc = %ld\n",
-                               mdd->mdd_obd_dev->obd_name, PTR_ERR(key));
-                        goto next;
-                }
-                key_sz = iops->key_size(env, it);
-
-                /* filter out "." and ".." entries from PENDING dir. */
-                if (key_sz < 8)
-                        goto next;
-
-                memcpy(mti_key, key, key_sz);
-                mti_key[key_sz] = 0;
-
-                if (orphan_key_to_fid(mti_key, &fid))
-                        goto next;
-                if (!fid_is_sane(&fid)) {
-                        CERROR("%s: bad FID "DFID" cleaning PENDING\n",
-                               mdd->mdd_obd_dev->obd_name, PFID(&fid));
-                        goto next;
-                }
-
-                /* kill orphan object */
-                cookie = iops->store(env, it);
-                iops->put(env, it);
-                rc = orph_key_test_and_del(env, mdd, &fid,
-                                          (struct dt_key *)mti_key);
-
-                /* after index delete reset iterator */
-                if (rc == 0)
-                        result = iops->get(env, it, (const void *)"");
-                else
-                        result = iops->load(env, it, cookie);
+       do {
+               key_sz = iops->key_size(env, it);
+               /* filter out "." and ".." entries from PENDING dir. */
+               if (key_sz < 8)
+                       goto next;
+
+               rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+               if (rc != 0) {
+                       CERROR("%s: fail to get FID for orphan it: rc = %d\n",
+                              mdd->mdd_obd_dev->obd_name, rc);
+                       goto next;
+               }
+
+               fid_le_to_cpu(&fid, &ent->lde_fid);
+               if (!fid_is_sane(&fid)) {
+                       CERROR("%s: bad FID "DFID" cleaning PENDING\n",
+                              mdd->mdd_obd_dev->obd_name, PFID(&fid));
+                       goto next;
+               }
+
+               /* kill orphan object */
+               cookie = iops->store(env, it);
+               iops->put(env, it);
+               rc = orph_key_test_and_del(env, mdd, &fid,
+                                          (struct dt_key *)ent->lde_name);
+
+               /* after index delete reset iterator */
+               if (rc == 0)
+                       rc = iops->get(env, it, (const void *)"");
+               else
+                       rc = iops->load(env, it, cookie);
 next:
 next:
-                result = iops->next(env, it);
-        } while (result == 0);
+               rc = iops->next(env, it);
+       } while (rc == 0);
 
 
-        GOTO(out_put, rc = 0);
+       GOTO(out_put, rc = 0);
 out_put:
 out_put:
-        iops->put(env, it);
-        iops->fini(env, it);
+       iops->put(env, it);
+       iops->fini(env, it);
 
 out:
 
 out:
-        return rc;
+       return rc;
 }
 
 /**
 }
 
 /**
index d81d0a5..e58ac8b 100644 (file)
@@ -2074,6 +2074,24 @@ static struct mdt_object *mdt_obj(struct lu_object *o)
         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
 }
 
         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
 }
 
+/**
+ * Look up the mdt_object for \a f with LOC_F_NEW set in the object
+ * configuration handed to lu_object_find().
+ *
+ * \retval object      the mdt_object wrapping the lu_object found
+ * \retval ERR_PTR()   the error pointer returned by lu_object_find()
+ */
+struct mdt_object *mdt_object_new(const struct lu_env *env,
+                                 struct mdt_device *d,
+                                 const struct lu_fid *f)
+{
+       struct lu_object_conf new_conf = { .loc_flags = LOC_F_NEW };
+       struct lu_object *lu_obj;
+       ENTRY;
+
+       CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
+       lu_obj = lu_object_find(env, &d->mdt_md_dev.md_lu_dev, f, &new_conf);
+
+       /* Only a real object may be converted with mdt_obj(); an error
+        * pointer is handed back to the caller unchanged. */
+       if (likely(!IS_ERR(lu_obj)))
+               RETURN(mdt_obj(lu_obj));
+
+       RETURN((struct mdt_object *)lu_obj);
+}
+
 struct mdt_object *mdt_object_find(const struct lu_env *env,
                                    struct mdt_device *d,
                                    const struct lu_fid *f)
 struct mdt_object *mdt_object_find(const struct lu_env *env,
                                    struct mdt_device *d,
                                    const struct lu_fid *f)
index cfa9d3d..410b9df 100644 (file)
@@ -498,6 +498,9 @@ void mdt_object_unlock(struct mdt_thread_info *,
                        struct mdt_lock_handle *,
                        int decref);
 
                        struct mdt_lock_handle *,
                        int decref);
 
+struct mdt_object *mdt_object_new(const struct lu_env *,
+                                 struct mdt_device *,
+                                 const struct lu_fid *);
 struct mdt_object *mdt_object_find(const struct lu_env *,
                                    struct mdt_device *,
                                    const struct lu_fid *);
 struct mdt_object *mdt_object_find(const struct lu_env *,
                                    struct mdt_device *,
                                    const struct lu_fid *);
index 6bebe32..613a681 100644 (file)
@@ -1356,15 +1356,15 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                          PFID(child_fid));
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                          PFID(child_fid));
-        } else {
-                /*
-                 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
-                 * return FID back in that case.
-                 */
-                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-        }
-
-        child = mdt_object_find(info->mti_env, mdt, child_fid);
+               child = mdt_object_new(info->mti_env, mdt, child_fid);
+       } else {
+               /*
+                * Check for O_EXCL is moved to the mdt_finish_open(), we need to
+                * return FID back in that case.
+                */
+               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+               child = mdt_object_find(info->mti_env, mdt, child_fid);
+       }
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
index eecb266..60730a8 100644 (file)
@@ -303,14 +303,17 @@ static int mdt_md_create(struct mdt_thread_info *info)
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         rc = mdt_lookup_version_check(info, parent, lname,
                                       &info->mti_tmp_fid1, 1);
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
         rc = mdt_lookup_version_check(info, parent, lname,
                                       &info->mti_tmp_fid1, 1);
-        /* -ENOENT is expected here */
-        if (rc != 0 && rc != -ENOENT)
-                GOTO(out_put_parent, rc);
+       if (rc == 0)
+               GOTO(out_put_parent, rc = -EEXIST);
 
 
-        /* save version of file name for replay, it must be ENOENT here */
-        mdt_enoent_version_save(info, 1);
+       /* -ENOENT is expected here */
+       if (rc != -ENOENT)
+               GOTO(out_put_parent, rc);
 
 
-        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
+       /* save version of file name for replay, it must be ENOENT here */
+       mdt_enoent_version_save(info, 1);
+
+       child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
@@ -334,11 +337,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
 
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
 
-                /*
-                 * Do perform lookup sanity check. We do not know if name exists
-                 * or not.
-                 */
-                info->mti_spec.sp_cr_lookup = 1;
+               /*
+                * Do not perform lookup sanity check. We know that name does
+                * not exist.
+                */
+               info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
                 rc = mdo_create(info->mti_env, next, lname,
                 info->mti_spec.sp_feat = &dt_directory_features;
 
                 rc = mdo_create(info->mti_env, next, lname,
index 65d7429..9aefcb5 100644 (file)
@@ -202,6 +202,7 @@ struct echo_thread_info {
         struct cl_io            eti_io;
         struct cl_lock_descr    eti_descr;
         struct lu_fid           eti_fid;
         struct cl_io            eti_io;
         struct cl_lock_descr    eti_descr;
         struct lu_fid           eti_fid;
+       struct lu_fid           eti_fid2;
         struct md_op_spec       eti_spec;
         struct lov_mds_md_v3    eti_lmm;
         struct lov_user_md_v3   eti_lum;
         struct md_op_spec       eti_spec;
         struct lov_mds_md_v3    eti_lmm;
         struct lov_user_md_v3   eti_lum;
@@ -1416,20 +1417,27 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name,
        lname->ln_namelen = strlen(name);
 }
 
        lname->ln_namelen = strlen(name);
 }
 
-static int echo_md_create_internal(const struct lu_env *env,
-                                   struct echo_device *ed,
-                                   struct md_object *parent,
-                                   struct lu_fid *fid,
-                                   struct lu_name *lname,
-                                   struct md_op_spec *spec,
-                                   struct md_attr *ma)
-{
-        struct lu_object        *ec_child, *child;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc;
-
-        ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
-                                     fid, NULL);
+static int
+echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
+                       struct md_object *parent, struct lu_fid *fid,
+                       struct lu_name *lname, struct md_op_spec *spec,
+                       struct md_attr *ma)
+{
+       struct lu_object        *ec_child, *child;
+       struct lu_device        *ld = ed->ed_next;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_fid           *fid2 = &info->eti_fid2;
+       struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
+       int                      rc;
+
+       rc = mdo_lookup(env, parent, lname, fid2, spec);
+       if (rc == 0)
+               return -EEXIST;
+       else if (rc != -ENOENT)
+               return rc;
+
+       ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
+                                    fid, &conf);
         if (IS_ERR(ec_child)) {
                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
                         PTR_ERR(ec_child));
         if (IS_ERR(ec_child)) {
                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
                         PTR_ERR(ec_child));
@@ -1445,6 +1453,10 @@ static int echo_md_create_internal(const struct lu_env *env,
         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
         CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
+       /*
+        * Do not perform lookup sanity check. We know that name does not exist.
+        */
+       spec->sp_cr_lookup = 0;
         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
         if (rc) {
                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
         rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
         if (rc) {
                 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
index 6db8e25..ed4ce8d 100644 (file)
@@ -340,53 +340,136 @@ struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
        return inode;
 }
 
        return inode;
 }
 
-static int osd_fid_lookup(const struct lu_env *env,
-                          struct osd_object *obj, const struct lu_fid *fid)
+/**
+ * Load the inode for \a id and verify that it belongs to \a fid.
+ *
+ * The inode is fetched with osd_iget() and its LMA is read with
+ * osd_get_lma(); the self FID stored in the LMA must equal \a fid.
+ * On every failure path after osd_iget() the inode is released with
+ * iput() before returning.
+ *
+ * \retval inode               the LMA self FID matches \a fid
+ * \retval ERR_PTR(-EREMCHG)   the LMA is missing (-ENODATA) or its
+ *                             self FID differs from \a fid
+ * \retval ERR_PTR(-ve)        other error from osd_iget() or
+ *                             osd_get_lma()
+ */
+static struct inode *
+osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev,
+               struct osd_inode_id *id, const struct lu_fid *fid)
 {
 {
-        struct osd_thread_info *info;
-        struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
-        struct osd_device      *dev;
-        struct osd_inode_id    *id;
-        struct inode           *inode;
-        int                     result;
+       struct lustre_mdt_attrs *lma   = &info->oti_mdt_attrs;
+       struct inode            *inode;
+       int                      rc;
 
 
-        LINVRNT(osd_invariant(obj));
-        LASSERT(obj->oo_inode == NULL);
-        LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
-        /*
-         * This assertion checks that osd layer sees only local
-         * fids. Unfortunately it is somewhat expensive (does a
-         * cache-lookup). Disabling it for production/acceptance-testing.
-         */
-        LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
+       inode = osd_iget(info, dev, id);
+       if (IS_ERR(inode))
+               return inode;
 
 
-        ENTRY;
+       rc = osd_get_lma(inode, &info->oti_obj_dentry, lma);
+       if (rc != 0) {
+               if (rc == -ENODATA) {
+                       CDEBUG(D_LFSCK, "inconsistent obj: NULL, %lu, "DFID"\n",
+                              inode->i_ino, PFID(fid));
+                       rc = -EREMCHG;
+               }
+               iput(inode);
+               return ERR_PTR(rc);
+       }
+
+       if (!lu_fid_eq(fid, &lma->lma_self_fid)) {
+               CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n",
+                      PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid));
+               iput(inode);
+               /* The errno must be negative: IS_ERR() does not recognize
+                * ERR_PTR(EREMCHG), so the caller would take the result
+                * for a valid inode instead of seeing -EREMCHG. */
+               return ERR_PTR(-EREMCHG);
+       }
+       return inode;
+}
 
 
-        info = osd_oti_get(env);
-        LASSERT(info);
-        dev  = osd_dev(ldev);
-        id   = &info->oti_id;
+static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
+                         const struct lu_fid *fid,
+                         const struct lu_object_conf *conf)
+{
+       struct osd_thread_info *info;
+       struct lu_device       *ldev   = obj->oo_dt.do_lu.lo_dev;
+       struct osd_device      *dev;
+       struct osd_idmap_cache *oic;
+       struct osd_inode_id    *id;
+       struct inode           *inode;
+       struct osd_scrub       *scrub;
+       struct scrub_file      *sf;
+       int                     result;
+       int                     verify = 0;
+       ENTRY;
 
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
-                RETURN(-ENOENT);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(obj->oo_inode == NULL);
+       LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
+
+       dev = osd_dev(ldev);
+       scrub = &dev->od_scrub;
+       sf = &scrub->os_file;
+       info = osd_oti_get(env);
+       LASSERT(info);
+       oic = &info->oti_cache;
+       id  = &oic->oic_lid;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
+               RETURN(-ENOENT);
+
+       if (fid_is_norm(fid)) {
+               /* Search order: 1. per-thread cache. */
+               if (lu_fid_eq(fid, &oic->oic_fid)) {
+                       goto iget;
+               } else if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
+                       /* Search order: 2. OI scrub pending list. */
+                       result = osd_oii_lookup(dev, fid, id);
+                       if (result == 0)
+                               goto iget;
+               }
 
 
-        result = osd_oi_lookup(info, dev, fid, id);
-        if (result != 0) {
-                if (result == -ENOENT)
-                        result = 0;
-                GOTO(out, result);
-        }
+               if (sf->sf_flags & SF_INCONSISTENT)
+                       verify = 1;
+       }
+
+       fid_zero(&oic->oic_fid);
+       /* Search order: 3. OI files. */
+       result = osd_oi_lookup(info, dev, fid, id);
+       if (result != 0 && result != -ENOENT)
+               GOTO(out, result);
+
+       /* If fid wasn't found in oi, inode-less object is created,
+        * for which lu_object_exists() returns false. This is used
+        * in a (frequent) case when objects are created as locking
+        * anchors or place holders for objects yet to be created. */
+       if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
+               if (unlikely(result == 0))
+                       GOTO(out, result = -EEXIST);
+               else
+                       GOTO(out, result = 0);
+       }
+
+       if (result == -ENOENT) {
+               if (!fid_is_norm(fid) ||
+                   !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
+                                     sf->sf_oi_bitmap))
+                       GOTO(out, result = 0);
+
+               goto trigger;
+       }
+
+iget:
+       if (verify == 0)
+               inode = osd_iget(info, dev, id);
+       else
+               inode = osd_iget_verify(info, dev, id, fid);
+       if (IS_ERR(inode)) {
+               result = PTR_ERR(inode);
+               if (result == -ENOENT || result == -ESTALE) {
+                       result = 0;
+               } else if (result == -EREMCHG) {
+
+trigger:
+                       if (thread_is_running(&scrub->os_thread)) {
+                               result = -EINPROGRESS;
+                       } else if (!scrub->os_no_scrub) {
+                               result = osd_scrub_start(dev);
+                               LCONSOLE_ERROR("Trigger OI scrub by RPC for "
+                                              DFID", rc = %d\n",
+                                              PFID(fid), result);
+                               if (result == 0 || result == -EALREADY)
+                                       result = -EINPROGRESS;
+                               else
+                                       result = -EREMCHG;
+                       }
+               }
 
 
-        inode = osd_iget(info, dev, id);
-        if (IS_ERR(inode)) {
-                /*
-                 * If fid wasn't found in oi, inode-less object is
-                 * created, for which lu_object_exists() returns
-                 * false. This is used in a (frequent) case when
-                 * objects are created as locking anchors or
-                 * place holders for objects yet to be created.
-                 */
-                result = PTR_ERR(inode);
                 GOTO(out, result);
         }
 
                 GOTO(out, result);
         }
 
@@ -430,14 +513,14 @@ static void osd_object_init0(struct osd_object *obj)
  * life-cycle.
  */
 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
  * life-cycle.
  */
 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
-                           const struct lu_object_conf *unused)
+                          const struct lu_object_conf *conf)
 {
 {
-        struct osd_object *obj = osd_obj(l);
-        int result;
+       struct osd_object *obj = osd_obj(l);
+       int result;
 
 
-        LINVRNT(osd_invariant(obj));
+       LINVRNT(osd_invariant(obj));
 
 
-        result = osd_fid_lookup(env, obj, lu_object_fid(l));
+       result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
         obj->oo_dt.do_body_ops = &osd_body_ops_new;
         if (result == 0) {
                 if (obj->oo_inode != NULL)
         obj->oo_dt.do_body_ops = &osd_body_ops_new;
         if (result == 0) {
                 if (obj->oo_inode != NULL)
@@ -1982,11 +2065,11 @@ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
  * \retval 0 on success
  */
 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
  * \retval 0 on success
  */
 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
-                         __u32 ino, struct lu_fid *fid)
+                         __u32 ino, struct lu_fid *fid,
+                         struct osd_inode_id *id)
 {
 {
-       struct osd_thread_info  *info = osd_oti_get(env);
-       struct osd_inode_id     *id = &info->oti_id;
-       struct inode            *inode;
+       struct osd_thread_info *info  = osd_oti_get(env);
+       struct inode           *inode;
        ENTRY;
 
        osd_id_gen(id, ino, OSD_OII_NOGEN);
        ENTRY;
 
        osd_id_gen(id, ino, OSD_OII_NOGEN);
@@ -3115,6 +3198,50 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
         return rc;
 }
 
         return rc;
 }
 
+/**
+ * Check that the OI mapping of the FID cached in \a oic agrees with the
+ * inode id cached alongside it.
+ *
+ * The FID is looked up with osd_oi_lookup(). If the mapping is missing
+ * or differs from oic->oic_lid, the entry is either handed to a running
+ * OI scrub through osd_oii_insert(), or OI scrub is started (at most
+ * once per call, and only if os_no_scrub is not set) and the lookup is
+ * repeated.
+ *
+ * \retval 0           the OI mapping matches oic->oic_lid
+ * \retval -EREMCHG    the mapping is inconsistent and was not resolved
+ * \retval other       result of osd_oi_lookup() (errors other than
+ *                     -ENOENT) or of osd_oii_insert()
+ */
+static int
+osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
+                     struct osd_idmap_cache *oic)
+{
+       struct osd_scrub    *scrub     = &dev->od_scrub;
+       struct lu_fid       *fid       = &oic->oic_fid;
+       struct osd_inode_id *id        = &oti->oti_id;
+       int                  triggered = 0;
+       int                  rc;
+       ENTRY;
+
+       for (;;) {
+               rc = osd_oi_lookup(oti, dev, fid, id);
+               if (rc != 0 && rc != -ENOENT)
+                       RETURN(rc);
+
+               if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
+                       RETURN(0);
+
+               if (thread_is_running(&scrub->os_thread)) {
+                       rc = osd_oii_insert(dev, oic, rc == -ENOENT);
+                       /* There is race condition between osd_oi_lookup and
+                        * OI scrub: the OI scrub finished just after the
+                        * osd_oi_lookup() failure. In that case do not
+                        * trigger OI scrub again, just repeat the lookup. */
+                       if (unlikely(rc == -EAGAIN))
+                               continue;
+
+                       RETURN(rc);
+               }
+
+               /* Start OI scrub at most once, and only when allowed. */
+               if (scrub->os_no_scrub || ++triggered != 1)
+                       break;
+
+               CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n",
+                      PFID(fid));
+               rc = osd_scrub_start(dev);
+               CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID", rc = %d\n",
+                      PFID(fid), rc);
+               if (rc != 0)
+                       break;
+       }
+
+       RETURN(-EREMCHG);
+}
+
 /**
  * Calls ->lookup() to find dentry. From dentry get inode and
  * read inode's ea to get fid. This is required for  interoperability
 /**
  * Calls ->lookup() to find dentry. From dentry get inode and
  * read inode's ea to get fid. This is required for  interoperability
@@ -3150,22 +3277,43 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
 
         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
 
         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
-                ino = le32_to_cpu(de->inode);
-                rc = osd_get_fid_from_dentry(de, rec);
+               struct osd_thread_info *oti = osd_oti_get(env);
+               struct osd_idmap_cache *oic = &oti->oti_cache;
+               struct osd_device *dev = osd_obj2dev(obj);
+               struct osd_scrub *scrub = &dev->od_scrub;
+               struct scrub_file *sf = &scrub->os_file;
+
+               ino = le32_to_cpu(de->inode);
+               rc = osd_get_fid_from_dentry(de, rec);
+
+               /* done with de, release bh */
+               brelse(bh);
+               if (rc != 0)
+                       rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+               else
+                       osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+
+               if (rc != 0 || !fid_is_norm(fid))
+                       GOTO(out, rc);
+
+               oic->oic_fid = *fid;
+               if ((scrub->os_pos_current <= ino) &&
+                   (sf->sf_flags & SF_INCONSISTENT ||
+                    ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
+                                     sf->sf_oi_bitmap)))
+                       rc = osd_consistency_check(oti, dev, oic);
+       } else {
+               rc = -ENOENT;
+       }
 
 
-                /* done with de, release bh */
-                brelse(bh);
-                if (rc != 0)
-                        rc = osd_ea_fid_get(env, obj, ino, fid);
-        } else {
-                rc = -ENOENT;
-        }
+       GOTO(out, rc);
 
 
-        if (hlock != NULL)
-                ldiskfs_htree_unlock(hlock);
-        else
-                cfs_up_read(&obj->oo_ext_idx_sem);
-        RETURN (rc);
+out:
+       if (hlock != NULL)
+               ldiskfs_htree_unlock(hlock);
+       else
+               cfs_up_read(&obj->oo_ext_idx_sem);
+       return rc;
 }
 
 /**
 }
 
 /**
@@ -3832,27 +3980,45 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
  * \retval -ve on error
  */
 static inline int osd_it_ea_rec(const struct lu_env *env,
  * \retval -ve on error
  */
 static inline int osd_it_ea_rec(const struct lu_env *env,
-                                const struct dt_it *di,
-                                struct dt_rec *dtrec, __u32 attr)
-{
-        struct osd_it_ea        *it     = (struct osd_it_ea *)di;
-        struct osd_object       *obj    = it->oie_obj;
-        struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
-        struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
-        int    rc = 0;
+                               const struct dt_it *di,
+                               struct dt_rec *dtrec, __u32 attr)
+{
+       struct osd_it_ea       *it    = (struct osd_it_ea *)di;
+       struct osd_object      *obj   = it->oie_obj;
+       struct osd_device      *dev   = osd_obj2dev(obj);
+       struct osd_scrub       *scrub = &dev->od_scrub;
+       struct scrub_file      *sf    = &scrub->os_file;
+       struct osd_thread_info *oti   = osd_oti_get(env);
+       struct osd_idmap_cache *oic   = &oti->oti_cache;
+       struct lu_fid          *fid   = &it->oie_dirent->oied_fid;
+       struct lu_dirent       *lde   = (struct lu_dirent *)dtrec;
+       __u32                   ino   = it->oie_dirent->oied_ino;
+       int                     rc    = 0;
+       ENTRY;
 
 
-        ENTRY;
+       if (!fid_is_sane(fid)) {
+               rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+               if (rc != 0)
+                       RETURN(rc);
+       } else {
+               osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+       }
 
 
-        if (!fid_is_sane(fid))
-                rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
+       osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
+                          it->oie_dirent->oied_name,
+                          it->oie_dirent->oied_namelen,
+                          it->oie_dirent->oied_type, attr);
 
 
-        if (rc == 0)
-                osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
-                                   it->oie_dirent->oied_name,
-                                   it->oie_dirent->oied_namelen,
-                                   it->oie_dirent->oied_type,
-                                   attr);
-        RETURN(rc);
+       if (!fid_is_norm(fid))
+               RETURN(0);
+       /* The dirent has already been packed into lde at this point, and
+        * oic_lid was filled in above.  For a FID accepted by fid_is_norm(),
+        * record it in the osd_thread_info cache (oti_cache) and call
+        * osd_consistency_check() when os_pos_current <= ino and either
+        * SF_INCONSISTENT is set in sf_flags or the bit at
+        * osd_oi_fid2idx(dev, fid) is set in sf_oi_bitmap.
+        * NOTE(review): a non-zero rc from that check is returned even
+        * though lde was filled in -- confirm callers expect this. */
+       oic->oic_fid = *fid;
+       if ((scrub->os_pos_current <= ino) &&
+           (sf->sf_flags & SF_INCONSISTENT ||
+            ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
+               rc = osd_consistency_check(oti, dev, oic);
+
+       RETURN(rc);
 }
 
 /**
 }
 
 /**
index 307ecfa..7e573d1 100644 (file)
@@ -553,6 +553,7 @@ struct osd_thread_info {
                 long long      oti_alignment_lieutenant_colonel;
         };
 
                 long long      oti_alignment_lieutenant_colonel;
         };
 
+       struct osd_idmap_cache oti_cache;
 
         int                    oti_r_locks;
         int                    oti_w_locks;
 
         int                    oti_r_locks;
         int                    oti_w_locks;
@@ -623,6 +624,10 @@ int osd_scrub_file_store(struct osd_scrub *scrub);
 int osd_scrub_start(struct osd_device *dev);
 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev);
 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev);
 int osd_scrub_start(struct osd_device *dev);
 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev);
 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev);
+int osd_oii_insert(struct osd_device *dev, struct osd_idmap_cache *oic,
+                  int insert);
+int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
+                  struct osd_inode_id *id);
 
 /*
  * Invariants, assertions.
 
 /*
  * Invariants, assertions.
index 0e6382f..45c8233 100644 (file)
@@ -1308,3 +1308,58 @@ const struct dt_index_operations osd_otable_ops = {
                .load     = osd_otable_it_load,
        }
 };
                .load     = osd_otable_it_load,
        }
 };
+/**
+ * Queue an OI mapping on the scrub's list of inconsistent items and wake
+ * the scrub thread when the list was previously empty.
+ *
+ * \param dev     OSD device whose od_scrub receives the new item
+ * \param oic     idmap cache entry; copied by value into the new item
+ * \param insert  stored unchanged in oii_insert (presumably tells the
+ *                consumer whether to insert or update the mapping --
+ *                confirm against the code that drains the list)
+ *
+ * \retval 0        the item was appended to os_inconsistent_items
+ * \retval -ENOMEM  the item could not be allocated
+ * \retval -EAGAIN  the scrub thread is not running; nothing was queued
+ */
+int osd_oii_insert(struct osd_device *dev, struct osd_idmap_cache *oic,
+                  int insert)
+{
+       struct osd_inconsistent_item *oii;
+       struct osd_scrub             *scrub  = &dev->od_scrub;
+       struct ptlrpc_thread         *thread = &scrub->os_thread;
+       int                           wakeup = 0;
+       ENTRY;
+       /* The item is allocated and filled in before os_lock is taken. */
+       OBD_ALLOC_PTR(oii);
+       if (unlikely(oii == NULL))
+               RETURN(-ENOMEM);
+
+       CFS_INIT_LIST_HEAD(&oii->oii_list);
+       oii->oii_cache = *oic;
+       oii->oii_insert = insert;
+       /* The thread state is checked under os_lock; if the scrub thread is
+        * not running the item is freed and -EAGAIN is returned instead of
+        * queueing it. */
+       cfs_spin_lock(&scrub->os_lock);
+       if (unlikely(!thread_is_running(thread))) {
+               cfs_spin_unlock(&scrub->os_lock);
+               OBD_FREE_PTR(oii);
+               RETURN(-EAGAIN);
+       }
+       /* A wakeup is requested only when the list goes from empty to
+        * non-empty. */
+       if (cfs_list_empty(&scrub->os_inconsistent_items))
+               wakeup = 1;
+       cfs_list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
+       cfs_spin_unlock(&scrub->os_lock);
+       /* The broadcast is issued after os_lock has been released. */
+       if (wakeup != 0)
+               cfs_waitq_broadcast(&thread->t_ctl_waitq);
+
+       RETURN(0);
+}
+/**
+ * Look up a FID in the scrub's list of pending inconsistent items.
+ *
+ * \param dev  OSD device whose od_scrub list is searched
+ * \param fid  FID to search for
+ * \param id   on success, receives a copy of the matching item's oic_lid;
+ *             left untouched when no item matches
+ *
+ * \retval 0        a matching item was found and *id was filled in
+ * \retval -ENOENT  no item on the list has this FID
+ */
+int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
+                  struct osd_inode_id *id)
+{
+       struct osd_scrub             *scrub = &dev->od_scrub;
+       struct osd_inconsistent_item *oii;
+       ENTRY;
+       /* Walk the list under os_lock; the first entry whose oic_fid equals
+        * \a fid is used and the lock is dropped before returning. */
+       cfs_spin_lock(&scrub->os_lock);
+       cfs_list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
+               if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
+                       *id = oii->oii_cache.oic_lid;
+                       cfs_spin_unlock(&scrub->os_lock);
+                       RETURN(0);
+               }
+       }
+       cfs_spin_unlock(&scrub->os_lock);
+
+       RETURN(-ENOENT);
+}