From ed7024b4b92229b80cd2061c71c6173a70fba572 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Thu, 7 Mar 2013 15:09:52 +0400 Subject: [PATCH] LU-2449 osd: lookup(..) to fetch fid from parent's LMA In case LinkEA is not accessible for some reason, osd-zfs will try to get the fid for ".." using the parent's dnode, which is stored in regular ZFS attributes. The parent's LMA can then be used to get the fid for "..". Signed-off-by: Alex Zhuravlev Change-Id: I34e9f884eb60f036c2f941013bf22e154efc2ff4 Reviewed-on: http://review.whamcloud.com/5629 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Reviewed-by: Mike Pershin --- lustre/osd-zfs/osd_index.c | 111 ++++++++++++++++++++++++++++++++++++++++-- lustre/osd-zfs/osd_internal.h | 4 ++ lustre/osd-zfs/osd_xattr.c | 79 ++++++++++++++++++------------ 3 files changed, 160 insertions(+), 34 deletions(-) diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 92a2723..f597e02 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -197,6 +197,93 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr, ent->lde_attrs = cpu_to_le32(ent->lde_attrs); } +/* + * as we don't know FID, we can't use LU object, so this function + * partially duplicates __osd_xattr_get() which is built around + * LU-object and uses it to cache data like regular EA dnode, etc + */ +static int osd_find_parent_by_dnode(const struct lu_env *env, + struct dt_object *o, + struct lu_fid *fid) +{ + struct lustre_mdt_attrs *lma; + udmu_objset_t *uos = &osd_obj2dev(osd_dt_obj(o))->od_objset; + struct lu_buf buf; + sa_handle_t *sa_hdl; + nvlist_t *nvbuf = NULL; + uchar_t *value; + uint64_t dnode; + int rc, size; + ENTRY; + + /* first of all, get parent dnode from own attributes */ + LASSERT(osd_dt_obj(o)->oo_db); + rc = -sa_handle_get(uos->os, osd_dt_obj(o)->oo_db->db_object, + NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc) + RETURN(rc); + + dnode = ZFS_NO_OBJECT; + rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(uos), &dnode, 
8); + sa_handle_destroy(sa_hdl); + if (rc) + RETURN(rc); + + /* now get EA buffer */ + rc = __osd_xattr_load(uos, dnode, &nvbuf); + if (rc) + GOTO(regular, rc); + + /* XXX: if we get that far.. should we cache the result? */ + + /* try to find LMA attribute */ + LASSERT(nvbuf != NULL); + rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size); + if (rc == 0 && size >= sizeof(*lma)) { + lma = (struct lustre_mdt_attrs *)value; + lustre_lma_swab(lma); + *fid = lma->lma_self_fid; + GOTO(out, rc = 0); + } + +regular: + /* no LMA attribute in SA, let's try regular EA */ + + /* first of all, get parent dnode storing regular EA */ + rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc) + GOTO(out, rc); + + dnode = ZFS_NO_OBJECT; + rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(uos), &dnode, 8); + sa_handle_destroy(sa_hdl); + if (rc) + GOTO(out, rc); + + CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf)); + buf.lb_buf = osd_oti_get(env)->oti_buf; + buf.lb_len = sizeof(osd_oti_get(env)->oti_buf); + + /* now try to find LMA */ + rc = __osd_xattr_get_large(env, uos, dnode, &buf, + XATTR_NAME_LMA, &size); + if (rc == 0 && size >= sizeof(*lma)) { + lma = buf.lb_buf; + lustre_lma_swab(lma); + *fid = lma->lma_self_fid; + GOTO(out, rc = 0); + } else if (rc < 0) { + GOTO(out, rc); + } else { + GOTO(out, rc = -EIO); + } + +out: + if (nvbuf != NULL) + nvlist_free(nvbuf); + RETURN(rc); +} + static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, struct lu_fid *fid) { @@ -245,6 +332,25 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, out: if (buf.lb_buf != osd_oti_get(env)->oti_buf) OBD_FREE(buf.lb_buf, buf.lb_len); + +#if 0 + /* this block can be enabled for additional verification + * it's trying to match FID from LinkEA vs. 
FID from LMA */ + if (rc == 0) { + struct lu_fid fid2; + int rc2; + rc2 = osd_find_parent_by_dnode(env, o, &fid2); + if (rc2 == 0) + if (lu_fid_eq(fid, &fid2) == 0) + CERROR("wrong parent: "DFID" != "DFID"\n", + PFID(fid), PFID(&fid2)); + } +#endif + + /* no LinkEA is found, let's try to find the fid in parent's LMA */ + if (unlikely(rc != 0)) + rc = osd_find_parent_by_dnode(env, o, fid); + RETURN(rc); } @@ -431,9 +537,8 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, /* update parent dnode in the child. * later it will be used to generate ".." */ udmu_objset_t *uos = &osd->od_objset; - rc = osd_object_sa_update(child, - SA_ZPL_PARENT(uos), - &parent->oo_db->db_object, + rc = osd_object_sa_update(child, SA_ZPL_PARENT(uos), + &parent->oo_db->db_object, 8, oh); #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 6ea83ba..55759c3 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -428,6 +428,10 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, /* osd_xattr.c */ +int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr); +int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos, + uint64_t xattr, struct lu_buf *buf, + const char *name, int *sizep); int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name, struct lustre_capa *capa); diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index 56d1352..463ea28 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -84,28 +84,23 @@ * * No locking is done here. 
*/ -int __osd_xattr_cache(const struct lu_env *env, struct osd_object *obj) +int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr) { - struct osd_device *osd = osd_obj2dev(obj); - udmu_objset_t *uos = &osd->od_objset; - sa_handle_t *sa_hdl; - char *buf; - int size; - int rc; + sa_handle_t *sa_hdl; + char *buf; + int rc, size; - LASSERT(obj->oo_sa_xattr == NULL); - LASSERT(obj->oo_db != NULL); + if (unlikely(dnode == ZFS_NO_OBJECT)) + return -ENOENT; - rc = -sa_handle_get(uos->os, obj->oo_db->db_object, NULL, - SA_HDL_PRIVATE, &sa_hdl); + rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl); if (rc) return rc; rc = -sa_size(sa_hdl, SA_ZPL_DXATTR(uos), &size); if (rc) { if (rc == -ENOENT) - rc = -nvlist_alloc(&obj->oo_sa_xattr, - NV_UNIQUE_NAME, KM_SLEEP); + rc = -nvlist_alloc(sa_xattr, NV_UNIQUE_NAME, KM_SLEEP); goto out_sa; } @@ -116,7 +111,7 @@ int __osd_xattr_cache(const struct lu_env *env, struct osd_object *obj) } rc = -sa_lookup(sa_hdl, SA_ZPL_DXATTR(uos), buf, size); if (rc == 0) - rc = -nvlist_unpack(buf, size, &obj->oo_sa_xattr, KM_SLEEP); + rc = -nvlist_unpack(buf, size, sa_xattr, KM_SLEEP); sa_spill_free(buf); out_sa: sa_handle_destroy(sa_hdl); @@ -124,6 +119,16 @@ out_sa: return rc; } +static inline int __osd_xattr_cache(const struct lu_env *env, + struct osd_object *obj) +{ + LASSERT(obj->oo_sa_xattr == NULL); + LASSERT(obj->oo_db != NULL); + + return __osd_xattr_load(&osd_obj2dev(obj)->od_objset, + obj->oo_db->db_object, &obj->oo_sa_xattr); +} + int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int *sizep) { @@ -156,28 +161,21 @@ int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj, return 0; } -int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj, - struct lu_buf *buf, const char *name, int *sizep) +int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos, + uint64_t xattr, struct lu_buf *buf, + 
const char *name, int *sizep) { - struct osd_device *osd = osd_obj2dev(obj); - udmu_objset_t *uos = &osd->od_objset; - uint64_t xa_data_obj; - dmu_buf_t *xa_data_db; - sa_handle_t *sa_hdl = NULL; - uint64_t size; - int rc; - - /* check SA_ZPL_DXATTR first then fallback to directory xattr */ - rc = __osd_sa_xattr_get(env, obj, buf, name, sizep); - if (rc != -ENOENT) - return rc; + dmu_buf_t *xa_data_db; + sa_handle_t *sa_hdl = NULL; + uint64_t xa_data_obj, size; + int rc; /* are there any extended attributes? */ - if (obj->oo_xattr == ZFS_NO_OBJECT) + if (xattr == ZFS_NO_OBJECT) return -ENOENT; /* Lookup the object number containing the xattr data */ - rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1, + rc = -zap_lookup(uos->os, xattr, name, sizeof(uint64_t), 1, &xa_data_obj); if (rc) return rc; @@ -219,6 +217,23 @@ out: sa_handle_destroy(sa_hdl); out_rele: dmu_buf_rele(xa_data_db, FTAG); + + return rc; +} + +int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj, + struct lu_buf *buf, const char *name, int *sizep) +{ + int rc; + + /* check SA_ZPL_DXATTR first then fallback to directory xattr */ + rc = __osd_sa_xattr_get(env, obj, buf, name, sizep); + if (rc != -ENOENT) + return rc; + + rc = __osd_xattr_get_large(env, &osd_obj2dev(obj)->od_objset, + obj->oo_xattr, buf, name, sizep); + return rc; } @@ -790,7 +805,9 @@ int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, zap_cursor_advance(zc); } - if (rc < 0) + if (rc == -ENOENT) /* no more keys in the index */ + rc = 0; + else if (unlikely(rc < 0)) GOTO(out_fini, rc); rc = counted; -- 1.8.3.1