From 544bbd4c7f485b5cc9a0109f6de288772bb5019a Mon Sep 17 00:00:00 2001 From: pschwan Date: Mon, 19 Aug 2002 17:50:18 +0000 Subject: [PATCH] - Maintain a list in the ll_inode_data of data (OST) locks held by this client - in ll_file_release, cancel any remaining locks in that list - refactored mds_getattr_name and mds_getattr into two functions with a common sub-function; this fixed bugs in mds_getattr, and helps prevent them from drifting apart again --- lustre/include/linux/lustre_dlm.h | 3 +- lustre/include/linux/lustre_lite.h | 1 + lustre/ldlm/Makefile.am | 4 +- lustre/{lib => ldlm}/l_lock.c | 0 lustre/ldlm/ldlm_lock.c | 1 + lustre/ldlm/ldlm_lockd.c | 3 + lustre/llite/file.c | 32 ++++++++++- lustre/llite/namei.c | 1 + lustre/llite/super.c | 1 + lustre/mdc/mdc_request.c | 3 +- lustre/mds/handler.c | 111 +++++++++++++++++-------------------- lustre/osc/osc_request.c | 41 +++++++++++--- 12 files changed, 123 insertions(+), 78 deletions(-) rename lustre/{lib => ldlm}/l_lock.c (100%) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index e98475c3..2efb93e 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -111,13 +111,14 @@ typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock, typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); struct ldlm_lock { - __u64 l_random; + __u64 l_random; int l_refc; struct ldlm_resource *l_resource; struct ldlm_lock *l_parent; struct list_head l_children; struct list_head l_childof; struct list_head l_res_link; /*position in one of three res lists*/ + struct list_head l_inode_link; /* position in inode info list */ ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 0685699..04daae4 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -46,6 +46,7 @@ struct ll_inode_info { char *lli_symlink_name; struct lustre_handle lli_intent_lock_handle; struct semaphore lli_open_sem; + struct list_head lli_osc_locks; }; #define LL_SUPER_MAGIC 0x0BD00BD0 diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index d3ef194..aff493a 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -8,12 +8,10 @@ MODULE = ldlm modulefs_DATA = ldlm.o EXTRA_PROGRAMS = ldlm -l_lock.c: - test -e l_lock.c || ln -sf $(top_srcdir)/lib/l_lock.c l_net.c: test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c -LINX=l_lock.c l_net.c +LINX=l_net.c ldlm_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \ ldlm_extent.c ldlm_request.c l_net.c diff --git a/lustre/lib/l_lock.c b/lustre/ldlm/l_lock.c similarity index 100% rename from lustre/lib/l_lock.c rename to lustre/ldlm/l_lock.c diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 07aab31..80a6fcf 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -225,6 +225,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, lock->l_refc = 1; INIT_LIST_HEAD(&lock->l_children); INIT_LIST_HEAD(&lock->l_res_link); + INIT_LIST_HEAD(&lock->l_inode_link); init_waitqueue_head(&lock->l_waitq); if (parent != NULL) { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 08d260e..5d0c92f 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -559,6 +559,7 @@ EXPORT_SYMBOL(ldlm_lockname); EXPORT_SYMBOL(ldlm_typename); EXPORT_SYMBOL(ldlm_handle2lock); EXPORT_SYMBOL(ldlm_lock2handle); +EXPORT_SYMBOL(ldlm_lock_put); EXPORT_SYMBOL(ldlm_lock_match); EXPORT_SYMBOL(ldlm_lock_addref); EXPORT_SYMBOL(ldlm_lock_decref); @@ -574,6 +575,8 @@ EXPORT_SYMBOL(ldlm_regression_stop); EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_namespace_new); EXPORT_SYMBOL(ldlm_namespace_free); +EXPORT_SYMBOL(l_lock); +EXPORT_SYMBOL(l_unlock); MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Lock Management Module v0.1"); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index f2bbb07..54c3e57 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -136,6 +136,8 @@ static int ll_file_release(struct inode *inode, struct file *file) struct obdo *oa; struct ll_sb_info *sbi = ll_i2sbi(inode); struct ll_inode_info *lli = ll_i2info(inode); + struct obd_device *obddev = class_conn2obd(&sbi->ll_osc_conn); + struct list_head *tmp, *next; ENTRY; @@ -185,6 +187,23 @@ static int ll_file_release(struct inode *inode, struct file *file) } ptlrpc_free_req(fd->fd_req); + l_lock(&obddev->obd_namespace->ns_lock); + list_for_each_safe(tmp, next, &lli->lli_osc_locks) { + struct ldlm_lock *lock; + struct lustre_handle lockh; + lock = list_entry(tmp, struct ldlm_lock, l_inode_link); + + if (!list_empty(&lock->l_inode_link)) { + list_del_init(&lock->l_inode_link); + LDLM_LOCK_PUT(lock); + } + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } + l_unlock(&obddev->obd_namespace->ns_lock); + EXIT; out_fd: @@ -236,15 +255,22 @@ static int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, if (data_len != sizeof(struct inode)) LBUG(); - /* FIXME: do something better than throwing away everything */ if (inode == NULL) LBUG(); down(&inode->i_sem); CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino); + /* FIXME: do something better than throwing away everything */ invalidate_inode_pages(inode); up(&inode->i_sem); ldlm_lock2handle(lock, &lockh); + l_lock(&lock->l_resource->lr_namespace->ns_lock); + if (!list_empty(&lock->l_inode_link)) { + list_del_init(&lock->l_inode_link); + LDLM_LOCK_PUT(lock); + } + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + rc = ldlm_cli_cancel(&lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel failed: %d\n", rc); @@ -268,7 +294,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) { OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); if (!lockhs) - RETURN(-ENOMEM); + RETURN(-ENOMEM); extent.start = *ppos; extent.end = *ppos + count; @@ -280,7 +306,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, ll_lock_callback, inode, sizeof(*inode), lockhs); if (err != ELDLM_OK) { - OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_FREE(lockhs, md->lmd_stripe_count *sizeof(*lockhs)); CERROR("lock enqueue: err: %d\n", err); RETURN(err); } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index fdae25f..09a4f5a 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -211,6 +211,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, if (err) { CERROR("failure %d inode %Ld\n", err, (long long)ino); ptlrpc_free_req(request); +#warning FIXME: must release lock here RETURN(ERR_PTR(-abs(err))); } } diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 7d2ac6e..bb4edef 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -410,6 +410,7 @@ static void ll_read_inode2(struct inode *inode, void *opaque) ENTRY; sema_init(&ii->lli_open_sem, 1); + INIT_LIST_HEAD(&ii->lli_osc_locks); /* core attributes first */ if (body->valid & OBD_MD_FLID) diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index d303a83..b0508bd 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -228,7 +228,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, struct ldlm_intent *lit; ENTRY; - LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), dir->i_ino); + LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), + dir->i_ino); switch (it->it_op) { case IT_MKDIR: diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index dff14c1..07d04b8 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -430,6 +430,52 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(0); } +static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, + struct ptlrpc_request *req, + int request_off, int reply_off) +{ + struct mds_body *request_body, *body; + struct inode *inode = dentry->d_inode; + int rc; + ENTRY; + + if (inode == NULL) + RETURN(-ENOENT); + + /* Did the client request the link name? */ + request_body = lustre_msg_buf(req->rq_reqmsg, request_off); + body = lustre_msg_buf(req->rq_repmsg, reply_off); + if ((body->valid & OBD_MD_LINKNAME) && S_ISLNK(inode->i_mode)) { + char *tmp = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + + rc = inode->i_op->readlink(dentry, tmp, req->rq_repmsg-> + buflens[reply_off + 1]); + if (rc < 0) { + CERROR("readlink failed: %d\n", rc); + RETURN(rc); + } + + body->valid |= OBD_MD_LINKNAME; + } + + mds_pack_inode2fid(&body->fid1, inode); + mds_pack_inode2body(body, inode); + if (S_ISREG(inode->i_mode)) { + struct lov_stripe_md *md; + + md = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + md->lmd_easize = mds->mds_max_mdsize; + rc = mds_fs_get_md(mds, inode, md); + + if (rc < 0) { + CERROR("mds_fs_get_md failed: %d\n", rc); + RETURN(rc); + } + body->valid |= OBD_MD_FLEASIZE; + } + RETURN(0); +} + static int mds_getattr_name(int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); @@ -440,7 +486,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) struct inode *dir; struct lustre_handle lockh; char *name; - int namelen, flags, lock_mode, rc = 0; + int namelen, flags, lock_mode, rc = 0, old_offset = offset; __u64 res_id[3] = {0, 0, 0}; ENTRY; @@ -495,26 +541,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) GOTO(out_create_dchild, rc = -ESTALE); } - if (dchild->d_inode) { - struct mds_body *body; - struct inode *inode = dchild->d_inode; - CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n", - dir->i_ino, name, dchild->d_inode->i_ino); - - body = lustre_msg_buf(req->rq_repmsg, offset); - mds_pack_inode2fid(&body->fid1, inode); - mds_pack_inode2body(body, inode); - if (S_ISREG(inode->i_mode)) { - struct lov_stripe_md *md; - md = lustre_msg_buf(req->rq_repmsg, offset + 1); - md->lmd_easize = mds->mds_max_mdsize; - /* FIXME: why don't we check (or use) rc of get_md? */ - mds_fs_get_md(mds, inode, md); - } - /* now a normal case for intent locking */ - rc = 0; - } else - rc = -ENOENT; + rc = mds_getattr_internal(mds, dchild, req, old_offset, offset); EXIT; out_create_dchild: @@ -529,7 +556,6 @@ out_pre_de: return 0; } - static int mds_getattr(int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); @@ -543,9 +569,8 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) body = lustre_msg_buf(req->rq_reqmsg, offset); push_ctxt(&saved, &mds->mds_ctxt); de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) { + if (IS_ERR(de)) GOTO(out_pop, rc = -ENOENT); - } inode = de->d_inode; if (S_ISREG(body->fid1.f_type)) { @@ -563,43 +588,7 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) GOTO(out, rc); } - if (body->valid & OBD_MD_LINKNAME) { - char *tmp = lustre_msg_buf(req->rq_repmsg, 1); - - rc = inode->i_op->readlink(de, tmp, size[1]); - - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - GOTO(out, rc); - } - } - - body = lustre_msg_buf(req->rq_repmsg, 0); - body->ino = inode->i_ino; - body->generation = inode->i_generation; - body->atime = inode->i_atime; - body->ctime = inode->i_ctime; - body->mtime = inode->i_mtime; - body->uid = inode->i_uid; - body->gid = inode->i_gid; - body->size = inode->i_size; - body->mode = inode->i_mode; - body->nlink = inode->i_nlink; - body->valid = (OBD_MD_FLID | OBD_MD_FLATIME | OBD_MD_FLCTIME | - OBD_MD_FLMTIME | OBD_MD_FLSIZE | OBD_MD_FLUID | - OBD_MD_FLGID | OBD_MD_FLNLINK | OBD_MD_FLGENER | - OBD_MD_FLMODE); - - if (S_ISREG(inode->i_mode)) { - rc = mds_fs_get_md(mds, inode, - lustre_msg_buf(req->rq_repmsg, 1)); - if (rc < 0) { - CERROR("mds_fs_get_md failed: %d\n", rc); - GOTO(out, rc); - } - body->valid |= OBD_MD_FLEASIZE; - } else if (S_ISLNK(inode->i_mode)) - body->valid |= OBD_MD_LINKNAME; + rc = mds_getattr_internal(mds, de, req, offset, 0); out: l_dput(de); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 152b103..7f7d995 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,6 +27,7 @@ #include #include #include /* for OBD_FAIL_CHECK */ +#include /* for ll_i2info */ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) @@ -592,12 +593,12 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, struct lustre_handle *parent_lock, __u32 type, void *extentp, int extent_len, __u32 mode, int *flags, void *callback, void *data, int datalen, - struct lustre_handle *lockh) + struct lustre_handle *lockhs) { __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id }; struct obd_device *obddev = class_conn2obd(connh); struct ldlm_extent *extent = extentp; - int rc; + int rc, i; __u32 mode2; /* Filesystem locks are given a bit of special treatment: first we @@ -608,7 +609,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, /* Next, search for already existing extent locks that will cover us */ //osc_con2dlmcl(conn, &cl, &connection, &rconn); rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent, - sizeof(extent), mode, lockh); + sizeof(extent), mode, lockhs); if (rc == 1) { /* We already have a lock, and it's referenced */ return 0; @@ -624,29 +625,51 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, mode2 = LCK_PW; rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent, - sizeof(extent), mode2, lockh); + sizeof(extent), mode2, lockhs); if (rc == 1) { int flags; /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to * lock_match. I want a second opinion. */ - ldlm_lock_addref(lockh, mode); - ldlm_lock_decref(lockh, mode2); + ldlm_lock_addref(lockhs, mode); + ldlm_lock_decref(lockhs, mode2); if (mode == LCK_PR) return 0; - rc = ldlm_cli_convert(lockh, mode, &flags); + rc = ldlm_cli_convert(lockhs, mode, &flags); if (rc) LBUG(); return rc; } - rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace, + rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock, res_id, type, extent, sizeof(extent), mode, flags, ldlm_completion_ast, - callback, data, datalen, lockh); + callback, data, datalen, lockhs); + if (rc) + return rc; + + /* This code must change if we ever stop passing an inode in as data */ + /* This is ldlm and llite code. It makes me sad that it's in + * osc_request.c --phil */ + l_lock(&obddev->obd_namespace->ns_lock); + for (i = 0; i < md->lmd_stripe_count; i++) { + struct ldlm_lock *lock = ldlm_handle2lock(&(lockhs[i])); + struct inode *inode = data; + struct ll_inode_info *lli = ll_i2info(inode); + + if (!lock) { + CERROR("invalid lock in array\n"); + continue; + } + + /* Lock already has an extra ref from handle2lock */ + list_add(&lock->l_inode_link, &lli->lli_osc_locks); + } + l_unlock(&obddev->obd_namespace->ns_lock); + return rc; } -- 1.8.3.1