From f6d6a552398eb1e65857d9bf1afaaf98c8dc1a79 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Thu, 6 Sep 2012 08:48:16 +0400 Subject: [PATCH] LU-1301 lu: local objects library set of functions working on top of OSD API to create/access local objects by name. the library maintains own top device to be able to work in multi-service environment (mds + mgs). Signed-off-by: Alex Zhuravlev Change-Id: I26cc47b866bb0925be4f4419ac663a1d42520e02 Reviewed-on: http://review.whamcloud.com/3665 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Jinshan Xiong --- lustre/include/dt_object.h | 93 ++++- lustre/include/lu_object.h | 1 + lustre/include/lustre/lustre_idl.h | 4 + lustre/include/lustre_disk.h | 10 + lustre/obdclass/Makefile.in | 3 +- lustre/obdclass/dt_object.c | 65 ++-- lustre/obdclass/local_storage.c | 685 +++++++++++++++++++++++++++++++++++++ lustre/obdclass/local_storage.h | 75 ++++ lustre/obdclass/lu_object.c | 12 + lustre/osd-ldiskfs/osd_handler.c | 12 +- 10 files changed, 919 insertions(+), 41 deletions(-) create mode 100644 lustre/obdclass/local_storage.c create mode 100644 lustre/obdclass/local_storage.h diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 5455194..6cdd013 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -674,6 +674,24 @@ struct dt_object { const struct dt_index_operations *do_index_ops; }; +/* + * In-core representation of per-device local object OID storage + */ +struct local_oid_storage { + /* all initialized llog systems on this node linked by this */ + cfs_list_t los_list; + + /* how many handle's reference this los has */ + cfs_atomic_t los_refcount; + struct dt_device *los_dev; + struct dt_object *los_obj; + + /* data used to generate new fids */ + cfs_mutex_t los_id_lock; + __u64 los_seq; + __u32 los_last_oid; +}; + static inline struct dt_object *lu2dt(struct lu_object *l) { LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev)); @@ -783,9 +801,50 @@ 
struct dt_object *dt_find_or_create(const struct lu_env *env, struct dt_object_format *dof, struct lu_attr *attr); -struct dt_object *dt_locate(const struct lu_env *env, - struct dt_device *dev, - const struct lu_fid *fid); +struct dt_object *dt_locate_at(const struct lu_env *env, + struct dt_device *dev, + const struct lu_fid *fid, + struct lu_device *top_dev); +static inline struct dt_object * +dt_locate(const struct lu_env *env, struct dt_device *dev, + const struct lu_fid *fid) +{ + return dt_locate_at(env, dev, fid, dev->dd_lu_dev.ld_site->ls_top_dev); +} + + +int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, + const struct lu_fid *first_fid, + struct local_oid_storage **los); +void local_oid_storage_fini(const struct lu_env *env, + struct local_oid_storage *los); +int local_object_fid_generate(const struct lu_env *env, + struct local_oid_storage *los, + struct lu_fid *fid); +int local_object_declare_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *o, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th); +int local_object_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *o, + struct lu_attr *attr, struct dt_object_format *dof, + struct thandle *th); +struct dt_object *local_file_find_or_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *parent, + const char *name, __u32 mode); +struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, + struct dt_device *dt, + const struct lu_fid *fid, + struct dt_object *parent, + const char *name, + __u32 mode); + +int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir, + const char *name, struct lu_fid *fid); static inline int dt_object_sync(const struct lu_env *env, struct dt_object *o) @@ -1315,4 +1374,32 @@ static inline int dt_lookup(const struct lu_env *env, #define LU221_BAD_TIME (0x80000000U + 24 * 3600) +struct dt_find_hint { + 
struct lu_fid *dfh_fid; + struct dt_device *dfh_dt; + struct dt_object *dfh_o; +}; + +struct dt_thread_info { + char dti_buf[DT_MAX_PATH]; + struct dt_find_hint dti_dfh; + struct lu_attr dti_attr; + struct lu_fid dti_fid; + struct dt_object_format dti_dof; + struct lustre_mdt_attrs dti_lma; + struct lu_buf dti_lb; + loff_t dti_off; +}; + +extern struct lu_context_key dt_key; + +static inline struct dt_thread_info *dt_info(const struct lu_env *env) +{ + struct dt_thread_info *dti; + + dti = lu_context_key_get(&env->le_ctx, &dt_key); + LASSERT(dti); + return dti; +} + #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index fb7d0b3..a2d31ae 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -719,6 +719,7 @@ static inline int lu_object_is_dying(const struct lu_object_header *h) } void lu_object_put(const struct lu_env *env, struct lu_object *o); +void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o); int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index d051631..0622b44 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -418,12 +418,16 @@ enum fid_seq { FID_SEQ_IDIF_MAX = 0x1ffffffffULL, /* Normal FID sequence starts from this value, i.e. 
1<<33 */ FID_SEQ_START = 0x200000000ULL, + /* sequence for local pre-defined FIDs listed in local_oid */ FID_SEQ_LOCAL_FILE = 0x200000001ULL, FID_SEQ_DOT_LUSTRE = 0x200000002ULL, /* XXX 0x200000003ULL is reserved for FID_SEQ_LLOG_OBJ */ + /* sequence is used for local named objects FIDs generated + * by local_object_storage library */ FID_SEQ_SPECIAL = 0x200000004ULL, FID_SEQ_QUOTA = 0x200000005ULL, FID_SEQ_QUOTA_GLB = 0x200000006ULL, + FID_SEQ_LOCAL_NAME = 0x200000007ULL, FID_SEQ_NORMAL = 0x200000400ULL, FID_SEQ_LOV_DEFAULT= 0xffffffffffffffffULL }; diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index f91b21c..487fdaf 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -523,6 +523,16 @@ struct lustre_mount_info { cfs_list_t lmi_list_chain; }; +/* on-disk structure describing local object OIDs storage + * the structure to be used with any sequence managed by + * local object library */ +struct los_ondisk { + __u32 lso_magic; + __u32 lso_next_oid; +}; + +#define LOS_MAGIC 0xdecafbee + /****************** prototypes *********************/ #ifdef __KERNEL__ diff --git a/lustre/obdclass/Makefile.in b/lustre/obdclass/Makefile.in index 3a6944c..f303eae 100644 --- a/lustre/obdclass/Makefile.in +++ b/lustre/obdclass/Makefile.in @@ -10,6 +10,7 @@ sources: obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o obdclass-all-objs += class_obd.o debug.o genops.o uuid.o llog_ioctl.o obdclass-all-objs += lprocfs_status.o lprocfs_jobstats.o lustre_handles.o lustre_peer.o +obdclass-all-objs += local_storage.o obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o obdclass-all-objs += lu_object.o dt_object.o capa.o lu_time.o obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o @@ -26,6 +27,6 @@ $(obj)/llog-test.c: $(obj)/llog_test.c ln -sf $< $@ EXTRA_DIST = $(filter-out llog-test.c,$(obdclass-all-objs:.o=.c)) $(llog-test-objs:.o=.c) llog_test.c llog_internal.h -EXTRA_DIST += 
cl_internal.h +EXTRA_DIST += cl_internal.h local_storage.h @INCLUDE_RULES@ diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index c231814..11a8984 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -51,26 +51,16 @@ #include -struct dt_find_hint { - struct lu_fid *dfh_fid; - struct dt_device *dfh_dt; - struct dt_object *dfh_o; -}; - -struct dt_thread_info { - char dti_buf[DT_MAX_PATH]; - struct dt_find_hint dti_dfh; -}; - /* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */ LU_KEY_INIT(dt_global, struct dt_thread_info); LU_KEY_FINI(dt_global, struct dt_thread_info); -static struct lu_context_key dt_key = { +struct lu_context_key dt_key = { .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL, .lct_init = dt_global_key_init, .lct_fini = dt_global_key_fini }; +EXPORT_SYMBOL(dt_key); /* no lock is necessary to protect the list, because call-backs * are added during system startup. Please refer to "struct dt_device". @@ -221,26 +211,29 @@ int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir, return -ENOTDIR; } EXPORT_SYMBOL(dt_lookup_dir); -/** - * get object for given \a fid. 
- */ -struct dt_object *dt_locate(const struct lu_env *env, - struct dt_device *dev, - const struct lu_fid *fid) + +/* like dt_locate(), but the top device is given by the caller rather than + * taken from lu_site; the reference is put if dev has no layer */ +struct dt_object *dt_locate_at(const struct lu_env *env, + struct dt_device *dev, const struct lu_fid *fid, + struct lu_device *top_dev) { - struct lu_object *obj; - struct dt_object *dt; - - obj = lu_object_find(env, &dev->dd_lu_dev, fid, NULL); - if (!IS_ERR(obj)) { - obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type); - LASSERT(obj != NULL); - dt = container_of(obj, struct dt_object, do_lu); - } else - dt = (struct dt_object *)obj; - return dt; + struct lu_object *lo, *n; + + lo = lu_object_find_at(env, top_dev, fid, NULL); + if (IS_ERR(lo)) + return (void *)lo; + + LASSERT(lo != NULL); + + cfs_list_for_each_entry(n, &lo->lo_header->loh_layers, lo_linkage) { + if (n->lo_dev == &dev->dd_lu_dev) + return container_of0(n, struct dt_object, do_lu); + } + lu_object_put(env, lo); + return ERR_PTR(-ENOENT); } -EXPORT_SYMBOL(dt_locate); +EXPORT_SYMBOL(dt_locate_at); /** * find a object named \a entry in given \a dfh->dfh_o directory. @@ -298,12 +291,12 @@ static struct dt_object *dt_store_resolve(const struct lu_env *env, const char *path, struct lu_fid *fid) { - struct dt_thread_info *info = lu_context_key_get(&env->le_ctx, - &dt_key); - struct dt_find_hint *dfh = &info->dti_dfh; - struct dt_object *obj; - char *local = info->dti_buf; - int result; + struct dt_thread_info *info = dt_info(env); + struct dt_find_hint *dfh = &info->dti_dfh; + struct dt_object *obj; + char *local = info->dti_buf; + int result; + dfh->dfh_dt = dt; dfh->dfh_fid = fid; diff --git a/lustre/obdclass/local_storage.c b/lustre/obdclass/local_storage.c new file mode 100644 index 0000000..ca47645 --- /dev/null +++ b/lustre/obdclass/local_storage.c @@ -0,0 +1,685 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2012 Whamcloud, Inc. + */ +/* + * lustre/obdclass/local_storage.c + * + * Local storage for file/objects with fid generation. Works on top of OSD. + * + * Author: Mikhail Pershin + */ + +#define DEBUG_SUBSYSTEM S_CLASS + +#include "local_storage.h" + +/* all initialized local storages on this node are linked on this */ +static CFS_LIST_HEAD(ls_list_head); +static CFS_DEFINE_MUTEX(ls_list_mutex); + +static int ls_object_init(const struct lu_env *env, struct lu_object *o, + const struct lu_object_conf *unused) +{ + struct ls_device *ls; + struct lu_object *below; + struct lu_device *under; + + ENTRY; + + ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev); + under = &ls->ls_osd->dd_lu_dev; + below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); + if (below == NULL) + RETURN(-ENOMEM); + + lu_object_add(o, below); + + RETURN(0); +} + +static void ls_object_free(const struct lu_env *env, struct lu_object *o) +{ + struct ls_object *obj = lu2ls_obj(o); + struct lu_object_header *h = o->lo_header; + + dt_object_fini(&obj->ls_obj); + lu_object_header_fini(h); + OBD_FREE_PTR(obj); +} + +struct lu_object_operations ls_lu_obj_ops = { + .loo_object_init = 
ls_object_init, + .loo_object_free = ls_object_free, +}; + +struct lu_object *ls_object_alloc(const struct lu_env *env, + const struct lu_object_header *_h, + struct lu_device *d) +{ + struct lu_object_header *h; + struct ls_object *o; + struct lu_object *l; + + LASSERT(_h == NULL); + + OBD_ALLOC_PTR(o); + if (o != NULL) { + l = &o->ls_obj.do_lu; + h = &o->ls_header; + + lu_object_header_init(h); + dt_object_init(&o->ls_obj, h, d); + lu_object_add_top(h, l); + + l->lo_ops = &ls_lu_obj_ops; + + return l; + } else { + return NULL; + } +} + +static struct lu_device_operations ls_lu_dev_ops = { + .ldo_object_alloc = ls_object_alloc +}; + +static struct ls_device *__ls_find_dev(struct dt_device *dev) +{ + struct ls_device *ls, *ret = NULL; + + cfs_list_for_each_entry(ls, &ls_list_head, ls_linkage) { + if (ls->ls_osd == dev) { + cfs_atomic_inc(&ls->ls_refcount); + ret = ls; + break; + } + } + return ret; +} + +struct ls_device *ls_find_dev(struct dt_device *dev) +{ + struct ls_device *ls; + + cfs_mutex_lock(&ls_list_mutex); + ls = __ls_find_dev(dev); + cfs_mutex_unlock(&ls_list_mutex); + + return ls; +} + +static struct lu_device_type_operations ls_device_type_ops = { + .ldto_start = NULL, + .ldto_stop = NULL, +}; + +static struct lu_device_type ls_lu_type = { + .ldt_name = "local_storage", + .ldt_ops = &ls_device_type_ops, +}; + +static struct ls_device *ls_device_get(const struct lu_env *env, + struct dt_device *dev) +{ + struct ls_device *ls; + + ENTRY; + + cfs_mutex_lock(&ls_list_mutex); + ls = __ls_find_dev(dev); + if (ls) + GOTO(out_ls, ls); + + /* not found, then create */ + OBD_ALLOC_PTR(ls); + if (ls == NULL) + GOTO(out_ls, ls = ERR_PTR(-ENOMEM)); + + cfs_atomic_set(&ls->ls_refcount, 1); + CFS_INIT_LIST_HEAD(&ls->ls_los_list); + cfs_mutex_init(&ls->ls_los_mutex); + + ls->ls_osd = dev; + + LASSERT(dev->dd_lu_dev.ld_site); + lu_device_init(&ls->ls_top_dev.dd_lu_dev, &ls_lu_type); + ls->ls_top_dev.dd_lu_dev.ld_ops = &ls_lu_dev_ops; + 
ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site; + + /* finally add ls to the list */ + cfs_list_add(&ls->ls_linkage, &ls_list_head); +out_ls: + cfs_mutex_unlock(&ls_list_mutex); + RETURN(ls); +} + +static void ls_device_put(const struct lu_env *env, struct ls_device *ls) +{ + LASSERT(env); + if (!cfs_atomic_dec_and_test(&ls->ls_refcount)) + return; + + cfs_mutex_lock(&ls_list_mutex); + if (cfs_atomic_read(&ls->ls_refcount) == 0) { + LASSERT(cfs_list_empty(&ls->ls_los_list)); + cfs_list_del(&ls->ls_linkage); + lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0); + lu_device_fini(&ls->ls_top_dev.dd_lu_dev); + OBD_FREE_PTR(ls); + } + cfs_mutex_unlock(&ls_list_mutex); +} + +/** + * local file fid generation + */ +int local_object_fid_generate(const struct lu_env *env, + struct local_oid_storage *los, + struct lu_fid *fid) +{ + LASSERT(los->los_dev); + LASSERT(los->los_obj); + + /* take next OID */ + + /* to make it unique after reboot we store + * the latest generated fid atomically with + * object creation see local_object_create() */ + + cfs_mutex_lock(&los->los_id_lock); + fid->f_seq = los->los_seq; + fid->f_oid = los->los_last_oid++; + fid->f_ver = 0; + cfs_mutex_unlock(&los->los_id_lock); + + return 0; +} + +int local_object_declare_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *o, struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + struct dt_thread_info *dti = dt_info(env); + int rc; + + ENTRY; + + /* update fid generation file */ + if (los != NULL) { + LASSERT(dt_object_exists(los->los_obj)); + rc = dt_declare_record_write(env, los->los_obj, + sizeof(struct los_ondisk), 0, th); + if (rc) + RETURN(rc); + } + + rc = dt_declare_create(env, o, attr, NULL, dof, th); + if (rc) + RETURN(rc); + + dti->dti_lb.lb_buf = NULL; + dti->dti_lb.lb_len = sizeof(dti->dti_lma); + rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th); + + RETURN(rc); +} + +int 
local_object_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *o, struct lu_attr *attr, + struct dt_object_format *dof, struct thandle *th) +{ + struct dt_thread_info *dti = dt_info(env); + struct los_ondisk losd; + int rc; + + ENTRY; + + rc = dt_create(env, o, attr, NULL, dof, th); + if (rc) + RETURN(rc); + + lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu)); + lustre_lma_swab(&dti->dti_lma); + dti->dti_lb.lb_buf = &dti->dti_lma; + dti->dti_lb.lb_len = sizeof(dti->dti_lma); + rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th, + BYPASS_CAPA); + + if (los == NULL) + RETURN(rc); + + LASSERT(los->los_obj); + LASSERT(dt_object_exists(los->los_obj)); + + /* many threads can be updated this, serialize + * them here to avoid the race where one thread + * takes the value first, but writes it last */ + cfs_mutex_lock(&los->los_id_lock); + + /* update local oid number on disk so that + * we know the last one used after reboot */ + losd.lso_magic = cpu_to_le32(LOS_MAGIC); + losd.lso_next_oid = cpu_to_le32(los->los_last_oid); + + dti->dti_off = 0; + dti->dti_lb.lb_buf = &losd; + dti->dti_lb.lb_len = sizeof(losd); + rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off, + th); + cfs_mutex_unlock(&los->los_id_lock); + + RETURN(rc); +} + +/* + * Create local named object (file, directory or index) in parent directory. 
+ */ +struct dt_object *__local_file_create(const struct lu_env *env, + const struct lu_fid *fid, + struct local_oid_storage *los, + struct ls_device *ls, + struct dt_object *parent, + const char *name, __u32 mode) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *dto; + struct thandle *th; + int rc; + + dto = ls_locate(env, ls, fid); + if (unlikely(IS_ERR(dto))) + RETURN(dto); + + LASSERT(dto != NULL); + if (dt_object_exists(dto)) + GOTO(out, rc = -EEXIST); + + /* create the object */ + dti->dti_attr.la_valid = LA_MODE | LA_TYPE; + dti->dti_attr.la_mode = mode; + dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT); + + th = dt_trans_create(env, ls->ls_osd); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + rc = local_object_declare_create(env, los, dto, &dti->dti_attr, + &dti->dti_dof, th); + if (rc) + GOTO(trans_stop, rc); + + if (dti->dti_dof.dof_type == DFT_DIR) { + dt_declare_ref_add(env, dto, th); + dt_declare_ref_add(env, parent, th); + } + + rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th); + if (rc) + GOTO(trans_stop, rc); + + rc = dt_trans_start_local(env, ls->ls_osd, th); + if (rc) + GOTO(trans_stop, rc); + + dt_write_lock(env, dto, 0); + if (dt_object_exists(dto)) + GOTO(unlock, rc = 0); + + CDEBUG(D_OTHER, "create new object "DFID"\n", + PFID(lu_object_fid(&dto->do_lu))); + rc = local_object_create(env, los, dto, &dti->dti_attr, + &dti->dti_dof, th); + if (rc) + GOTO(unlock, rc); + LASSERT(dt_object_exists(dto)); + + if (dti->dti_dof.dof_type == DFT_DIR) { + if (!dt_try_as_dir(env, dto)) + GOTO(destroy, rc = -ENOTDIR); + /* Add "." and ".." 
for newly created dir */ + rc = dt_insert(env, dto, (void *)fid, (void *)".", th, + BYPASS_CAPA, 1); + if (rc) + GOTO(destroy, rc); + dt_ref_add(env, dto, th); + rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu), + (void *)"..", th, BYPASS_CAPA, 1); + if (rc) + GOTO(destroy, rc); + } + + dt_write_lock(env, parent, 0); + rc = dt_insert(env, parent, (const struct dt_rec *)fid, + (const struct dt_key *)name, th, BYPASS_CAPA, 1); + if (dti->dti_dof.dof_type == DFT_DIR) + dt_ref_add(env, parent, th); + dt_write_unlock(env, parent); + if (rc) + GOTO(destroy, rc); +destroy: + if (rc) + dt_destroy(env, dto, th); +unlock: + dt_write_unlock(env, dto); +trans_stop: + dt_trans_stop(env, ls->ls_osd, th); +out: + if (rc) { + lu_object_put_nocache(env, &dto->do_lu); + dto = ERR_PTR(rc); + } else { + struct lu_fid dti_fid; + /* since local files FIDs are not in OI the directory entry + * is used to get inode number/generation, we need to do lookup + * again to cache this data after create */ + rc = dt_lookup_dir(env, parent, name, &dti_fid); + LASSERT(rc == 0); + } + RETURN(dto); +} + +/* + * Look up and create (if it does not exist) a local named file or directory in + * parent directory. 
+ */ +struct dt_object *local_file_find_or_create(const struct lu_env *env, + struct local_oid_storage *los, + struct dt_object *parent, + const char *name, __u32 mode) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *dto; + int rc; + + LASSERT(parent); + + rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); + if (rc == 0) + /* name is found, get the object */ + dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid); + else if (rc != -ENOENT) + dto = ERR_PTR(rc); + else { + rc = local_object_fid_generate(env, los, &dti->dti_fid); + if (rc < 0) + dto = ERR_PTR(rc); + else + dto = __local_file_create(env, &dti->dti_fid, los, + dt2ls_dev(los->los_dev), + parent, name, mode); + } + return dto; +} +EXPORT_SYMBOL(local_file_find_or_create); + +struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env, + struct dt_device *dt, + const struct lu_fid *fid, + struct dt_object *parent, + const char *name, + __u32 mode) +{ + struct dt_thread_info *dti = dt_info(env); + struct dt_object *dto; + int rc; + + LASSERT(parent); + + rc = dt_lookup_dir(env, parent, name, &dti->dti_fid); + if (rc == 0) { + /* name is found, get the object */ + if (!lu_fid_eq(fid, &dti->dti_fid)) + dto = ERR_PTR(-EINVAL); + else + dto = dt_locate(env, dt, fid); + } else if (rc != -ENOENT) { + dto = ERR_PTR(rc); + } else { + struct ls_device *ls; + + ls = ls_device_get(env, dt); + /* on failure no reference is held, so ls must not be put */ + if (IS_ERR(ls)) + return ERR_PTR(PTR_ERR(ls)); + dto = __local_file_create(env, fid, NULL, ls, parent, + name, mode); + ls_device_put(env, ls); + } + return dto; +} +EXPORT_SYMBOL(local_file_find_or_create_with_fid); + +static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq) +{ + struct local_oid_storage *los, *ret = NULL; + + cfs_list_for_each_entry(los, &ls->ls_los_list, los_list) { + if (los->los_seq == seq) { + cfs_atomic_inc(&los->los_refcount); + ret = los; + break; + } + } + return ret; +} + +/** + * Initialize local OID storage for required 
sequence. + * That may be needed for services that use local files and require + * dynamic OID allocation for them. + * + * Per each sequence we have an object with 'first_fid' identifier + * containing the counter for OIDs of locally created files with that + * sequence. + * + * It is used now by llog subsystem and MGS for NID tables + * + * Function gets first_fid to create counter object. + * All dynamic fids are generated with the same sequence and incremented OIDs + * + * Returned local_oid_storage is in-memory representation of OID storage + */ +int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, + const struct lu_fid *first_fid, + struct local_oid_storage **los) +{ + struct dt_thread_info *dti = dt_info(env); + struct ls_device *ls; + struct los_ondisk losd; + struct dt_object *o; + struct dt_object *root = NULL; + struct thandle *th; + int rc; + + ENTRY; + + ls = ls_device_get(env, dev); + if (IS_ERR(ls)) + RETURN(PTR_ERR(ls)); + + cfs_mutex_lock(&ls->ls_los_mutex); + *los = dt_los_find(ls, fid_seq(first_fid)); + if (*los != NULL) + GOTO(out, rc = 0); + + /* not found, then create */ + OBD_ALLOC_PTR(*los); + if (*los == NULL) + GOTO(out, rc = -ENOMEM); + + cfs_atomic_set(&(*los)->los_refcount, 1); + cfs_mutex_init(&(*los)->los_id_lock); + (*los)->los_dev = &ls->ls_top_dev; + + /* initialize data allowing to generate new fids, + * literally we need a sequence */ + o = ls_locate(env, ls, first_fid); + if (IS_ERR(o)) + GOTO(out_los, rc = PTR_ERR(o)); + + rc = dt_root_get(env, dev, &dti->dti_fid); + if (rc) + GOTO(out_los, rc); + + root = ls_locate(env, ls, &dti->dti_fid); + if (IS_ERR(root)) + GOTO(out_los, rc = PTR_ERR(root)); + + if (dt_try_as_dir(env, root) == 0) + GOTO(out_los, rc = -ENOTDIR); + + dt_write_lock(env, o, 0); + if (!dt_object_exists(o)) { + th = dt_trans_create(env, dev); + if (IS_ERR(th)) + GOTO(out_lock, rc = 
PTR_ERR(th)); + + dti->dti_attr.la_valid = LA_MODE | LA_TYPE; + dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; + dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG); + + rc = dt_declare_create(env, o, &dti->dti_attr, NULL, + &dti->dti_dof, th); + if (rc) + GOTO(out_trans, rc); + + snprintf(dti->dti_buf, sizeof(dti->dti_buf), + "seq-%Lx-lastid", fid_seq(first_fid)); + rc = dt_declare_insert(env, root, + (const struct dt_rec *)lu_object_fid(&o->do_lu), + (const struct dt_key *)dti->dti_buf, + th); + if (rc) + GOTO(out_trans, rc); + + dti->dti_lb.lb_buf = NULL; + dti->dti_lb.lb_len = sizeof(dti->dti_lma); + rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, + 0, th); + if (rc) + GOTO(out_trans, rc); + + rc = dt_declare_record_write(env, o, sizeof(losd), 0, th); + if (rc) + GOTO(out_trans, rc); + + rc = dt_trans_start_local(env, dev, th); + if (rc) + GOTO(out_trans, rc); + + LASSERT(!dt_object_exists(o)); + rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th); + if (rc) + GOTO(out_trans, rc); + LASSERT(dt_object_exists(o)); + + lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu)); + lustre_lma_swab(&dti->dti_lma); + dti->dti_lb.lb_buf = &dti->dti_lma; + dti->dti_lb.lb_len = sizeof(dti->dti_lma); + rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, + th, BYPASS_CAPA); + if (rc) + GOTO(out_trans, rc); + + losd.lso_magic = cpu_to_le32(LOS_MAGIC); + losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1); + + dti->dti_off = 0; + dti->dti_lb.lb_buf = &losd; + dti->dti_lb.lb_len = sizeof(losd); + rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th); + if (rc) + GOTO(out_trans, rc); + rc = dt_insert(env, root, + (const struct dt_rec *)lu_object_fid(&o->do_lu), + (const struct dt_key *)dti->dti_buf, th, + BYPASS_CAPA, 1); + if (rc) + GOTO(out_trans, rc); +out_trans: + dt_trans_stop(env, dev, th); + } else { + dti->dti_off = 0; + dti->dti_lb.lb_buf = &losd; + dti->dti_lb.lb_len = sizeof(losd); + rc = dt_record_read(env, o, 
&dti->dti_lb, &dti->dti_off); + if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { + CERROR("local storage file "DFID" is corrupted\n", + PFID(first_fid)); + rc = -EINVAL; + } + } +out_lock: + dt_write_unlock(env, o); +out_los: + if (root != NULL && !IS_ERR(root)) + lu_object_put_nocache(env, &root->do_lu); + if (rc) { + OBD_FREE_PTR(*los); + *los = NULL; + if (o != NULL && !IS_ERR(o)) + lu_object_put_nocache(env, &o->do_lu); + } else { + (*los)->los_seq = fid_seq(first_fid); + (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid); + (*los)->los_obj = o; + /* link only a fully initialized los; it holds a ref on ls */ + cfs_atomic_inc(&ls->ls_refcount); + cfs_list_add(&(*los)->los_list, &ls->ls_los_list); + } +out: + cfs_mutex_unlock(&ls->ls_los_mutex); + ls_device_put(env, ls); + return rc; +} +EXPORT_SYMBOL(local_oid_storage_init); + +void local_oid_storage_fini(const struct lu_env *env, + struct local_oid_storage *los) +{ + struct ls_device *ls; + + if (!cfs_atomic_dec_and_test(&los->los_refcount)) + return; + + LASSERT(env); + LASSERT(los->los_dev); + ls = dt2ls_dev(los->los_dev); + + cfs_mutex_lock(&ls->ls_los_mutex); + if (cfs_atomic_read(&los->los_refcount) == 0) { + if (los->los_obj) + lu_object_put_nocache(env, &los->los_obj->do_lu); + cfs_list_del(&los->los_list); + OBD_FREE_PTR(los); + } + cfs_mutex_unlock(&ls->ls_los_mutex); + ls_device_put(env, ls); +} +EXPORT_SYMBOL(local_oid_storage_fini); + diff --git a/lustre/obdclass/local_storage.h b/lustre/obdclass/local_storage.h new file mode 100644 index 0000000..6f801a7 --- /dev/null +++ b/lustre/obdclass/local_storage.h @@ -0,0 +1,75 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. 
A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2012 Whamcloud, Inc. + */ +/* + * lustre/obdclass/local_storage.h + * + * Local storage for file/objects with fid generation. Works on top of OSD. + * + * Author: Mikhail Pershin + */ + +#define DEBUG_SUBSYSTEM S_CLASS + +#include +#include +#include +#include + +struct ls_device { + struct dt_device ls_top_dev; + /* all initialized ls_devices on this node linked by this */ + cfs_list_t ls_linkage; + /* how many handles reference this local storage */ + cfs_atomic_t ls_refcount; + /* underlying OSD device */ + struct dt_device *ls_osd; + /* list of all local OID storages */ + cfs_list_t ls_los_list; + cfs_mutex_t ls_los_mutex; +}; + +static inline struct ls_device *dt2ls_dev(struct dt_device *d) +{ + return container_of0(d, struct ls_device, ls_top_dev); +} + +struct ls_object { + struct lu_object_header ls_header; + struct dt_object ls_obj; +}; + +static inline struct ls_object *lu2ls_obj(struct lu_object *o) +{ + return container_of0(o, struct ls_object, ls_obj.do_lu); +} + +static inline struct dt_object *ls_locate(const struct lu_env *env, + struct ls_device *ls, + const struct lu_fid *fid) +{ + return dt_locate_at(env, ls->ls_osd, fid, &ls->ls_top_dev.dd_lu_dev); +} + + diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index a91b4b6..40bf6ec 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -135,6 +135,18 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o) EXPORT_SYMBOL(lu_object_put); /** + * Put object and don't keep it in cache. This is a temporary solution for + * multi-site objects whose layering is not constant. 
+ */ +void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o) +{ + cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, + &o->lo_header->loh_flags); + lu_object_put(env, o); +} +EXPORT_SYMBOL(lu_object_put_nocache); + +/** * Allocate new object. * * This follows object creation protocol, described in the comment within diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index cd74171..76a2d19 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -3467,7 +3467,17 @@ struct osd_object *osd_object_find(const struct lu_env *env, struct lu_object *luch; struct lu_object *lo; - luch = lu_object_find(env, ludev, fid, NULL); + /* + * at this point topdev might not exist yet + * (i.e. MGS is preparing profiles). so we can + * not rely on topdev and instead lookup with + * our device passed as topdev. this can't work + * if the object isn't cached yet (as osd doesn't + * allocate lu_header). IOW, the object must be + * in the cache, otherwise lu_object_alloc() crashes + * -bzzz + */ + luch = lu_object_find_at(env, ludev, fid, NULL); if (!IS_ERR(luch)) { if (lu_object_exists(luch)) { lo = lu_object_locate(luch->lo_header, ludev->ld_type); -- 1.8.3.1