implementation was buggy and did not take into account the case when inode is just
released due to memory pressure and only removed lmv_obj when inode was deleted.
Current implementation is much better especially if some client will produce lots
of splitted dirs.
- cleanups struct lmv_obj. Renames in lmv obj manager to make names shorter.
- making lmv_obj manager to use kmem cache for struct lmv_obj instances. This
should work faster if some client will produce lots of splitted dirs.
- fix in lmv_connect(). It should not remove lmv_proc_dir if thereis no error.
- added some asserts. Fixes in lmv_obj manager to make it working correct if
an object is getting removed by lmv_delete_obj().
- removed error message when lmv_put_inode() detects, that object is still used by
someone, removed redundant assignments and variables. Cleanups about using lustre_id
in lmv_obj manager.
- cleanups about ENTRY/RETURN things in lmv object manager.
int repoff);
int (*m_set_lock_data)(struct obd_export *exp, __u64 *l, void *data);
- int (*m_delete_object)(struct obd_export *, struct lustre_id *);
+ int (*m_put_inode)(struct obd_export *, struct lustre_id *);
/* NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line to
* lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c. Also, add a
RETURN(rc);
}
-/* this function notifies MDC, that inode described by @id gets removed from
- * memory.*/
-static inline int md_delete_object(struct obd_export *exp,
- struct lustre_id *id)
+static inline int md_put_inode(struct obd_export *exp,
+ struct lustre_id *id)
{
int rc;
ENTRY;
- /* as this method only notifies MDC that inode gets deleted, we can
- * return zero if method is not implemented, this means, that OBD does
- * not need such a notification. */
- if (MDP(exp->exp_obd, delete_object) == NULL)
+ if (MDP(exp->exp_obd, put_inode) == NULL)
RETURN(0);
- MD_COUNTER_INCREMENT(exp->exp_obd, delete_object);
- rc = MDP(exp->exp_obd, delete_object)(exp, id);
+ MD_COUNTER_INCREMENT(exp->exp_obd, put_inode);
+ rc = MDP(exp->exp_obd, put_inode)(exp, id);
RETURN(rc);
}
int it_disposition(struct lookup_intent *it, int flag);
void it_set_disposition(struct lookup_intent *it, int flag);
void ll_read_inode2(struct inode *inode, void *opaque);
-void ll_delete_inode(struct inode *inode);
+void ll_put_inode(struct inode *inode);
int ll_iocontrol(struct inode *inode, struct file *file,
unsigned int cmd, unsigned long arg);
void ll_umount_begin(struct super_block *sb);
}
}
-void ll_delete_inode(struct inode *inode)
+void ll_put_inode(struct inode *inode)
{
- int rc;
- struct lustre_id id;
struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct lustre_id id;
+ int rc;
ENTRY;
-
- ll_inode2id(&id, inode);
- rc = md_delete_object(sbi->ll_lmv_exp, &id);
- if (rc) {
- CERROR("md_delete_object() failed, error %d.\n",
- rc);
+ /* notifying metadata target of putting inode. */
+ if (atomic_read(&inode->i_count) == 1) {
+ ll_inode2id(&id, inode);
+
+ rc = md_put_inode(sbi->ll_lmv_exp, &id);
+ if (rc) {
+ CERROR("md_put_inode() failed, "
+ "error %d.\n", rc);
+ }
}
- clear_inode(inode);
EXIT;
}
{
.read_inode2 = ll_read_inode2,
.clear_inode = ll_clear_inode,
-#if 0
- /* should be fixed first */
- .delete_inode = ll_delete_inode,
-#endif
+ .put_inode = ll_put_inode,
.put_super = lustre_put_super,
.statfs = ll_statfs,
.umount_begin = ll_umount_begin,
int state; /* object state. */
atomic_t count; /* ref counter. */
struct lustre_id id; /* master id of dir */
- void *update; /* bitmap of status (uptodate) */
+ void *update; /* bitmap of status (up-to-date) */
__u32 hashtype;
int objcount; /* number of slaves */
struct lmv_inode *objs; /* array of dirobjs */
struct obd_device *obd; /* pointer to LMV itself */
- unsigned long mtime;
- unsigned long ctime;
- unsigned long atime;
- unsigned long nlink;
};
static inline void
#include <linux/obd_lmv.h>
#include "lmv_internal.h"
+/* object cache. */
+kmem_cache_t *obj_cache;
+atomic_t obj_cache_count = ATOMIC_INIT(0);
+
static void lmv_activate_target(struct lmv_obd *lmv,
struct lmv_tgt_desc *tgt,
int activate)
lprocfs_init_vars(lmv, &lvars);
rc = lprocfs_obd_attach(dev, lvars.obd_vars);
- if (rc == 0) {
#ifdef __KERNEL__
+ if (rc == 0) {
struct proc_dir_entry *entry;
entry = create_proc_entry("target_obd_status", 0444,
RETURN(-ENOMEM);
entry->proc_fops = &lmv_proc_target_fops;
entry->data = dev;
-#endif
}
+#endif
RETURN (rc);
}
rc = lmv_check_connect(obd);
#ifdef __KERNEL__
- if (lmv_proc_dir)
- lprocfs_remove(lmv_proc_dir);
+ if (rc) {
+ if (lmv_proc_dir)
+ lprocfs_remove(lmv_proc_dir);
+ }
#endif
RETURN(rc);
struct obd_device *obddev = class_exp2obd(exp);
struct lmv_obd *lmv = &obddev->u.lmv;
int i, rc = 0, set = 0;
-
ENTRY;
if (lmv->desc.ld_tgt_count == 0)
RETURN(rc);
}
-int lmv_delete_object(struct obd_export *exp, struct lustre_id *id)
+int lmv_put_inode(struct obd_export *exp, struct lustre_id *id)
{
ENTRY;
};
struct md_ops lmv_md_ops = {
- .m_getstatus = lmv_getstatus,
- .m_getattr = lmv_getattr,
- .m_change_cbdata = lmv_change_cbdata,
- .m_change_cbdata_name = lmv_change_cbdata_name,
- .m_close = lmv_close,
- .m_create = lmv_create,
- .m_done_writing = lmv_done_writing,
- .m_enqueue = lmv_enqueue,
- .m_getattr_lock = lmv_getattr_lock,
- .m_intent_lock = lmv_intent_lock,
- .m_link = lmv_link,
- .m_rename = lmv_rename,
- .m_setattr = lmv_setattr,
- .m_sync = lmv_sync,
- .m_readpage = lmv_readpage,
- .m_unlink = lmv_unlink,
- .m_get_real_obd = lmv_get_real_obd,
- .m_valid_attrs = lmv_valid_attrs,
- .m_delete_object = lmv_delete_object,
+ .m_getstatus = lmv_getstatus,
+ .m_getattr = lmv_getattr,
+ .m_change_cbdata = lmv_change_cbdata,
+ .m_change_cbdata_name = lmv_change_cbdata_name,
+ .m_close = lmv_close,
+ .m_create = lmv_create,
+ .m_done_writing = lmv_done_writing,
+ .m_enqueue = lmv_enqueue,
+ .m_getattr_lock = lmv_getattr_lock,
+ .m_intent_lock = lmv_intent_lock,
+ .m_link = lmv_link,
+ .m_rename = lmv_rename,
+ .m_setattr = lmv_setattr,
+ .m_sync = lmv_sync,
+ .m_readpage = lmv_readpage,
+ .m_unlink = lmv_unlink,
+ .m_get_real_obd = lmv_get_real_obd,
+ .m_valid_attrs = lmv_valid_attrs,
+ .m_put_inode = lmv_put_inode,
};
int __init lmv_init(void)
struct lprocfs_static_vars lvars;
int rc;
+ obj_cache = kmem_cache_create("lmv_objects",
+ sizeof(struct lmv_obj),
+ 0, 0, NULL, NULL);
+ if (!obj_cache) {
+ CERROR("error allocating lmv objects cache\n");
+ return -ENOMEM;
+ }
+
lprocfs_init_vars(lmv, &lvars);
rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
- lvars.module_vars, OBD_LMV_DEVICENAME);
- RETURN(rc);
+ lvars.module_vars,
+ OBD_LMV_DEVICENAME);
+ return rc;
}
#ifdef __KERNEL__
static void lmv_exit(void)
{
class_unregister_type(OBD_LMV_DEVICENAME);
+
+ LASSERTF(kmem_cache_destroy(obj_cache) == 0,
+ "can't free lmv objects cache, %d object(s)"
+ "still in use\n", atomic_read(&obj_cache_count));
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
#include <linux/obd_lmv.h>
#include "lmv_internal.h"
-static LIST_HEAD(lmv_obj_list);
-static spinlock_t lmv_obj_list_lock = SPIN_LOCK_UNLOCKED;
+/* objects cache. */
+extern kmem_cache_t *obj_cache;
+extern atomic_t obj_cache_count;
+
+/* object list and its guard. */
+static LIST_HEAD(obj_list);
+static spinlock_t obj_list_lock = SPIN_LOCK_UNLOCKED;
/* creates new obj on passed @id and @mea. */
struct lmv_obj *
LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
|| mea->mea_magic == MEA_MAGIC_ALL_CHARS);
- OBD_ALLOC(obj, sizeof(*obj));
+ OBD_SLAB_ALLOC(obj, obj_cache, GFP_NOFS, sizeof(*obj));
if (!obj)
return NULL;
+ atomic_inc(&obj_cache_count);
+
obj->id = *id;
obj->obd = obd;
obj->state = 0;
CDEBUG(D_OTHER, "subobj "DLID4"\n",
OLID4(&mea->mea_ids[i]));
obj->objs[i].id = mea->mea_ids[i];
+ LASSERT(id_ino(&obj->objs[i].id));
LASSERT(id_fid(&obj->objs[i].id));
}
unsigned int obj_size;
struct lmv_obd *lmv = &obj->obd->u.lmv;
+ LASSERT(!atomic_read(&obj->count));
+
obj_size = sizeof(struct lmv_inode) *
lmv->desc.ld_tgt_count;
OBD_FREE(obj->objs, obj_size);
- OBD_FREE(obj, sizeof(*obj));
+ OBD_SLAB_FREE(obj, obj_cache, sizeof(*obj));
+ atomic_dec(&obj_cache_count);
}
static void
__add_obj(struct lmv_obj *obj)
{
atomic_inc(&obj->count);
- list_add(&obj->list, &lmv_obj_list);
+ list_add(&obj->list, &obj_list);
}
void
lmv_add_obj(struct lmv_obj *obj)
{
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
__add_obj(obj);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
}
static void
__del_obj(struct lmv_obj *obj)
{
- if (!(obj->state & O_FREEING))
- LBUG();
-
list_del(&obj->list);
lmv_free_obj(obj);
}
void
lmv_del_obj(struct lmv_obj *obj)
{
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
__del_obj(obj);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
}
static struct lmv_obj *
__get_obj(struct lmv_obj *obj)
{
- LASSERT(obj);
+ LASSERT(obj != NULL);
atomic_inc(&obj->count);
return obj;
}
struct lmv_obj *
lmv_get_obj(struct lmv_obj *obj)
{
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
__get_obj(obj);
- spin_unlock(&lmv_obj_list_lock);
-
+ spin_unlock(&obj_list_lock);
return obj;
}
void
lmv_put_obj(struct lmv_obj *obj)
{
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
__put_obj(obj);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
}
static struct lmv_obj *
struct lmv_obj *obj;
struct list_head *cur;
- list_for_each(cur, &lmv_obj_list) {
+ list_for_each(cur, &obj_list) {
obj = list_entry(cur, struct lmv_obj, list);
/* check if object is in progress of destroying. If so - skip
struct lmv_obj *obj;
ENTRY;
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
obj = __grab_obj(obd, id);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
RETURN(obj);
}
/* check if someone create it already while we were dealing with
* allocating @obj. */
- spin_lock(&lmv_obj_list_lock);
+ spin_lock(&obj_list_lock);
obj = __grab_obj(obd, id);
if (obj) {
/* someone created it already - put @obj and getting out. */
lmv_free_obj(new);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
RETURN(obj);
}
__add_obj(new);
__get_obj(new);
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
CDEBUG(D_OTHER, "new obj in lmv cache: "DLID4"\n",
OLID4(id));
struct ptlrpc_request *req = NULL;
struct lmv_obj *obj;
struct lustre_md md;
- int mealen, i, rc;
+ int mealen, rc;
ENTRY;
CDEBUG(D_OTHER, "get mea for "DLID4" and create lmv obj\n",
mealen = MEA_SIZE_LMV(lmv);
/* time to update mea of parent id */
- i = id->li_fid.lf_group;
md.mea = NULL;
-
valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
- rc = md_getattr(lmv->tgts[id->li_fid.lf_group].ltd_exp,
+
+ rc = md_getattr(lmv->tgts[id_group(id)].ltd_exp,
id, valid, mealen, &req);
if (rc) {
CERROR("md_getattr() failed, error %d\n", rc);
OLID4(id));
GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
}
+ EXIT;
cleanup:
- if (req)
+ if (req)
ptlrpc_req_finished(req);
- RETURN(obj);
+ return obj;
}
/* looks for object with @id and orders to destroy it. It is possible the
int rc = 0;
ENTRY;
- spin_lock(&lmv_obj_list_lock);
-
+ spin_lock(&obj_list_lock);
obj = __grab_obj(obd, id);
if (obj) {
obj->state |= O_FREEING;
-
- if (atomic_read(&obj->count) > 1)
- CERROR("obj "DLID4" has count > 2 (%d)\n",
- OLID4(&obj->id), atomic_read(&obj->count));
__put_obj(obj);
__put_obj(obj);
rc = 1;
}
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
RETURN(rc);
}
int
lmv_setup_mgr(struct obd_device *obd)
{
+ LASSERT(obd != NULL);
+
CDEBUG(D_INFO, "LMV object manager setup (%s)\n",
obd->obd_uuid.uuid);
+
return 0;
}
CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n",
obd->obd_uuid.uuid);
- spin_lock(&lmv_obj_list_lock);
- list_for_each_safe(cur, tmp, &lmv_obj_list) {
+ spin_lock(&obj_list_lock);
+ list_for_each_safe(cur, tmp, &obj_list) {
obj = list_entry(cur, struct lmv_obj, list);
if (obj->obd != obd)
continue;
obj->state |= O_FREEING;
- if (atomic_read(&obj->count) > 1)
+ if (atomic_read(&obj->count) > 1) {
CERROR("obj "DLID4" has count > 1 (%d)\n",
OLID4(&obj->id), atomic_read(&obj->count));
+ }
__put_obj(obj);
}
- spin_unlock(&lmv_obj_list_lock);
+ spin_unlock(&obj_list_lock);
}
return rc;
}
-static int mds_connect_post(struct obd_export *exp, unsigned long connect_flags)
+static int mds_connect_post(struct obd_export *exp, unsigned long flags)
{
struct obd_device *obd = exp->exp_obd;
struct mds_obd *mds = &obd->u.mds;
int rc = 0;
ENTRY;
- if (!(connect_flags & OBD_OPT_MDS_CONNECTION)) {
+ if (!(flags & OBD_OPT_MDS_CONNECTION)) {
if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
atomic_inc(&mds->mds_real_clients);
CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",