*/
static int osd_find_parent_by_dnode(const struct lu_env *env,
struct dt_object *o,
- struct lu_fid *fid)
+ struct lu_fid *fid, uint64_t *oid)
{
struct osd_object *obj = osd_dt_obj(o);
struct osd_device *osd = osd_obj2dev(obj);
if (rc != 0)
RETURN(rc);
rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
- if (rc == 0)
+ if (!rc) {
+ if (oid)
+ *oid = dnode;
rc = osd_get_fid_by_oid(env, osd, dnode, fid);
+ }
RETURN(rc);
}
static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
- struct lu_fid *fid)
+ struct lu_fid *fid, uint64_t *oid)
{
struct link_ea_header *leh;
struct link_ea_entry *lee;
if (rc == 0) {
struct lu_fid fid2;
int rc2;
- rc2 = osd_find_parent_by_dnode(env, o, &fid2);
+ rc2 = osd_find_parent_by_dnode(env, o, &fid2, oid);
if (rc2 == 0)
if (lu_fid_eq(fid, &fid2) == 0)
CERROR("wrong parent: "DFID" != "DFID"\n",
/* no LinkEA is found, let's try to find the fid in parent's LMA */
if (unlikely(rc != 0))
- rc = osd_find_parent_by_dnode(env, o, fid);
+ rc = osd_find_parent_by_dnode(env, o, fid, oid);
RETURN(rc);
}
+/*
+ * When lookup item under striped directory, we need to locate the master
+ * MDT-object of the striped directory firstly, then the client will send
+ * lookup (getattr_by_name) RPC to the MDT with some slave MDT-object's FID
+ * and the item's name. If the system is restored from MDT file level backup,
+ * then before the OI scrub completely built the OI files, the OI mappings of
+ * the master MDT-object and slave MDT-object may be invalid. Usually, it is
+ * not a problem for the master MDT-object. Because when locate the master
+ * MDT-object, we will do name based lookup (for the striped directory itself)
+ * firstly, during such process we can setup the correct OI mapping for the
+ * master MDT-object. But it will be trouble for the slave MDT-object. Because
+ * the client will not trigger name based lookup on the MDT to locate the slave
+ * MDT-object before locating item under the striped directory, then when
+ * osd_fid_lookup(), it will find that the OI mapping for the slave MDT-object
+ * is invalid and does not know what the right OI mapping is, then the MDT has
+ * to return -EINPROGRESS to the client to notify that the OI scrub is rebuiding
+ * the OI file, related OI mapping is unknown yet, please try again later. And
+ * then client will re-try the RPC again and again until related OI mapping has
+ * been updated. That is quite inefficient.
+ *
+ * To resolve above trouble, we will handle it as the following two cases:
+ *
+ * 1) The slave MDT-object and the master MDT-object are on different MDTs.
+ * It is relative easy. Be as one of remote MDT-objects, the slave MDT-object
+ * is linked under /REMOTE_PARENT_DIR with the name of its FID string.
+ * We can locate the slave MDT-object via lookup the /REMOTE_PARENT_DIR
+ * directly. Please check osd_fid_lookup().
+ *
+ * 2) The slave MDT-object and the master MDT-object reside on the same MDT.
+ * Under such case, during lookup the master MDT-object, we will lookup the
+ * slave MDT-object via readdir against the master MDT-object, because the
+ * slave MDT-objects information are stored as sub-directories with the name
+ * "${FID}:${index}". Then when find the local slave MDT-object, its OI
+ * mapping will be recorded. Then subsequent osd_fid_lookup() will know
+ * the correct OI mapping for the slave MDT-object.
+ */
+static int osd_check_lmv(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid, const struct lu_fid *fid)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct luz_direntry *zde = &info->oti_zde;
+ zap_attribute_t *za = &info->oti_za;
+ zap_cursor_t *zc = &info->oti_zc;
+ struct lu_fid *tfid = &info->oti_fid;
+ nvlist_t *nvbuf = NULL;
+ struct lmv_mds_md_v1 *lmv = NULL;
+ int size;
+ int rc;
+ ENTRY;
+
+ rc = __osd_xattr_load_by_oid(osd, oid, &nvbuf);
+ if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
+ RETURN(0);
+
+ if (rc)
+ RETURN(rc);
+
+ rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMV,
+ (uchar_t **)&lmv, &size);
+ if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
+ GOTO(out_nvbuf, rc = 0);
+
+ if (rc || le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+ GOTO(out_nvbuf, rc);
+
+ zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
+ rc = -zap_cursor_retrieve(zc, za);
+ if (rc == -ENOENT) {
+ zap_cursor_advance(zc);
+ } else if (rc) {
+ CERROR("%s: fail to init for check LMV "DFID"(%llu): rc = %d\n",
+ osd_name(osd), PFID(fid), oid, rc);
+ GOTO(out_zc, rc);
+ }
+
+ while (1) {
+ rc = -zap_cursor_retrieve(zc, za);
+ if (rc == -ENOENT)
+ GOTO(out_zc, rc = 0);
+
+ if (rc) {
+ CERROR("%s: fail to locate next for check LMV "
+ DFID"(%llu): rc = %d\n",
+ osd_name(osd), PFID(fid), oid, rc);
+ GOTO(out_zc, rc);
+ }
+
+ fid_zero(tfid);
+ sscanf(za->za_name + 1, SFID, RFID(tfid));
+ if (fid_is_sane(tfid) && !osd_remote_fid(env, osd, tfid)) {
+ rc = osd_zap_lookup(osd, oid, NULL, za->za_name,
+ za->za_integer_length,
+ sizeof(*zde) / za->za_integer_length,
+ (void *)zde);
+ if (rc) {
+ CERROR("%s: fail to lookup for check LMV "
+ DFID"(%llu): rc = %d\n",
+ osd_name(osd), PFID(fid), oid, rc);
+ GOTO(out_zc, rc);
+ }
+
+ rc = osd_oii_insert(env, osd, tfid,
+ zde->lzd_reg.zde_dnode, false);
+ GOTO(out_zc, rc);
+ }
+
+ zap_cursor_advance(zc);
+ }
+
+out_zc:
+ zap_cursor_fini(zc);
+out_nvbuf:
+ nvlist_free(nvbuf);
+
+ return rc;
+}
+
+static int
+osd_consistency_check(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, const struct lu_fid *fid,
+ uint64_t oid, bool is_dir)
+{
+ struct lustre_scrub *scrub = &osd->od_scrub;
+ dnode_t *dn = NULL;
+ uint64_t oid2;
+ int once = 0;
+ bool insert;
+ int rc;
+ ENTRY;
+
+ if (!fid_is_norm(fid) && !fid_is_igif(fid))
+ RETURN(0);
+
+ /* oid == ZFS_NO_OBJECT must be for lookup ".." case */
+ if (oid == ZFS_NO_OBJECT) {
+ rc = osd_sa_handle_get(obj);
+ if (rc)
+ RETURN(rc);
+
+ rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &oid, 8);
+ if (rc)
+ RETURN(rc);
+ }
+
+ if (thread_is_running(&scrub->os_thread)) {
+ if (scrub->os_pos_current > oid)
+ RETURN(0);
+ } else if (osd->od_auto_scrub_interval == AS_NEVER) {
+ RETURN(0);
+ } else {
+ if (cfs_time_before(cfs_time_current_sec(),
+ scrub->os_file.sf_time_last_complete +
+ osd->od_auto_scrub_interval))
+ RETURN(0);
+ }
+
+again:
+ rc = osd_fid_lookup(env, osd, fid, &oid2);
+ if (rc == -ENOENT) {
+ insert = true;
+ if (dn)
+ goto trigger;
+
+ rc = __osd_obj2dnode(osd->od_os, oid, &dn);
+ /* The object has been removed (by race maybe). */
+ if (rc)
+ RETURN(rc = (rc == -EEXIST ? -ENOENT : rc));
+
+ goto trigger;
+ } else if (rc || oid == oid2) {
+ GOTO(out, rc);
+ }
+
+ insert = false;
+
+trigger:
+ if (thread_is_running(&scrub->os_thread)) {
+ if (!dn) {
+ rc = __osd_obj2dnode(osd->od_os, oid, &dn);
+ /* The object has been removed (by race maybe). */
+ if (rc)
+ RETURN(rc = (rc == -EEXIST ? -ENOENT : rc));
+ }
+
+ rc = osd_oii_insert(env, osd, fid, oid, insert);
+ /* There is race condition between osd_oi_lookup and OI scrub.
+ * The OI scrub finished just after osd_oi_lookup() failure.
+ * Under such case, it is unnecessary to trigger OI scrub again,
+ * but try to call osd_oi_lookup() again. */
+ if (unlikely(rc == -EAGAIN))
+ goto again;
+
+ if (is_dir)
+ rc = osd_check_lmv(env, osd, oid, fid);
+ else
+ rc = 0;
+
+ GOTO(out, rc);
+ }
+
+ if (osd->od_auto_scrub_interval != AS_NEVER && ++once == 1) {
+ rc = osd_scrub_start(env, osd, SS_AUTO_FULL |
+ SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT);
+ CDEBUG(D_LFSCK | D_CONSOLE | D_WARNING,
+ "%s: trigger partial OI scrub for RPC inconsistency "
+ "checking FID "DFID": rc = %d\n",
+ osd_name(osd), PFID(fid), rc);
+ if (!rc)
+ goto again;
+ }
+
+ GOTO(out, rc);
+
+out:
+ if (dn)
+ osd_dnode_rele(dn);
+
+ return rc;
+}
+
static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
struct dt_rec *rec, const struct dt_key *key)
{
struct osd_thread_info *oti = osd_oti_get(env);
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
- char *name = (char *)key;
- int rc;
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
+ struct lu_fid *fid = (struct lu_fid *)rec;
+ char *name = (char *)key;
+ uint64_t oid = ZFS_NO_OBJECT;
+ int rc;
ENTRY;
if (name[0] == '.') {
memcpy(rec, f, sizeof(*f));
RETURN(1);
} else if (name[1] == '.' && name[2] == 0) {
- rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
- RETURN(rc == 0 ? 1 : rc);
+ rc = osd_find_parent_fid(env, dt, fid, &oid);
+ GOTO(out, rc);
}
}
if (rc != 0)
RETURN(rc);
+ oid = oti->oti_zde.lzd_reg.zde_dnode;
if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
- RETURN(1);
+ GOTO(out, rc = 0);
}
- rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode,
- (struct lu_fid *)rec);
+ rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode, fid);
+
+ GOTO(out, rc);
+
+out:
+ if (!rc && !osd_remote_fid(env, osd, fid)) {
+ rc = osd_consistency_check(env, osd, obj, fid, oid,
+ S_ISDIR(DTTOIF(oti->oti_zde.lzd_reg.zde_type)));
+ /* Only -ENOENT error will affect the lookup result. */
+ if (rc != -ENOENT)
+ rc = 0;
+ }
- RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc));
+ return rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc);
}
/*
lde->lde_hash = cpu_to_le64(2);
strcpy(lde->lde_name, "..");
lde->lde_namelen = cpu_to_le16(2);
- rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, fid);
+ rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, fid, NULL);
if (!rc) {
fid_cpu_to_le(&lde->lde_fid, fid);
lde->lde_attrs = LUDA_FID;
}
};
-struct osd_metadnode_it {
- struct osd_device *mit_dev;
- __u64 mit_pos;
- struct lu_fid mit_fid;
- int mit_prefetched;
- __u64 mit_prefetched_dnode;
-};
-
-static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
- struct dt_object *dt, __u32 attr)
-{
- struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
- struct osd_metadnode_it *it;
- ENTRY;
-
- OBD_ALLOC_PTR(it);
- if (unlikely(it == NULL))
- RETURN(ERR_PTR(-ENOMEM));
-
- it->mit_dev = dev;
-
- /* XXX: dmu_object_next() does NOT find dnodes allocated
- * in the current non-committed txg, so we force txg
- * commit to find all existing dnodes ... */
- if (!dev->od_dt_dev.dd_rdonly)
- txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
-
- RETURN((struct dt_it *)it);
-}
-
-static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
-{
- struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
-
- OBD_FREE_PTR(it);
-}
-
-static int osd_zfs_otable_it_get(const struct lu_env *env,
- struct dt_it *di, const struct dt_key *key)
-{
- return 0;
-}
-
-static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
-{
-}
-
-#define OTABLE_PREFETCH 256
-
-static void osd_zfs_otable_prefetch(const struct lu_env *env,
- struct osd_metadnode_it *it)
-{
- struct osd_device *dev = it->mit_dev;
- int rc;
-
- /* can go negative on the very first access to the iterator
- * or if some non-Lustre objects were found */
- if (unlikely(it->mit_prefetched < 0))
- it->mit_prefetched = 0;
-
- if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
- return;
-
- if (it->mit_prefetched_dnode == 0)
- it->mit_prefetched_dnode = it->mit_pos;
-
- while (it->mit_prefetched < OTABLE_PREFETCH) {
- rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
- B_FALSE, 0);
- if (unlikely(rc != 0))
- break;
-
- osd_dmu_prefetch(dev->od_os, it->mit_prefetched_dnode,
- 0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
-
- it->mit_prefetched++;
- }
-}
-
-static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
-{
- struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
- struct lustre_mdt_attrs *lma;
- struct osd_device *dev = it->mit_dev;
- nvlist_t *nvbuf = NULL;
- uchar_t *v;
- __u64 dnode;
- int rc, s;
-
- memset(&it->mit_fid, 0, sizeof(it->mit_fid));
-
- dnode = it->mit_pos;
- do {
- rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
- if (unlikely(rc != 0))
- GOTO(out, rc = 1);
- it->mit_prefetched--;
-
- /* LMA is required for this to be a Lustre object.
- * If there is no xattr skip it. */
- rc = __osd_xattr_load_by_oid(dev, it->mit_pos, &nvbuf);
- if (unlikely(rc != 0))
- continue;
-
- LASSERT(nvbuf != NULL);
- rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
- if (likely(rc == 0)) {
- /* Lustre object */
- lma = (struct lustre_mdt_attrs *)v;
- lustre_lma_swab(lma);
- if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
- !(lma->lma_incompat & LMAI_AGENT))) {
- it->mit_fid = lma->lma_self_fid;
- nvlist_free(nvbuf);
- break;
- }
- }
-
- /* not a Lustre visible object, try next one */
- nvlist_free(nvbuf);
- } while (1);
-
-
- /* we aren't prefetching in the above loop because the number of
- * non-Lustre objects is very small and we will be repeating very
- * rare. in case we want to use this to iterate over non-Lustre
- * objects (i.e. when we convert regular ZFS in Lustre) it makes
- * sense to initiate prefetching in the loop */
-
- /* 0 - there are more items, +1 - the end */
- if (likely(rc == 0))
- osd_zfs_otable_prefetch(env, it);
-
- CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
- it->mit_pos, PFID(&it->mit_fid), rc);
-
-out:
- return rc;
-}
-
-static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
- const struct dt_it *di)
-{
- return NULL;
-}
-
-static int osd_zfs_otable_it_key_size(const struct lu_env *env,
- const struct dt_it *di)
-{
- return sizeof(__u64);
-}
-
-static int osd_zfs_otable_it_rec(const struct lu_env *env,
- const struct dt_it *di,
- struct dt_rec *rec, __u32 attr)
-{
- struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
- struct lu_fid *fid = (struct lu_fid *)rec;
- ENTRY;
-
- *fid = it->mit_fid;
-
- RETURN(0);
-}
-
-
-static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
- const struct dt_it *di)
-{
- struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
-
- return it->mit_pos;
-}
-
-static int osd_zfs_otable_it_load(const struct lu_env *env,
- const struct dt_it *di, __u64 hash)
-{
- struct osd_metadnode_it *it = (struct osd_metadnode_it *)di;
-
- it->mit_pos = hash;
- it->mit_prefetched = 0;
- it->mit_prefetched_dnode = 0;
-
- return osd_zfs_otable_it_next(env, (struct dt_it *)di);
-}
-
-static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
- const struct dt_it *di, void *key_rec)
-{
- return 0;
-}
-
-const struct dt_index_operations osd_zfs_otable_ops = {
- .dio_it = {
- .init = osd_zfs_otable_it_init,
- .fini = osd_zfs_otable_it_fini,
- .get = osd_zfs_otable_it_get,
- .put = osd_zfs_otable_it_put,
- .next = osd_zfs_otable_it_next,
- .key = osd_zfs_otable_it_key,
- .key_size = osd_zfs_otable_it_key_size,
- .rec = osd_zfs_otable_it_rec,
- .store = osd_zfs_otable_it_store,
- .load = osd_zfs_otable_it_load,
- .key_rec = osd_zfs_otable_it_key_rec,
- }
-};
-
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat)
{
GOTO(out, rc = -ERANGE);
if (unlikely(feat == &dt_otable_features)) {
- dt->do_index_ops = &osd_zfs_otable_ops;
+ dt->do_index_ops = &osd_otable_ops;
GOTO(out, rc = 0);
}