X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_index.c;h=51db373bb082a0c6a9035febecddfa9af9902d43;hp=2c02470359218adeae6ecba7fee63876adbab9b6;hb=76f0977b7ea5d46836cb459deb7b9ad9e781d585;hpb=aea3cfc62c101066910daf1f9f12a7848f04be13 diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 2c02470..51db373 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -28,7 +28,7 @@ * Use is subject to license terms. */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2013, Intel Corporation. * Use is subject to license terms. */ /* @@ -41,14 +41,10 @@ * Author: Mike Pershin */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_OSD #include #include -#include #include #include #include @@ -476,25 +472,40 @@ static inline void osd_object_put(const struct lu_env *env, lu_object_put(env, &obj->oo_dt.do_lu); } -static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, - struct lu_fid *fid) +static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd, + obd_seq seq) { struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; struct seq_server_site *ss = osd_seq_site(osd); int rc; ENTRY; - if (!fid_is_norm(fid) && !fid_is_root(fid)) - RETURN(0); + if (ss == NULL) + RETURN(1); - rc = osd_fld_lookup(env, osd, fid, range); + rc = osd_fld_lookup(env, osd, seq, range); if (rc != 0) { - CERROR("%s: Can not lookup fld for "DFID"\n", - osd_name(osd), PFID(fid)); - RETURN(rc); + CERROR("%s: Can not lookup fld for "LPX64"\n", + osd_name(osd), seq); + RETURN(0); } - RETURN(ss->ss_node_id != range->lsr_index); + RETURN(ss->ss_node_id == range->lsr_index); +} + +static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, + struct lu_fid *fid) +{ + ENTRY; + + /* FID seqs not in FLDB, must be local seq */ + if (unlikely(!fid_seq_in_fldb(fid_seq(fid)))) + RETURN(0); + + if (osd_seq_exists(env, osd, 
fid_seq(fid))) + RETURN(0); + + RETURN(1); } /** @@ -556,27 +567,21 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(child)) RETURN(PTR_ERR(child)); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0) -#define OSD_ZFS_INSERT_DOTS_FOR_TESTING__ -#endif LASSERT(child->oo_db); if (name[0] == '.') { if (name[1] == 0) { /* do not store ".", instead generate it * during iteration */ -#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING GOTO(out, rc = 0); -#endif } else if (name[1] == '.' && name[2] == 0) { /* update parent dnode in the child. * later it will be used to generate ".." */ udmu_objset_t *uos = &osd->od_objset; - rc = osd_object_sa_update(parent, SA_ZPL_PARENT(uos), - &child->oo_db->db_object, - 8, oh); -#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING + rc = osd_object_sa_update(parent, + SA_ZPL_PARENT(uos), + &child->oo_db->db_object, + 8, oh); GOTO(out, rc); -#endif } } CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8); @@ -592,9 +597,7 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, (char *)key, 8, sizeof(oti->oti_zde) / 8, (void *)&oti->oti_zde, oh->ot_tx); -#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING out: -#endif if (child != NULL) osd_object_put(env, child); @@ -642,10 +645,9 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); -#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING /* - * in Orion . and .. were stored in the directory (not generated up on - * request as now. we preserve them for backward compatibility + * In Orion . and .. were stored in the directory (not generated upon + * request as now). 
we preserve them for backward compatibility */ if (name[0] == '.') { if (name[1] == 0) { @@ -654,7 +656,6 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, RETURN(0); } } -#endif /* Remove key from the ZAP */ rc = -zap_remove(osd->od_objset.os, zap_db->db_object, @@ -841,7 +842,7 @@ static struct dt_key *osd_dir_it_key(const struct lu_env *env, strcpy(it->ozi_name, za->za_name); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0) +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0) if (za->za_name[0] == '.') { if (za->za_name[1] == 0 || (za->za_name[1] == '.' && za->za_name[2] == 0)) { @@ -874,7 +875,7 @@ static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di) if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0) rc = strlen(za->za_name); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 99, 0) +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0) if (rc == 0 && za->za_name[0] == '.') { if (za->za_name[1] == 0 || (za->za_name[1] == '.' && za->za_name[2] == 0)) { @@ -932,11 +933,11 @@ static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di, LASSERT(lde); - lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc)); - - if ((rc = -zap_cursor_retrieve(it->ozi_zc, za))) + rc = -zap_cursor_retrieve(it->ozi_zc, za); + if (unlikely(rc != 0)) GOTO(out, rc); + lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc)); namelen = strlen(za->za_name); if (namelen > NAME_MAX) GOTO(out, rc = -EOVERFLOW); @@ -1041,20 +1042,44 @@ static struct dt_index_operations osd_dir_ops = { /* * Primitives for index files using binary keys. - * XXX: only 64-bit keys are supported for now. 
*/ +/* + * Copy the o->oo_keysize bytes of binary key @src into @dst and zero-fill the + * tail up to the next multiple of sizeof(__u64). Returns the padded key + * length counted in __u64 units (key integer_size is 8); callers pass it on + * as the key length of the zap_*_uint64() primitives. + */ +static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst, + const struct dt_key *src) +{ + int size; + + LASSERT(dst); + LASSERT(src); + + /* align keysize to 64bit */ + size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64); + size *= sizeof(__u64); + + LASSERT(size <= MAXNAMELEN); + + /* oo_keysize is a byte offset: dst is a __u64 pointer, so it must be + * cast to char * before adding, otherwise the offset is scaled by 8 + * and the padding is written outside the key */ + if (unlikely(size > o->oo_keysize)) + memset((char *)dst + o->oo_keysize, 0, size - o->oo_keysize); + memcpy(dst, (const char *)src, o->oo_keysize); + + return (size/sizeof(__u64)); +} + static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, const struct dt_key *key, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); + __u64 *k = osd_oti_get(env)->oti_key64; int rc; ENTRY; + rc = osd_prepare_key_uint64(obj, k, key); + rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object, - (const __u64 *)key, 1, 8, obj->oo_recsize, + k, rc, obj->oo_recusize, obj->oo_recsize, (void *)rec); RETURN(rc == 0 ? 
1 : rc); } @@ -1092,6 +1117,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; + __u64 *k = osd_oti_get(env)->oti_key64; int rc; ENTRY; @@ -1102,9 +1128,11 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, oh = container_of0(th, struct osd_thandle, ot_super); + rc = osd_prepare_key_uint64(obj, k, key); + /* Insert (key,oid) into ZAP */ rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object, - (const __u64 *)key, 1, 8, obj->oo_recsize, + k, rc, obj->oo_recusize, obj->oo_recsize, (void *)rec, oh->ot_tx); RETURN(rc); } @@ -1136,6 +1164,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; + __u64 *k = osd_oti_get(env)->oti_key64; int rc; ENTRY; @@ -1143,9 +1172,11 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); + rc = osd_prepare_key_uint64(obj, k, key); + /* Remove binary key from the ZAP */ rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object, - (const __u64 *)key, 1, oh->ot_tx); + k, rc, oh->ot_tx); RETURN(rc); } @@ -1160,8 +1191,11 @@ static int osd_index_it_get(const struct lu_env *env, struct dt_it *di, LASSERT(it); LASSERT(it->ozi_zc); - /* XXX: API is broken at the moment */ - LASSERT(*((const __u64 *)key) == 0); + /* + * XXX: we need a binary version of zap_cursor_move_to_key() + * to implement this API */ + if (*((const __u64 *)key) != 0) + CERROR("NOT IMPLEMETED YET (move to %Lx)\n", *((__u64 *)key)); zap_cursor_fini(it->ozi_zc); memset(it->ozi_zc, 0, sizeof(*it->ozi_zc)); @@ -1199,6 +1233,7 @@ static struct dt_key *osd_index_it_key(const struct lu_env *env, const struct dt_it *di) { struct osd_zap_it *it = (struct osd_zap_it *)di; + 
struct osd_object *obj = it->ozi_obj; zap_attribute_t *za = &osd_oti_get(env)->oti_za; int rc = 0; ENTRY; @@ -1209,7 +1244,7 @@ static struct dt_key *osd_index_it_key(const struct lu_env *env, RETURN(ERR_PTR(rc)); /* the binary key is stored in the name */ - it->ozi_key = *((__u64 *)za->za_name); + memcpy(&it->ozi_key, za->za_name, obj->oo_keysize); RETURN((struct dt_key *)&it->ozi_key); } @@ -1217,8 +1252,9 @@ static struct dt_key *osd_index_it_key(const struct lu_env *env, static int osd_index_it_key_size(const struct lu_env *env, const struct dt_it *di) { - /* we only support 64-bit binary keys for the time being */ - RETURN(sizeof(__u64)); + struct osd_zap_it *it = (struct osd_zap_it *)di; + struct osd_object *obj = it->ozi_obj; + RETURN(obj->oo_keysize); } static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di, @@ -1228,6 +1264,7 @@ static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di, struct osd_zap_it *it = (struct osd_zap_it *)di; struct osd_object *obj = it->ozi_obj; struct osd_device *osd = osd_obj2dev(obj); + __u64 *k = osd_oti_get(env)->oti_key64; int rc; ENTRY; @@ -1236,9 +1273,11 @@ static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di, if (rc) RETURN(rc); + rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name); + rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object, - (const __u64 *)za->za_name, 1, 8, - obj->oo_recsize, (void *)rec); + k, rc, obj->oo_recusize, obj->oo_recsize, + (void *)rec); RETURN(rc); } @@ -1334,10 +1373,9 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, if ((feat->dif_flags & ~DT_IND_UPDATE) != 0) RETURN(-EINVAL); - /* Although the zap_*_uint64() primitives support large keys, we - * limit ourselves to 64-bit keys for now */ - if (feat->dif_keysize_max != sizeof(__u64) || - feat->dif_keysize_min != sizeof(__u64)) + if (feat->dif_keysize_max > ZAP_MAXNAMELEN) + RETURN(-E2BIG); + if (feat->dif_keysize_max != 
feat->dif_keysize_min) RETURN(-EINVAL); /* As for the record size, it should be a multiple of 8 bytes @@ -1345,11 +1383,18 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt, */ if (feat->dif_recsize_max > ZAP_MAXVALUELEN) RETURN(-E2BIG); - if (feat->dif_recsize_max != feat->dif_recsize_min || - (feat->dif_recsize_max & (sizeof(__u64) - 1))) + if (feat->dif_recsize_max != feat->dif_recsize_min) RETURN(-EINVAL); - obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64); + obj->oo_keysize = feat->dif_keysize_max; + obj->oo_recsize = feat->dif_recsize_max; + obj->oo_recusize = 1; + + /* ZFS prefers to work with array of 64bits */ + if ((obj->oo_recsize & 7) == 0) { + obj->oo_recsize >>= 3; + obj->oo_recusize = 8; + } dt->do_index_ops = &osd_index_ops; }