X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_index.c;h=9780e43f649d8c45e401c645e9212a145781d77c;hb=d690560bcef69cfe91747ffb8a7efb77c04800e6;hp=210a00e99cae8f50b004913528454b9dc8de0bb8;hpb=c63d9ee84ef7150f277b955cb0f9dca2f9d876fe;p=fs%2Flustre-release.git diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 210a00e..9780e43 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -26,10 +26,8 @@ /* * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. - */ -/* - * Copyright (c) 2012, 2013, Intel Corporation. - * Use is subject to license terms. + * + * Copyright (c) 2012, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -140,7 +138,8 @@ static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc, uint64_t dirhash) { struct osd_device *d = osd_obj2dev(o); - zap_cursor_init_serialized(zc, d->od_os, o->oo_db->db_object, dirhash); + osd_zap_cursor_init_serialized(zc, d->od_os, + o->oo_db->db_object, dirhash); } static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o, @@ -152,8 +151,7 @@ static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o, static struct dt_it *osd_index_it_init(const struct lu_env *env, struct dt_object *dt, - __u32 unused, - struct lustre_capa *capa) + __u32 unused) { struct osd_thread_info *info = osd_oti_get(env); struct osd_zap_it *it; @@ -162,34 +160,25 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, int rc; ENTRY; - /* XXX: check capa ? */ + if (obj->oo_destroyed) + RETURN(ERR_PTR(-ENOENT)); LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_db); LASSERT(osd_object_is_zap(obj->oo_db)); LASSERT(info); - if (info->oti_it_inline) { - OBD_ALLOC_PTR(it); - if (it == NULL) - RETURN(ERR_PTR(-ENOMEM)); - } else { - it = &info->oti_it_zap; - info->oti_it_inline = 1; - } + OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS); + if (it == NULL) + RETURN(ERR_PTR(-ENOMEM)); rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0); if (rc != 0) { - if (it != &info->oti_it_zap) - OBD_FREE_PTR(it); - else - info->oti_it_inline = 0; - + OBD_SLAB_FREE_PTR(it, osd_zapit_cachep); RETURN(ERR_PTR(rc)); } it->ozi_obj = obj; - it->ozi_capa = capa; it->ozi_reset = 1; lu_object_get(lo); @@ -198,7 +187,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_thread_info *info = osd_oti_get(env); struct osd_zap_it *it = (struct osd_zap_it *)di; struct osd_object *obj; ENTRY; @@ -210,10 +198,7 @@ static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di) osd_zap_cursor_fini(it->ozi_zc); lu_object_put(env, &obj->oo_dt.do_lu); - if (it != &info->oti_it_zap) - OBD_FREE_PTR(it); - else - info->oti_it_inline = 0; + OBD_SLAB_FREE_PTR(it, osd_zapit_cachep); EXIT; } @@ -342,10 +327,9 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, buf.lb_buf = osd_oti_get(env)->oti_buf; buf.lb_len = sizeof(osd_oti_get(env)->oti_buf); - rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK); if (rc == -ERANGE) { - rc = osd_xattr_get(env, o, &LU_BUF_NULL, - XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK); if (rc < 0) RETURN(rc); LASSERT(rc > 0); @@ -353,7 +337,7 @@ static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o, if (buf.lb_buf == NULL) RETURN(-ENOMEM); buf.lb_len = rc; - rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA); + rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK); } if (rc < 0) GOTO(out, rc); @@ -401,8 +385,7 @@ out: } static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *obj = osd_dt_obj(dt); @@ -440,16 +423,20 @@ static int osd_declare_dir_insert(const struct lu_env *env, { struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; + uint64_t object; ENTRY; LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); + /* This is for inserting dot/dotdot for new created dir. */ + if (obj->oo_db == NULL) + object = DMU_NEW_OBJECT; + else + object = obj->oo_db->db_object; - dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object); - dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key); + dmu_tx_hold_bonus(oh->ot_tx, object); + dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key); RETURN(0); } @@ -523,7 +510,7 @@ static inline void osd_object_put(const struct lu_env *env, } static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd, - obd_seq seq) + u64 seq) { struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; struct seq_server_site *ss = osd_seq_site(osd); @@ -535,8 +522,9 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd, rc = osd_fld_lookup(env, osd, seq, range); if (rc != 0) { - CERROR("%s: Can not lookup fld for "LPX64"\n", - osd_name(osd), seq); + if (rc != -ENOENT) + CERROR("%s: Can not lookup fld for "LPX64"\n", + osd_name(osd), seq); RETURN(0); } @@ -573,7 +561,6 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, * \param key key for index * \param rec record reference * \param th transaction handler - * \param capa capability descriptor * \param ignore_quota update should not affect quota * * \retval 0 success @@ -581,8 +568,7 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, */ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) + struct thandle *th, int ignore_quota) { struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *parent = osd_dt_obj(dt); @@ -633,12 +619,25 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, * during iteration */ GOTO(out, rc = 0); } else if (name[1] == '.' && name[2] == 0) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) { + struct lu_fid tfid = *fid; + + osd_object_put(env, child); + tfid.f_oid--; + child = osd_object_find(env, dt, &tfid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + + LASSERT(child->oo_db); + } + /* update parent dnode in the child. * later it will be used to generate ".." */ rc = osd_object_sa_update(parent, SA_ZPL_PARENT(osd), &child->oo_db->db_object, 8, oh); + GOTO(out, rc); } } @@ -654,6 +653,12 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, rc = -zap_add(osd->od_os, parent->oo_db->db_object, (char *)key, 8, sizeof(oti->oti_zde) / 8, (void *)&oti->oti_zde, oh->ot_tx); + if (unlikely(rc == -EEXIST && + name[0] == '.' && name[1] == '.' && name[2] == 0)) + /* Update (key,oid) in ZAP */ + rc = -zap_update(osd->od_os, parent->oo_db->db_object, + (char *)key, 8, sizeof(oti->oti_zde) / 8, + (void *)&oti->oti_zde, oh->ot_tx); out: if (child != NULL) @@ -667,8 +672,9 @@ static int osd_declare_dir_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; + uint64_t dnode; ENTRY; LASSERT(dt_object_exists(dt)); @@ -677,17 +683,20 @@ static int osd_declare_dir_delete(const struct lu_env *env, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); - - dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key); + if (dt_object_exists(dt)) { + LASSERT(obj->oo_db); + LASSERT(osd_object_is_zap(obj->oo_db)); + dnode = obj->oo_db->db_object; + } else { + dnode = DMU_NEW_OBJECT; + } + dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key); RETURN(0); } static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa) + const struct dt_key *key, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -697,8 +706,8 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); + LASSERT(zap_db); + LASSERT(osd_object_is_zap(zap_db)); LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); @@ -727,12 +736,11 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, static struct dt_it *osd_dir_it_init(const struct lu_env *env, struct dt_object *dt, - __u32 unused, - struct lustre_capa *capa) + __u32 unused) { struct osd_zap_it *it; - it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa); + it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused); if (!IS_ERR(it)) it->ozi_pos = 0; @@ -842,6 +850,8 @@ static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di) zap_attribute_t *za = &osd_oti_get(env)->oti_za; int rc; + ENTRY; + /* temp. storage should be enough for any key supported by ZFS */ CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name)); @@ -854,9 +864,10 @@ static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di) it->ozi_pos++; if (it->ozi_pos <=2) RETURN(0); - } - zap_cursor_advance(it->ozi_zc); + } else { + zap_cursor_advance(it->ozi_zc); + } /* * According to current API we need to return error if its last entry. @@ -946,14 +957,11 @@ static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di, osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR)); lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr)); rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid); - /* - * early Orion code was not setting LinkEA, so it's possible - * some setups still have objects with no LinkEA set. - * but at that time .. was a real record in the directory - * so we should try to lookup .. in ZAP - */ - if (rc != -ENOENT) - GOTO(out, rc); + + /* ENOENT happens at the root of filesystem so ignore it */ + if (rc == -ENOENT) + rc = 0; + GOTO(out, rc); } LASSERT(lde); @@ -1082,7 +1090,7 @@ static int osd_dir_it_load(const struct lu_env *env, RETURN(rc); } -static struct dt_index_operations osd_dir_ops = { +struct dt_index_operations osd_dir_ops = { .dio_lookup = osd_dir_lookup, .dio_declare_insert = osd_declare_dir_insert, .dio_insert = osd_dir_insert, @@ -1130,8 +1138,7 @@ static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst, } static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1174,8 +1181,7 @@ static int osd_declare_index_insert(const struct lu_env *env, static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) + struct thandle *th, int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1221,8 +1227,7 @@ static int osd_declare_index_delete(const struct lu_env *env, } static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa) + const struct dt_key *key, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -1407,8 +1412,7 @@ struct osd_metadnode_it { }; static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); struct osd_metadnode_it *it; @@ -1612,8 +1616,6 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, struct osd_object *obj = osd_dt_obj(dt); ENTRY; - LASSERT(dt_object_exists(dt)); - /* * XXX: implement support for fixed-size keys sorted with natural * numerical way (not using internal hash value) @@ -1626,17 +1628,16 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - LASSERT(obj->oo_db != NULL); + LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL); if (likely(feat == &dt_directory_features)) { - if (osd_object_is_zap(obj->oo_db)) + if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db)) dt->do_index_ops = &osd_dir_ops; else RETURN(-ENOTDIR); } else if (unlikely(feat == &dt_acct_features)) { LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu))); dt->do_index_ops = &osd_acct_index_ops; - } else if (osd_object_is_zap(obj->oo_db) && - dt->do_index_ops == NULL) { + } else if (dt->do_index_ops == NULL) { /* For index file, we don't support variable key & record sizes * and the key has to be unique */ if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)