X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_index.c;h=8c1ffbf02a5737893527b630a24012ff7595d22a;hp=8f5905e9887243dafcf1b756fb59f1f5de5434f7;hb=e3d507eec50fc1ff79acf2a9f93d52d698c887d7;hpb=0c69c941cdae8cc41a3daaa9184ef2468a24aa09 diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 8f5905e..8c1ffbf 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014, Intel Corporation. + * Copyright (c) 2012, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -151,8 +151,7 @@ static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o, static struct dt_it *osd_index_it_init(const struct lu_env *env, struct dt_object *dt, - __u32 unused, - struct lustre_capa *capa) + __u32 unused) { struct osd_thread_info *info = osd_oti_get(env); struct osd_zap_it *it; @@ -161,7 +160,8 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, int rc; ENTRY; - /* XXX: check capa ? */ + if (obj->oo_destroyed) + RETURN(ERR_PTR(-ENOENT)); LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_db); @@ -179,7 +179,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, } it->ozi_obj = obj; - it->ozi_capa = capa; it->ozi_reset = 1; lu_object_get(lo); @@ -229,90 +228,121 @@ static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr, ent->lde_attrs = cpu_to_le32(ent->lde_attrs); } -/* - * as we don't know FID, we can't use LU object, so this function - * partially duplicate __osd_xattr_get() which is built around - * LU-object and uses it to cache data like regular EA dnode, etc +/** + * Get the object's FID from its LMA EA. + * + * \param[in] env pointer to the thread context + * \param[in] osd pointer to the OSD device + * \param[in] oid the object's local identifier + * \param[out] fid the buffer to hold the object's FID + * + * \retval 0 for success + * \retval negative error number on failure */ -static int osd_find_parent_by_dnode(const struct lu_env *env, - struct dt_object *o, - struct lu_fid *fid) +static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd, + uint64_t oid, struct lu_fid *fid) { - struct osd_device *osd = osd_obj2dev(osd_dt_obj(o)); - struct lustre_mdt_attrs *lma; + struct objset *os = osd->od_os; + struct osd_thread_info *oti = osd_oti_get(env); + struct lustre_mdt_attrs *lma = + (struct lustre_mdt_attrs *)oti->oti_buf; struct lu_buf buf; - sa_handle_t *sa_hdl; - nvlist_t *nvbuf = NULL; - uchar_t *value; - uint64_t dnode; - int rc, size; + nvlist_t *sa_xattr = NULL; + sa_handle_t *sa_hdl = NULL; + uchar_t *nv_value = NULL; + uint64_t xattr = ZFS_NO_OBJECT; + int size = 0; + int rc; ENTRY; - /* first of all, get parent dnode from own attributes */ - LASSERT(osd_dt_obj(o)->oo_db); - rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object, - NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) - RETURN(rc); + rc = __osd_xattr_load(osd, oid, &sa_xattr); + if (rc == -ENOENT) + goto regular; - dnode = ZFS_NO_OBJECT; - rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8); - sa_handle_destroy(sa_hdl); - if (rc) - RETURN(rc); + if (rc != 0) + GOTO(out, rc); - /* now get EA buffer */ - rc = __osd_xattr_load(osd, dnode, &nvbuf); - if (rc) - GOTO(regular, rc); + rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value, + &size); + if (rc == -ENOENT) + goto regular; - /* XXX: if we get that far.. should we cache the result? */ + if (rc != 0) + GOTO(out, rc); - /* try to find LMA attribute */ - LASSERT(nvbuf != NULL); - rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size); - if (rc == 0 && size >= sizeof(*lma)) { - lma = (struct lustre_mdt_attrs *)value; - lustre_lma_swab(lma); - *fid = lma->lma_self_fid; - GOTO(out, rc = 0); - } + if (unlikely(size > sizeof(oti->oti_buf))) + GOTO(out, rc = -ERANGE); -regular: - /* no LMA attribute in SA, let's try regular EA */ + memcpy(lma, nv_value, size); - /* first of all, get parent dnode storing regular EA */ - rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) + goto found; + +regular: + rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc != 0) GOTO(out, rc); - dnode = ZFS_NO_OBJECT; - rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8); + rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8); sa_handle_destroy(sa_hdl); - if (rc) + if (rc != 0) GOTO(out, rc); - CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf)); - buf.lb_buf = osd_oti_get(env)->oti_buf; - buf.lb_len = sizeof(osd_oti_get(env)->oti_buf); - - /* now try to find LMA */ - rc = __osd_xattr_get_large(env, osd, dnode, &buf, + buf.lb_buf = lma; + buf.lb_len = sizeof(oti->oti_buf); + rc = __osd_xattr_get_large(env, osd, xattr, &buf, XATTR_NAME_LMA, &size); - if (rc == 0 && size >= sizeof(*lma)) { - lma = buf.lb_buf; - lustre_lma_swab(lma); - *fid = lma->lma_self_fid; - GOTO(out, rc = 0); - } else if (rc < 0) { + if (rc != 0) GOTO(out, rc); - } else { + +found: + if (size < sizeof(*lma)) GOTO(out, rc = -EIO); + + lustre_lma_swab(lma); + if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) || + CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) { + CWARN("%s: unsupported incompat LMA feature(s) %#x for " + "oid = "LPX64"\n", osd->od_svname, + lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid); + GOTO(out, rc = -EOPNOTSUPP); + } else { + *fid = lma->lma_self_fid; + GOTO(out, rc = 0); } out: - if (nvbuf != NULL) - nvlist_free(nvbuf); + if (sa_xattr != NULL) + nvlist_free(sa_xattr); + return rc; +} + +/* + * As we don't know FID, we can't use LU object, so this function + * partially duplicate __osd_xattr_get() which is built around + * LU-object and uses it to cache data like regular EA dnode, etc + */ +static int osd_find_parent_by_dnode(const struct lu_env *env, + struct dt_object *o, + struct lu_fid *fid) +{ + struct osd_device *osd = osd_obj2dev(osd_dt_obj(o)); + sa_handle_t *sa_hdl; + uint64_t dnode = ZFS_NO_OBJECT; + int rc; + ENTRY; + + /* first of all, get parent dnode from own attributes */ + LASSERT(osd_dt_obj(o)->oo_db); + rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object, + NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc != 0) + RETURN(rc); + + rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8); + sa_handle_destroy(sa_hdl); + if (rc == 0) + rc = osd_get_fid_by_oid(env, osd, dnode, fid); + RETURN(rc); } @@ -328,10 +358,9 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, buf.lb_buf = osd_oti_get(env)->oti_buf; buf.lb_len = sizeof(osd_oti_get(env)->oti_buf); - rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK); if (rc == -ERANGE) { - rc = osd_xattr_get(env, o, &LU_BUF_NULL, - XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK); if (rc < 0) RETURN(rc); LASSERT(rc > 0); @@ -339,7 +368,7 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, if (buf.lb_buf == NULL) RETURN(-ENOMEM); buf.lb_len = rc; - rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK); } if (rc < 0) GOTO(out, rc); @@ -387,8 +416,7 @@ out: } static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *obj = osd_dt_obj(dt); @@ -410,12 +438,22 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, } } + memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid)); rc = -zap_lookup(osd->od_os, obj->oo_db->db_object, (char *)key, 8, sizeof(oti->oti_zde) / 8, (void *)&oti->oti_zde); - memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid)); + if (rc != 0) + RETURN(rc); - RETURN(rc == 0 ? 1 : rc); + if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) { + memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid)); + RETURN(1); + } + + rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode, + (struct lu_fid *)rec); + + RETURN(rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc)); } static int osd_declare_dir_insert(const struct lu_env *env, @@ -481,7 +519,7 @@ struct osd_object *osd_object_find(const struct lu_env *env, child = osd_obj(lo); else LU_OBJECT_DEBUG(D_ERROR, env, luch, - "%s: object can't be located "DFID"\n", + "%s: object can't be located "DFID, osd_dev(ludev)->od_svname, PFID(fid)); if (child == NULL) { @@ -492,7 +530,7 @@ struct osd_object *osd_object_find(const struct lu_env *env, } } else { LU_OBJECT_DEBUG(D_ERROR, env, luch, - "%s: lu_object does not exists "DFID"\n", + "%s: lu_object does not exists "DFID, osd_dev(ludev)->od_svname, PFID(fid)); lu_object_put(env, luch); child = ERR_PTR(-ENOENT); @@ -564,7 +602,6 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, * \param key key for index * \param rec record reference * \param th transaction handler - * \param capa capability descriptor * \param ignore_quota update should not affect quota * * \retval 0 success @@ -572,8 +609,7 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, */ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) + struct thandle *th, int ignore_quota) { struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *parent = osd_dt_obj(dt); @@ -677,8 +713,9 @@ static int osd_declare_dir_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; + uint64_t dnode; ENTRY; LASSERT(dt_object_exists(dt)); @@ -687,17 +724,20 @@ static int osd_declare_dir_delete(const struct lu_env *env, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); - - dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key); + if (dt_object_exists(dt)) { + LASSERT(obj->oo_db); + LASSERT(osd_object_is_zap(obj->oo_db)); + dnode = obj->oo_db->db_object; + } else { + dnode = DMU_NEW_OBJECT; + } + dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key); RETURN(0); } static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa) + const struct dt_key *key, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -707,8 +747,8 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); + LASSERT(zap_db); + LASSERT(osd_object_is_zap(zap_db)); LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); @@ -737,12 +777,11 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, static struct dt_it *osd_dir_it_init(const struct lu_env *env, struct dt_object *dt, - __u32 unused, - struct lustre_capa *capa) + __u32 unused) { struct osd_zap_it *it; - it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa); + it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused); if (!IS_ERR(it)) it->ozi_pos = 0; @@ -1140,8 +1179,7 @@ static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst, } static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1184,8 +1222,7 @@ static int osd_declare_index_insert(const struct lu_env *env, static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) + struct thandle *th, int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1231,8 +1268,7 @@ static int osd_declare_index_delete(const struct lu_env *env, } static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa) + const struct dt_key *key, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1417,8 +1453,7 @@ struct osd_metadnode_it { }; static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); struct osd_metadnode_it *it; @@ -1480,10 +1515,8 @@ static void osd_zfs_otable_prefetch(const struct lu_env *env, if (unlikely(rc != 0)) break; - /* dmu_prefetch() was exported in 0.6.2, if you use with - * an older release, just comment it out - this is an - * optimization */ - dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0); + osd_dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, + 0, 0, 0, ZIO_PRIORITY_ASYNC_READ); it->mit_prefetched++; } @@ -1620,50 +1653,50 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) { struct osd_object *obj = osd_dt_obj(dt); + int rc = 0; ENTRY; - LASSERT(dt_object_exists(dt)); + down_read(&obj->oo_guard); /* * XXX: implement support for fixed-size keys sorted with natural * numerical way (not using internal hash value) */ if (feat->dif_flags & DT_IND_RANGE) - RETURN(-ERANGE); + GOTO(out, rc = -ERANGE); if (unlikely(feat == &dt_otable_features)) { dt->do_index_ops = &osd_zfs_otable_ops; - RETURN(0); + GOTO(out, rc = 0); } - LASSERT(obj->oo_db != NULL); + LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL); if (likely(feat == &dt_directory_features)) { - if (osd_object_is_zap(obj->oo_db)) + if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db)) dt->do_index_ops = &osd_dir_ops; else - RETURN(-ENOTDIR); + GOTO(out, rc = -ENOTDIR); } else if (unlikely(feat == &dt_acct_features)) { LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu))); dt->do_index_ops = &osd_acct_index_ops; - } else if (osd_object_is_zap(obj->oo_db) && - dt->do_index_ops == NULL) { + } else if (dt->do_index_ops == NULL) { /* For index file, we don't support variable key & record sizes * and the key has to be unique */ if ((feat->dif_flags & ~DT_IND_UPDATE) != 0) - RETURN(-EINVAL); + GOTO(out, rc = -EINVAL); if (feat->dif_keysize_max > ZAP_MAXNAMELEN) - RETURN(-E2BIG); + GOTO(out, rc = -E2BIG); if (feat->dif_keysize_max != feat->dif_keysize_min) - RETURN(-EINVAL); + GOTO(out, rc = -EINVAL); /* As for the record size, it should be a multiple of 8 bytes * and smaller than the maximum value length supported by ZAP. */ if (feat->dif_recsize_max > ZAP_MAXVALUELEN) - RETURN(-E2BIG); + GOTO(out, rc = -E2BIG); if (feat->dif_recsize_max != feat->dif_recsize_min) - RETURN(-EINVAL); + GOTO(out, rc = -EINVAL); obj->oo_keysize = feat->dif_keysize_max; obj->oo_recsize = feat->dif_recsize_max; @@ -1677,5 +1710,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, dt->do_index_ops = &osd_index_ops; } - RETURN(0); +out: + up_read(&obj->oo_guard); + + RETURN(rc); }