From 1196597f8464e56d2d42845166e8aafa9792172e Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Tue, 23 Feb 2016 11:33:55 +0800 Subject: [PATCH] LU-5070 osd-zfs: get object's FID from its LMA EA Do not assume that the FID-in-dirent is always valid. If the FID-in-dirent is absent or invalid, then osd_dir_lookup() needs to find the FID from the target object's LMA EA. This is similar to what osd-ldiskfs does. Signed-off-by: Fan Yong Change-Id: If75ff4ed03bd75a3385666842e94838ece3f4344 Reviewed-on: http://review.whamcloud.com/18595 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Niu Yawei Reviewed-by: Oleg Drokin --- lustre/osd-zfs/osd_index.c | 169 +++++++++++++++++++++++++++----------------- lustre/osd-zfs/osd_object.c | 5 +- 2 files changed, 106 insertions(+), 68 deletions(-) diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 705c86d..8b6c8fe 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -228,90 +228,121 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr, ent->lde_attrs = cpu_to_le32(ent->lde_attrs); } -/* - * as we don't know FID, we can't use LU object, so this function - * partially duplicate __osd_xattr_get() which is built around - * LU-object and uses it to cache data like regular EA dnode, etc +/** + * Get the object's FID from its LMA EA. 
+ * + * \param[in] env pointer to the thread context + * \param[in] osd pointer to the OSD device + * \param[in] oid the object's local identifier + * \param[out] fid the buffer to hold the object's FID + * + * \retval 0 for success + * \retval negative error number on failure */ -static int osd_find_parent_by_dnode(const struct lu_env *env, - struct dt_object *o, - struct lu_fid *fid) +static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd, + uint64_t oid, struct lu_fid *fid) { - struct osd_device *osd = osd_obj2dev(osd_dt_obj(o)); - struct lustre_mdt_attrs *lma; + struct objset *os = osd->od_os; + struct osd_thread_info *oti = osd_oti_get(env); + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *)oti->oti_buf; struct lu_buf buf; - sa_handle_t *sa_hdl; - nvlist_t *nvbuf = NULL; - uchar_t *value; - uint64_t dnode; - int rc, size; + nvlist_t *sa_xattr = NULL; + sa_handle_t *sa_hdl = NULL; + uchar_t *nv_value = NULL; + uint64_t xattr = ZFS_NO_OBJECT; + int size = 0; + int rc; ENTRY; - /* first of all, get parent dnode from own attributes */ - LASSERT(osd_dt_obj(o)->oo_db); - rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object, - NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) - RETURN(rc); + rc = __osd_xattr_load(osd, oid, &sa_xattr); + if (rc == -ENOENT) + goto regular; - dnode = ZFS_NO_OBJECT; - rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8); - sa_handle_destroy(sa_hdl); - if (rc) - RETURN(rc); + if (rc != 0) + GOTO(out, rc); - /* now get EA buffer */ - rc = __osd_xattr_load(osd, dnode, &nvbuf); - if (rc) - GOTO(regular, rc); + rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value, + &size); + if (rc == -ENOENT) + goto regular; - /* XXX: if we get that far.. should we cache the result? 
*/ + if (rc != 0) + GOTO(out, rc); - /* try to find LMA attribute */ - LASSERT(nvbuf != NULL); - rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size); - if (rc == 0 && size >= sizeof(*lma)) { - lma = (struct lustre_mdt_attrs *)value; - lustre_lma_swab(lma); - *fid = lma->lma_self_fid; - GOTO(out, rc = 0); - } + if (unlikely(size > sizeof(oti->oti_buf))) + GOTO(out, rc = -ERANGE); -regular: - /* no LMA attribute in SA, let's try regular EA */ + memcpy(lma, nv_value, size); - /* first of all, get parent dnode storing regular EA */ - rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) + goto found; + +regular: + rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc != 0) GOTO(out, rc); - dnode = ZFS_NO_OBJECT; - rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8); + rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8); sa_handle_destroy(sa_hdl); - if (rc) + if (rc != 0) GOTO(out, rc); - CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf)); - buf.lb_buf = osd_oti_get(env)->oti_buf; - buf.lb_len = sizeof(osd_oti_get(env)->oti_buf); - - /* now try to find LMA */ - rc = __osd_xattr_get_large(env, osd, dnode, &buf, + buf.lb_buf = lma; + buf.lb_len = sizeof(oti->oti_buf); + rc = __osd_xattr_get_large(env, osd, xattr, &buf, XATTR_NAME_LMA, &size); - if (rc == 0 && size >= sizeof(*lma)) { - lma = buf.lb_buf; - lustre_lma_swab(lma); - *fid = lma->lma_self_fid; - GOTO(out, rc = 0); - } else if (rc < 0) { + if (rc != 0) GOTO(out, rc); - } else { + +found: + if (size < sizeof(*lma)) GOTO(out, rc = -EIO); + + lustre_lma_swab(lma); + if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) || + CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) { + CWARN("%s: unsupported incompat LMA feature(s) %#x for " + "oid = "LPX64"\n", osd->od_svname, + lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid); + GOTO(out, rc = -EOPNOTSUPP); + } else { + *fid = lma->lma_self_fid; + GOTO(out, rc = 0); } out: - if (nvbuf != NULL) - 
nvlist_free(nvbuf); + if (sa_xattr != NULL) + nvlist_free(sa_xattr); + return rc; +} + +/* + * As we don't know FID, we can't use LU object, so this function + * partially duplicate __osd_xattr_get() which is built around + * LU-object and uses it to cache data like regular EA dnode, etc + */ +static int osd_find_parent_by_dnode(const struct lu_env *env, + struct dt_object *o, + struct lu_fid *fid) +{ + struct osd_device *osd = osd_obj2dev(osd_dt_obj(o)); + sa_handle_t *sa_hdl; + uint64_t dnode = ZFS_NO_OBJECT; + int rc; + ENTRY; + + /* first of all, get parent dnode from own attributes */ + LASSERT(osd_dt_obj(o)->oo_db); + rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object, + NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc != 0) + RETURN(rc); + + rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8); + sa_handle_destroy(sa_hdl); + if (rc == 0) + rc = osd_get_fid_by_oid(env, osd, dnode, fid); + RETURN(rc); } @@ -407,12 +438,22 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, } } + memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid)); rc = -zap_lookup(osd->od_os, obj->oo_db->db_object, (char *)key, 8, sizeof(oti->oti_zde) / 8, (void *)&oti->oti_zde); - memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid)); + if (rc != 0) + RETURN(rc); - RETURN(rc == 0 ? 1 : rc); + if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) { + memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid)); + RETURN(1); + } + + rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode, + (struct lu_fid *)rec); + + RETURN(rc == 0 ? 1 : (rc == -ENOENT ? 
-ENODATA : rc)); } static int osd_declare_dir_insert(const struct lu_env *env, diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index 5c63388..b875be2 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -1577,12 +1577,9 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_invariant(obj)); rc = osd_init_lma(env, obj, fid, oh); - if (rc) { + if (rc != 0) CERROR("%s: can not set LMA on "DFID": rc = %d\n", osd->od_svname, PFID(fid), rc); - /* ignore errors during LMA initialization */ - rc = 0; - } out: up_write(&obj->oo_guard); -- 1.8.3.1