return ERR_PTR(rc);
}
-static int orphan_key_to_fid(char *key, struct lu_fid *lf)
-{
- int rc = 0;
- unsigned int op;
-
- rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT,
- (long long unsigned int *)&lf->f_seq, &lf->f_oid,
- &lf->f_ver, &op);
- if (rc == 4)
- return 0;
-
- /* build igif */
- rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
- (long long unsigned int *)&lf->f_seq, &lf->f_oid);
- if (rc == 2) {
- lf->f_ver = 0;
- return 0;
- }
-
- CERROR("can not parse orphan file name %s\n", key);
- return -EINVAL;
-}
-
static inline void mdd_orphan_write_lock(const struct lu_env *env,
struct mdd_device *mdd)
{
* \retval -ve error
*/
static int orph_index_iterate(const struct lu_env *env,
- struct mdd_device *mdd)
+ struct mdd_device *mdd)
{
- struct dt_object *dor = mdd->mdd_orphans;
- char *mti_key = mdd_env_info(env)->mti_orph_key;
- const struct dt_it_ops *iops;
- struct dt_it *it;
- char *key;
- struct lu_fid fid;
- int result = 0;
+ struct dt_object *dor = mdd->mdd_orphans;
+ struct lu_dirent *ent = &mdd_env_info(env)->mti_orph_ent;
+ const struct dt_it_ops *iops;
+ struct dt_it *it;
+ struct lu_fid fid;
int key_sz = 0;
int rc;
__u64 cookie;
GOTO(out_put, rc = -EIO);
}
- do {
- key = (void *)iops->key(env, it);
- if (IS_ERR(key)) {
- CERROR("%s: key failed when clean PENDING: rc = %ld\n",
- mdd->mdd_obd_dev->obd_name, PTR_ERR(key));
- goto next;
- }
- key_sz = iops->key_size(env, it);
-
- /* filter out "." and ".." entries from PENDING dir. */
- if (key_sz < 8)
- goto next;
-
- memcpy(mti_key, key, key_sz);
- mti_key[key_sz] = 0;
-
- if (orphan_key_to_fid(mti_key, &fid))
- goto next;
- if (!fid_is_sane(&fid)) {
- CERROR("%s: bad FID "DFID" cleaning PENDING\n",
- mdd->mdd_obd_dev->obd_name, PFID(&fid));
- goto next;
- }
-
- /* kill orphan object */
- cookie = iops->store(env, it);
- iops->put(env, it);
- rc = orph_key_test_and_del(env, mdd, &fid,
- (struct dt_key *)mti_key);
-
- /* after index delete reset iterator */
- if (rc == 0)
- result = iops->get(env, it, (const void *)"");
- else
- result = iops->load(env, it, cookie);
+ do {
+ key_sz = iops->key_size(env, it);
+ /* filter out "." and ".." entries from PENDING dir. */
+ if (key_sz < 8)
+ goto next;
+
+ rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+ if (rc != 0) {
+ CERROR("%s: fail to get FID for orphan it: rc = %d\n",
+ mdd->mdd_obd_dev->obd_name, rc);
+ goto next;
+ }
+
+ fid_le_to_cpu(&fid, &ent->lde_fid);
+ if (!fid_is_sane(&fid)) {
+ CERROR("%s: bad FID "DFID" cleaning PENDING\n",
+ mdd->mdd_obd_dev->obd_name, PFID(&fid));
+ goto next;
+ }
+
+ /* kill orphan object */
+ cookie = iops->store(env, it);
+ iops->put(env, it);
+ rc = orph_key_test_and_del(env, mdd, &fid,
+ (struct dt_key *)ent->lde_name);
+
+ /* after index delete reset iterator */
+ if (rc == 0)
+ rc = iops->get(env, it, (const void *)"");
+ else
+ rc = iops->load(env, it, cookie);
next:
- result = iops->next(env, it);
- } while (result == 0);
+ rc = iops->next(env, it);
+ } while (rc == 0);
- GOTO(out_put, rc = 0);
+ GOTO(out_put, rc = 0);
out_put:
- iops->put(env, it);
- iops->fini(env, it);
+ iops->put(env, it);
+ iops->fini(env, it);
out:
- return rc;
+ return rc;
}
/**
return inode;
}
-static int osd_fid_lookup(const struct lu_env *env,
- struct osd_object *obj, const struct lu_fid *fid)
+static struct inode *
+osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev,
+ struct osd_inode_id *id, const struct lu_fid *fid)
{
- struct osd_thread_info *info;
- struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
- struct osd_device *dev;
- struct osd_inode_id *id;
- struct inode *inode;
- int result;
+ struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
+ struct inode *inode;
+ int rc;
- LINVRNT(osd_invariant(obj));
- LASSERT(obj->oo_inode == NULL);
- LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
- /*
- * This assertion checks that osd layer sees only local
- * fids. Unfortunately it is somewhat expensive (does a
- * cache-lookup). Disabling it for production/acceptance-testing.
- */
- LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
+ inode = osd_iget(info, dev, id);
+ if (IS_ERR(inode))
+ return inode;
- ENTRY;
+ rc = osd_get_lma(inode, &info->oti_obj_dentry, lma);
+ if (rc != 0) {
+ if (rc == -ENODATA) {
+ CDEBUG(D_LFSCK, "inconsistent obj: NULL, %lu, "DFID"\n",
+ inode->i_ino, PFID(fid));
+ rc = -EREMCHG;
+ }
+ iput(inode);
+ return ERR_PTR(rc);
+ }
+
+ if (!lu_fid_eq(fid, &lma->lma_self_fid)) {
+ CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n",
+ PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid));
+ iput(inode);
+ return ERR_PTR(EREMCHG);
+ }
+ return inode;
+}
- info = osd_oti_get(env);
- LASSERT(info);
- dev = osd_dev(ldev);
- id = &info->oti_id;
+static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
+ const struct lu_fid *fid,
+ const struct lu_object_conf *conf)
+{
+ struct osd_thread_info *info;
+ struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
+ struct osd_device *dev;
+ struct osd_idmap_cache *oic;
+ struct osd_inode_id *id;
+ struct inode *inode;
+ struct osd_scrub *scrub;
+ struct scrub_file *sf;
+ int result;
+ int verify = 0;
+ ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
- RETURN(-ENOENT);
+ LINVRNT(osd_invariant(obj));
+ LASSERT(obj->oo_inode == NULL);
+ LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
+
+ dev = osd_dev(ldev);
+ scrub = &dev->od_scrub;
+ sf = &scrub->os_file;
+ info = osd_oti_get(env);
+ LASSERT(info);
+ oic = &info->oti_cache;
+ id = &oic->oic_lid;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
+ RETURN(-ENOENT);
+
+ if (fid_is_norm(fid)) {
+ /* Search order: 1. per-thread cache. */
+ if (lu_fid_eq(fid, &oic->oic_fid)) {
+ goto iget;
+ } else if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
+ /* Search order: 2. OI scrub pending list. */
+ result = osd_oii_lookup(dev, fid, id);
+ if (result == 0)
+ goto iget;
+ }
- result = osd_oi_lookup(info, dev, fid, id);
- if (result != 0) {
- if (result == -ENOENT)
- result = 0;
- GOTO(out, result);
- }
+ if (sf->sf_flags & SF_INCONSISTENT)
+ verify = 1;
+ }
+
+ fid_zero(&oic->oic_fid);
+ /* Search order: 3. OI files. */
+ result = osd_oi_lookup(info, dev, fid, id);
+ if (result != 0 && result != -ENOENT)
+ GOTO(out, result);
+
+ /* If fid wasn't found in oi, inode-less object is created,
+ * for which lu_object_exists() returns false. This is used
+ * in a (frequent) case when objects are created as locking
+ * anchors or place holders for objects yet to be created. */
+ if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
+ if (unlikely(result == 0))
+ GOTO(out, result = -EEXIST);
+ else
+ GOTO(out, result = 0);
+ }
+
+ if (result == -ENOENT) {
+ if (!fid_is_norm(fid) ||
+ !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
+ sf->sf_oi_bitmap))
+ GOTO(out, result = 0);
+
+ goto trigger;
+ }
+
+iget:
+ if (verify == 0)
+ inode = osd_iget(info, dev, id);
+ else
+ inode = osd_iget_verify(info, dev, id, fid);
+ if (IS_ERR(inode)) {
+ result = PTR_ERR(inode);
+ if (result == -ENOENT || result == -ESTALE) {
+ result = 0;
+ } else if (result == -EREMCHG) {
+
+trigger:
+ if (thread_is_running(&scrub->os_thread)) {
+ result = -EINPROGRESS;
+ } else if (!scrub->os_no_scrub) {
+ result = osd_scrub_start(dev);
+ LCONSOLE_ERROR("Trigger OI scrub by RPC for "
+ DFID", rc = %d\n",
+ PFID(fid), result);
+ if (result == 0 || result == -EALREADY)
+ result = -EINPROGRESS;
+ else
+ result = -EREMCHG;
+ }
+ }
- inode = osd_iget(info, dev, id);
- if (IS_ERR(inode)) {
- /*
- * If fid wasn't found in oi, inode-less object is
- * created, for which lu_object_exists() returns
- * false. This is used in a (frequent) case when
- * objects are created as locking anchors or
- * place holders for objects yet to be created.
- */
- result = PTR_ERR(inode);
GOTO(out, result);
}
* life-cycle.
*/
static int osd_object_init(const struct lu_env *env, struct lu_object *l,
- const struct lu_object_conf *unused)
+ const struct lu_object_conf *conf)
{
- struct osd_object *obj = osd_obj(l);
- int result;
+ struct osd_object *obj = osd_obj(l);
+ int result;
- LINVRNT(osd_invariant(obj));
+ LINVRNT(osd_invariant(obj));
- result = osd_fid_lookup(env, obj, lu_object_fid(l));
+ result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
obj->oo_dt.do_body_ops = &osd_body_ops_new;
if (result == 0) {
if (obj->oo_inode != NULL)
* \retval 0 on success
*/
static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
- __u32 ino, struct lu_fid *fid)
+ __u32 ino, struct lu_fid *fid,
+ struct osd_inode_id *id)
{
- struct osd_thread_info *info = osd_oti_get(env);
- struct osd_inode_id *id = &info->oti_id;
- struct inode *inode;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct inode *inode;
ENTRY;
osd_id_gen(id, ino, OSD_OII_NOGEN);
return rc;
}
+static int
+osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
+ struct osd_idmap_cache *oic)
+{
+ struct osd_scrub *scrub = &dev->od_scrub;
+ struct lu_fid *fid = &oic->oic_fid;
+ struct osd_inode_id *id = &oti->oti_id;
+ int once = 0;
+ int rc;
+ ENTRY;
+
+again:
+ rc = osd_oi_lookup(oti, dev, fid, id);
+ if (rc != 0 && rc != -ENOENT)
+ RETURN(rc);
+
+ if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
+ RETURN(0);
+
+ if (thread_is_running(&scrub->os_thread)) {
+ rc = osd_oii_insert(dev, oic, rc == -ENOENT);
+ /* There is race condition between osd_oi_lookup and OI scrub.
+ * The OI scrub finished just after osd_oi_lookup() failure.
+ * Under such case, it is unnecessary to trigger OI scrub again,
+ * but try to call osd_oi_lookup() again. */
+ if (unlikely(rc == -EAGAIN))
+ goto again;
+
+ RETURN(rc);
+ }
+
+ if (!scrub->os_no_scrub && ++once == 1) {
+ CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n",
+ PFID(fid));
+ rc = osd_scrub_start(dev);
+ CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID", rc = %d\n",
+ PFID(fid), rc);
+ if (rc == 0)
+ goto again;
+ }
+
+ RETURN(rc = -EREMCHG);
+}
+
/**
* Calls ->lookup() to find dentry. From dentry get inode and
* read inode's ea to get fid. This is required for interoperability
bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
if (bh) {
- ino = le32_to_cpu(de->inode);
- rc = osd_get_fid_from_dentry(de, rec);
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_idmap_cache *oic = &oti->oti_cache;
+ struct osd_device *dev = osd_obj2dev(obj);
+ struct osd_scrub *scrub = &dev->od_scrub;
+ struct scrub_file *sf = &scrub->os_file;
+
+ ino = le32_to_cpu(de->inode);
+ rc = osd_get_fid_from_dentry(de, rec);
+
+ /* done with de, release bh */
+ brelse(bh);
+ if (rc != 0)
+ rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+ else
+ osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+
+ if (rc != 0 || !fid_is_norm(fid))
+ GOTO(out, rc);
+
+ oic->oic_fid = *fid;
+ if ((scrub->os_pos_current <= ino) &&
+ (sf->sf_flags & SF_INCONSISTENT ||
+ ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
+ sf->sf_oi_bitmap)))
+ rc = osd_consistency_check(oti, dev, oic);
+ } else {
+ rc = -ENOENT;
+ }
- /* done with de, release bh */
- brelse(bh);
- if (rc != 0)
- rc = osd_ea_fid_get(env, obj, ino, fid);
- } else {
- rc = -ENOENT;
- }
+ GOTO(out, rc);
- if (hlock != NULL)
- ldiskfs_htree_unlock(hlock);
- else
- cfs_up_read(&obj->oo_ext_idx_sem);
- RETURN (rc);
+out:
+ if (hlock != NULL)
+ ldiskfs_htree_unlock(hlock);
+ else
+ cfs_up_read(&obj->oo_ext_idx_sem);
+ return rc;
}
/**
* \retval -ve on error
*/
static inline int osd_it_ea_rec(const struct lu_env *env,
- const struct dt_it *di,
- struct dt_rec *dtrec, __u32 attr)
-{
- struct osd_it_ea *it = (struct osd_it_ea *)di;
- struct osd_object *obj = it->oie_obj;
- struct lu_fid *fid = &it->oie_dirent->oied_fid;
- struct lu_dirent *lde = (struct lu_dirent *)dtrec;
- int rc = 0;
+ const struct dt_it *di,
+ struct dt_rec *dtrec, __u32 attr)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ struct osd_object *obj = it->oie_obj;
+ struct osd_device *dev = osd_obj2dev(obj);
+ struct osd_scrub *scrub = &dev->od_scrub;
+ struct scrub_file *sf = &scrub->os_file;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_idmap_cache *oic = &oti->oti_cache;
+ struct lu_fid *fid = &it->oie_dirent->oied_fid;
+ struct lu_dirent *lde = (struct lu_dirent *)dtrec;
+ __u32 ino = it->oie_dirent->oied_ino;
+ int rc = 0;
+ ENTRY;
- ENTRY;
+ if (!fid_is_sane(fid)) {
+ rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
+ if (rc != 0)
+ RETURN(rc);
+ } else {
+ osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
+ }
- if (!fid_is_sane(fid))
- rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
+ osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
+ it->oie_dirent->oied_name,
+ it->oie_dirent->oied_namelen,
+ it->oie_dirent->oied_type, attr);
- if (rc == 0)
- osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
- it->oie_dirent->oied_name,
- it->oie_dirent->oied_namelen,
- it->oie_dirent->oied_type,
- attr);
- RETURN(rc);
+ if (!fid_is_norm(fid))
+ RETURN(0);
+
+ oic->oic_fid = *fid;
+ if ((scrub->os_pos_current <= ino) &&
+ (sf->sf_flags & SF_INCONSISTENT ||
+ ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
+ rc = osd_consistency_check(oti, dev, oic);
+
+ RETURN(rc);
}
/**