X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Flocal_storage.c;h=7b36943bdba23d9c3491b056ed23b8b9ee944226;hb=592c739ecaece1f854712ede8d4a049c7d249c3a;hp=26d6e183500f8fa546300480c76dbcef6e5a3172;hpb=752c69506c4559cd5c97a8dcbe488b54822ca087;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/local_storage.c b/lustre/obdclass/local_storage.c index 26d6e18..7b36943 100644 --- a/lustre/obdclass/local_storage.c +++ b/lustre/obdclass/local_storage.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012 Whamcloud, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * lustre/obdclass/local_storage.c @@ -36,7 +36,7 @@ /* all initialized local storages on this node are linked on this */ static CFS_LIST_HEAD(ls_list_head); -static CFS_DEFINE_MUTEX(ls_list_mutex); +static DEFINE_MUTEX(ls_list_mutex); static int ls_object_init(const struct lu_env *env, struct lu_object *o, const struct lu_object_conf *unused) @@ -122,9 +122,9 @@ struct ls_device *ls_find_dev(struct dt_device *dev) { struct ls_device *ls; - cfs_mutex_lock(&ls_list_mutex); + mutex_lock(&ls_list_mutex); ls = __ls_find_dev(dev); - cfs_mutex_unlock(&ls_list_mutex); + mutex_unlock(&ls_list_mutex); return ls; } @@ -139,14 +139,13 @@ static struct lu_device_type ls_lu_type = { .ldt_ops = &ls_device_type_ops, }; -static struct ls_device *ls_device_get(const struct lu_env *env, - struct dt_device *dev) +struct ls_device *ls_device_get(struct dt_device *dev) { struct ls_device *ls; ENTRY; - cfs_mutex_lock(&ls_list_mutex); + mutex_lock(&ls_list_mutex); ls = __ls_find_dev(dev); if (ls) GOTO(out_ls, ls); @@ -158,7 +157,7 @@ static struct ls_device *ls_device_get(const struct lu_env *env, cfs_atomic_set(&ls->ls_refcount, 1); CFS_INIT_LIST_HEAD(&ls->ls_los_list); - cfs_mutex_init(&ls->ls_los_mutex); + mutex_init(&ls->ls_los_mutex); ls->ls_osd = dev; @@ -170,17 +169,17 @@ static struct ls_device *ls_device_get(const struct lu_env *env, /* finally add ls to the list */ cfs_list_add(&ls->ls_linkage, &ls_list_head); out_ls: - cfs_mutex_unlock(&ls_list_mutex); + mutex_unlock(&ls_list_mutex); RETURN(ls); } -static void ls_device_put(const struct lu_env *env, struct ls_device *ls) +void ls_device_put(const struct lu_env *env, struct ls_device *ls) { LASSERT(env); if (!cfs_atomic_dec_and_test(&ls->ls_refcount)) return; - cfs_mutex_lock(&ls_list_mutex); + mutex_lock(&ls_list_mutex); if (cfs_atomic_read(&ls->ls_refcount) == 0) { LASSERT(cfs_list_empty(&ls->ls_los_list)); cfs_list_del(&ls->ls_linkage); @@ -188,7 +187,7 @@ static void ls_device_put(const struct lu_env *env, struct ls_device *ls) lu_device_fini(&ls->ls_top_dev.dd_lu_dev); OBD_FREE_PTR(ls); } - cfs_mutex_unlock(&ls_list_mutex); + mutex_unlock(&ls_list_mutex); } /** @@ -207,11 +206,11 @@ int local_object_fid_generate(const struct lu_env *env, * the latest generated fid atomically with * object creation see local_object_create() */ - cfs_mutex_lock(&los->los_id_lock); + mutex_lock(&los->los_id_lock); fid->f_seq = los->los_seq; fid->f_oid = los->los_last_oid++; fid->f_ver = 0; - cfs_mutex_unlock(&los->los_id_lock); + mutex_unlock(&los->los_id_lock); return 0; } @@ -262,13 +261,6 @@ int local_object_create(const struct lu_env *env, if (rc) RETURN(rc); - lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu)); - lustre_lma_swab(&dti->dti_lma); - dti->dti_lb.lb_buf = &dti->dti_lma; - dti->dti_lb.lb_len = sizeof(dti->dti_lma); - rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th, - BYPASS_CAPA); - if (los == NULL) RETURN(rc); @@ -278,7 +270,7 @@ int local_object_create(const struct lu_env *env, /* many threads can be updated this, serialize * them here to avoid the race where one thread * takes the value first, but writes it last */ - cfs_mutex_lock(&los->los_id_lock); + mutex_lock(&los->los_id_lock); /* update local oid number on disk so that * we know the last one used after reboot */ @@ -290,7 +282,7 @@ int local_object_create(const struct lu_env *env, dti->dti_lb.lb_len = sizeof(losd); rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off, th); - cfs_mutex_unlock(&los->los_id_lock); + mutex_unlock(&los->los_id_lock); RETURN(rc); } @@ -385,13 +377,6 @@ out: if (rc) { lu_object_put_nocache(env, &dto->do_lu); dto = ERR_PTR(rc); - } else { - struct lu_fid dti_fid; - /* since local files FIDs are not in OI the directory entry - * is used to get inode number/generation, we need to do lookup - * again to cache this data after create */ - rc = dt_lookup_dir(env, parent, name, &dti_fid); - LASSERT(rc == 0); } RETURN(dto); } @@ -461,7 +446,7 @@ struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, } else { struct ls_device *ls; - ls = ls_device_get(env, dt); + ls = ls_device_get(dt); if (IS_ERR(ls)) { dto = ERR_PTR(PTR_ERR(ls)); } else { @@ -555,7 +540,7 @@ local_index_find_or_create_with_fid(const struct lu_env *env, } else { struct ls_device *ls; - ls = ls_device_get(env, dt); + ls = ls_device_get(dt); if (IS_ERR(ls)) { dto = ERR_PTR(PTR_ERR(ls)); } else { @@ -581,7 +566,7 @@ local_index_find_or_create_with_fid(const struct lu_env *env, } EXPORT_SYMBOL(local_index_find_or_create_with_fid); -static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) +struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) { struct local_oid_storage *los, *ret = NULL; @@ -595,6 +580,15 @@ static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) return ret; } +void dt_los_put(struct local_oid_storage *los) +{ + if (cfs_atomic_dec_and_test(&los->los_refcount)) + /* should never happen, only local_oid_storage_fini should + * drop refcount to zero */ + LBUG(); + return; +} + /** * Initialize local OID storage for required sequence. * That may be needed for services that uses local files and requires @@ -619,18 +613,18 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, struct dt_thread_info *dti = dt_info(env); struct ls_device *ls; struct los_ondisk losd; - struct dt_object *o; struct dt_object *root = NULL; + struct dt_object *o = NULL; struct thandle *th; int rc; ENTRY; - ls = ls_device_get(env, dev); + ls = ls_device_get(dev); if (IS_ERR(ls)) RETURN(PTR_ERR(ls)); - cfs_mutex_lock(&ls->ls_los_mutex); + mutex_lock(&ls->ls_los_mutex); *los = dt_los_find(ls, fid_seq(first_fid)); if (*los != NULL) GOTO(out, rc = 0); @@ -641,17 +635,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, GOTO(out, rc = -ENOMEM); cfs_atomic_set(&(*los)->los_refcount, 1); - cfs_mutex_init(&(*los)->los_id_lock); + mutex_init(&(*los)->los_id_lock); (*los)->los_dev = &ls->ls_top_dev; cfs_atomic_inc(&ls->ls_refcount); cfs_list_add(&(*los)->los_list, &ls->ls_los_list); - /* initialize data allowing to generate new fids, - * literally we need a sequence */ - o = ls_locate(env, ls, first_fid); - if (IS_ERR(o)) - GOTO(out_los, rc = PTR_ERR(o)); - rc = dt_root_get(env, dev, &dti->dti_fid); if (rc) GOTO(out_los, rc); @@ -660,11 +648,25 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (IS_ERR(root)) GOTO(out_los, rc = PTR_ERR(root)); - if (dt_try_as_dir(env, root) == 0) - GOTO(out_los, rc = -ENOTDIR); + snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid", + fid_seq(first_fid)); + rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid); + if (rc != 0 && rc != -ENOENT) + GOTO(out_los, rc); + + /* initialize data allowing to generate new fids, + * literally we need a sequence */ + if (rc == 0) + o = ls_locate(env, ls, &dti->dti_fid); + else + o = ls_locate(env, ls, first_fid); + if (IS_ERR(o)) + GOTO(out_los, rc = PTR_ERR(o)); dt_write_lock(env, o, 0); if (!dt_object_exists(o)) { + LASSERT(rc == -ENOENT); + th = dt_trans_create(env, dev); if (IS_ERR(th)) GOTO(out_lock, rc = PTR_ERR(th)); @@ -678,8 +680,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, if (rc) GOTO(out_trans, rc); - snprintf(dti->dti_buf, sizeof(dti->dti_buf), - "seq-%Lx-lastid", fid_seq(first_fid)); rc = dt_declare_insert(env, root, (const struct dt_rec *)lu_object_fid(&o->do_lu), (const struct dt_key *)dti->dti_buf, @@ -708,15 +708,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, GOTO(out_trans, rc); LASSERT(dt_object_exists(o)); - lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu)); - lustre_lma_swab(&dti->dti_lma); - dti->dti_lb.lb_buf = &dti->dti_lma; - dti->dti_lb.lb_len = sizeof(dti->dti_lma); - rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, - th, BYPASS_CAPA); - if (rc) - GOTO(out_trans, rc); - losd.lso_magic = cpu_to_le32(LOS_MAGIC); losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1); @@ -726,12 +717,22 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th); if (rc) GOTO(out_trans, rc); - rc = dt_insert(env, root, - (const struct dt_rec *)lu_object_fid(&o->do_lu), - (const struct dt_key *)dti->dti_buf, th, - BYPASS_CAPA, 1); - if (rc) - GOTO(out_trans, rc); +#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0) +#error "fix this before release" +#endif + /* + * there is one technical debt left in Orion: + * proper hanlding of named vs no-name objects. + * Llog objects have name always as they are placed in O/d/... + */ + if (fid_seq(lu_object_fid(&o->do_lu)) != FID_SEQ_LLOG) { + rc = dt_insert(env, root, + (const struct dt_rec *)first_fid, + (const struct dt_key *)dti->dti_buf, + th, BYPASS_CAPA, 1); + if (rc) + GOTO(out_trans, rc); + } out_trans: dt_trans_stop(env, dev, th); } else { @@ -748,12 +749,15 @@ out_trans: out_lock: dt_write_unlock(env, o); out_los: - if (root) + if (root != NULL && !IS_ERR(root)) lu_object_put_nocache(env, &root->do_lu); - if (rc) { + + if (rc != 0) { + cfs_list_del(&(*los)->los_list); + cfs_atomic_dec(&ls->ls_refcount); OBD_FREE_PTR(*los); *los = NULL; - if (o) + if (o != NULL && !IS_ERR(o)) lu_object_put_nocache(env, &o->do_lu); } else { (*los)->los_seq = fid_seq(first_fid); @@ -761,7 +765,7 @@ out_los: (*los)->los_obj = o; } out: - cfs_mutex_unlock(&ls->ls_los_mutex); + mutex_unlock(&ls->ls_los_mutex); ls_device_put(env, ls); return rc; } @@ -779,14 +783,14 @@ void local_oid_storage_fini(const struct lu_env *env, LASSERT(los->los_dev); ls = dt2ls_dev(los->los_dev); - cfs_mutex_lock(&ls->ls_los_mutex); + mutex_lock(&ls->ls_los_mutex); if (cfs_atomic_read(&los->los_refcount) == 0) { if (los->los_obj) lu_object_put_nocache(env, &los->los_obj->do_lu); cfs_list_del(&los->los_list); OBD_FREE_PTR(los); } - cfs_mutex_unlock(&ls->ls_los_mutex); + mutex_unlock(&ls->ls_los_mutex); ls_device_put(env, ls); } EXPORT_SYMBOL(local_oid_storage_fini);