Whamcloud - gitweb
- Maintain a list in the ll_inode_data of data (OST) locks held by this client
authorpschwan <pschwan>
Mon, 19 Aug 2002 17:50:18 +0000 (17:50 +0000)
committerpschwan <pschwan>
Mon, 19 Aug 2002 17:50:18 +0000 (17:50 +0000)
- in ll_file_release, cancel any remaining locks in that list
- refactored mds_getattr_name and mds_getattr into two functions with a common
  sub-function; this fixed bugs in mds_getattr, and helps prevent them from
  drifting apart again

12 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_lite.h
lustre/ldlm/Makefile.am
lustre/ldlm/l_lock.c [moved from lustre/lib/l_lock.c with 100% similarity]
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/osc/osc_request.c

index e98475c..2efb93e 100644 (file)
@@ -111,13 +111,14 @@ typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock,
 typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); 
 
 struct ldlm_lock {
-        __u64                  l_random;
+        __u64                 l_random;
         int                   l_refc;
         struct ldlm_resource *l_resource;
         struct ldlm_lock     *l_parent;
         struct list_head      l_children;
         struct list_head      l_childof;
         struct list_head      l_res_link; /*position in one of three res lists*/
+        struct list_head      l_inode_link; /* position in inode info list */
 
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
index 0685699..04daae4 100644 (file)
@@ -46,6 +46,7 @@ struct ll_inode_info {
         char                 *lli_symlink_name;
         struct lustre_handle  lli_intent_lock_handle;
         struct semaphore      lli_open_sem;
+        struct list_head      lli_osc_locks;
 };
 
 #define LL_SUPER_MAGIC 0x0BD00BD0
index d3ef194..aff493a 100644 (file)
@@ -8,12 +8,10 @@ MODULE = ldlm
 modulefs_DATA = ldlm.o
 EXTRA_PROGRAMS = ldlm
 
-l_lock.c: 
-       test -e l_lock.c || ln -sf $(top_srcdir)/lib/l_lock.c
 l_net.c: 
        test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c
 
-LINX=l_lock.c l_net.c
+LINX=l_net.c
 
 ldlm_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \
 ldlm_extent.c ldlm_request.c l_net.c
similarity index 100%
rename from lustre/lib/l_lock.c
rename to lustre/ldlm/l_lock.c
index 07aab31..80a6fcf 100644 (file)
@@ -225,6 +225,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         lock->l_refc = 1;
         INIT_LIST_HEAD(&lock->l_children);
         INIT_LIST_HEAD(&lock->l_res_link);
+        INIT_LIST_HEAD(&lock->l_inode_link);
         init_waitqueue_head(&lock->l_waitq);
 
         if (parent != NULL) {
index 08d260e..5d0c92f 100644 (file)
@@ -559,6 +559,7 @@ EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock2handle);
+EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
@@ -574,6 +575,8 @@ EXPORT_SYMBOL(ldlm_regression_stop);
 EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(l_lock);
+EXPORT_SYMBOL(l_unlock);
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
index f2bbb07..54c3e57 100644 (file)
@@ -136,6 +136,8 @@ static int ll_file_release(struct inode *inode, struct file *file)
         struct obdo *oa;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_device *obddev = class_conn2obd(&sbi->ll_osc_conn);
+        struct list_head *tmp, *next;
 
         ENTRY;
 
@@ -185,6 +187,23 @@ static int ll_file_release(struct inode *inode, struct file *file)
         }
         ptlrpc_free_req(fd->fd_req);
 
+        l_lock(&obddev->obd_namespace->ns_lock);
+        list_for_each_safe(tmp, next, &lli->lli_osc_locks) {
+                struct ldlm_lock *lock;
+                struct lustre_handle lockh;
+                lock = list_entry(tmp, struct ldlm_lock, l_inode_link);
+
+                if (!list_empty(&lock->l_inode_link)) {
+                        list_del_init(&lock->l_inode_link);
+                        LDLM_LOCK_PUT(lock);
+                }
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0)
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+        }
+        l_unlock(&obddev->obd_namespace->ns_lock);
+
         EXIT;
 
 out_fd:
@@ -236,15 +255,22 @@ static int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
         if (data_len != sizeof(struct inode))
                 LBUG();
 
-        /* FIXME: do something better than throwing away everything */
         if (inode == NULL)
                 LBUG();
         down(&inode->i_sem);
         CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino);
+        /* FIXME: do something better than throwing away everything */
         invalidate_inode_pages(inode);
         up(&inode->i_sem);
 
         ldlm_lock2handle(lock, &lockh);
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (!list_empty(&lock->l_inode_link)) {
+                list_del_init(&lock->l_inode_link);
+                LDLM_LOCK_PUT(lock);
+        }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         rc = ldlm_cli_cancel(&lockh);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel failed: %d\n", rc);
@@ -268,7 +294,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK)) {
                 OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
                 if (!lockhs)
-                        RETURN(-ENOMEM); 
+                        RETURN(-ENOMEM);
 
                 extent.start = *ppos;
                 extent.end = *ppos + count;
@@ -280,7 +306,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                                   ll_lock_callback, inode, sizeof(*inode),
                                   lockhs);
                 if (err != ELDLM_OK) {
-                        OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs));
+                        OBD_FREE(lockhs, md->lmd_stripe_count *sizeof(*lockhs));
                         CERROR("lock enqueue: err: %d\n", err);
                         RETURN(err);
                 }
index fdae25f..09a4f5a 100644 (file)
@@ -211,6 +211,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                 if (err) {
                         CERROR("failure %d inode %Ld\n", err, (long long)ino);
                         ptlrpc_free_req(request);
+#warning FIXME: must release lock here
                         RETURN(ERR_PTR(-abs(err)));
                 }
         }
index 7d2ac6e..bb4edef 100644 (file)
@@ -410,6 +410,7 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
         ENTRY;
 
         sema_init(&ii->lli_open_sem, 1);
+        INIT_LIST_HEAD(&ii->lli_osc_locks);
 
         /* core attributes first */
         if (body->valid & OBD_MD_FLID)
index d303a83..b0508bd 100644 (file)
@@ -228,7 +228,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         struct ldlm_intent *lit;
         ENTRY;
 
-        LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), dir->i_ino);
+        LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op),
+                          dir->i_ino);
 
         switch (it->it_op) {
         case IT_MKDIR:
index dff14c1..07d04b8 100644 (file)
@@ -430,6 +430,52 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(0);
 }
 
+static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry,
+                                struct ptlrpc_request *req,
+                                int request_off, int reply_off)
+{
+        struct mds_body *request_body, *body;
+        struct inode *inode = dentry->d_inode;
+        int rc;
+        ENTRY;
+
+        if (inode == NULL)
+                RETURN(-ENOENT);
+
+        /* Did the client request the link name? */
+        request_body = lustre_msg_buf(req->rq_reqmsg, request_off);
+        body = lustre_msg_buf(req->rq_repmsg, reply_off);
+        if ((body->valid & OBD_MD_LINKNAME) && S_ISLNK(inode->i_mode)) {
+                char *tmp = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
+
+                rc = inode->i_op->readlink(dentry, tmp, req->rq_repmsg->
+                                           buflens[reply_off + 1]);
+                if (rc < 0) {
+                        CERROR("readlink failed: %d\n", rc);
+                        RETURN(rc);
+                }
+
+                body->valid |= OBD_MD_LINKNAME;
+        }
+
+        mds_pack_inode2fid(&body->fid1, inode);
+        mds_pack_inode2body(body, inode);
+        if (S_ISREG(inode->i_mode)) {
+                struct lov_stripe_md *md;
+
+                md = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
+                md->lmd_easize = mds->mds_max_mdsize;
+                rc = mds_fs_get_md(mds, inode, md);
+
+                if (rc < 0) {
+                        CERROR("mds_fs_get_md failed: %d\n", rc);
+                        RETURN(rc);
+                }
+                body->valid |= OBD_MD_FLEASIZE;
+        }
+        RETURN(0);
+}
+
 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
@@ -440,7 +486,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         struct inode *dir;
         struct lustre_handle lockh;
         char *name;
-        int namelen, flags, lock_mode, rc = 0;
+        int namelen, flags, lock_mode, rc = 0, old_offset = offset;
         __u64 res_id[3] = {0, 0, 0};
         ENTRY;
 
@@ -495,26 +541,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                 GOTO(out_create_dchild, rc = -ESTALE);
         }
 
-        if (dchild->d_inode) {
-                struct mds_body *body;
-                struct inode *inode = dchild->d_inode;
-                CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
-                       dir->i_ino, name, dchild->d_inode->i_ino);
-
-                body = lustre_msg_buf(req->rq_repmsg, offset);
-                mds_pack_inode2fid(&body->fid1, inode);
-                mds_pack_inode2body(body, inode);
-                if (S_ISREG(inode->i_mode)) {
-                        struct lov_stripe_md *md;
-                        md = lustre_msg_buf(req->rq_repmsg, offset + 1);
-                        md->lmd_easize = mds->mds_max_mdsize;
-                        /* FIXME: why don't we check (or use) rc of get_md? */
-                        mds_fs_get_md(mds, inode, md);
-                }
-                /* now a normal case for intent locking */
-                rc = 0;
-        } else
-                rc = -ENOENT;
+        rc = mds_getattr_internal(mds, dchild, req, old_offset, offset);
 
         EXIT;
 out_create_dchild:
@@ -529,7 +556,6 @@ out_pre_de:
         return 0;
 }
 
-
 static int mds_getattr(int offset, struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
@@ -543,9 +569,8 @@ static int mds_getattr(int offset, struct ptlrpc_request *req)
         body = lustre_msg_buf(req->rq_reqmsg, offset);
         push_ctxt(&saved, &mds->mds_ctxt);
         de = mds_fid2dentry(mds, &body->fid1, NULL);
-        if (IS_ERR(de)) {
+        if (IS_ERR(de))
                 GOTO(out_pop, rc = -ENOENT);
-        }
 
         inode = de->d_inode;
         if (S_ISREG(body->fid1.f_type)) {
@@ -563,43 +588,7 @@ static int mds_getattr(int offset, struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
 
-        if (body->valid & OBD_MD_LINKNAME) {
-                char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
-
-                rc = inode->i_op->readlink(de, tmp, size[1]);
-
-                if (rc < 0) {
-                        CERROR("readlink failed: %d\n", rc);
-                        GOTO(out, rc);
-                }
-        }
-
-        body = lustre_msg_buf(req->rq_repmsg, 0);
-        body->ino = inode->i_ino;
-        body->generation = inode->i_generation;
-        body->atime = inode->i_atime;
-        body->ctime = inode->i_ctime;
-        body->mtime = inode->i_mtime;
-        body->uid = inode->i_uid;
-        body->gid = inode->i_gid;
-        body->size = inode->i_size;
-        body->mode = inode->i_mode;
-        body->nlink = inode->i_nlink;
-        body->valid = (OBD_MD_FLID | OBD_MD_FLATIME | OBD_MD_FLCTIME |
-                       OBD_MD_FLMTIME | OBD_MD_FLSIZE | OBD_MD_FLUID |
-                       OBD_MD_FLGID | OBD_MD_FLNLINK | OBD_MD_FLGENER |
-                       OBD_MD_FLMODE);
-
-        if (S_ISREG(inode->i_mode)) {
-                rc = mds_fs_get_md(mds, inode,
-                                   lustre_msg_buf(req->rq_repmsg, 1));
-                if (rc < 0) {
-                        CERROR("mds_fs_get_md failed: %d\n", rc);
-                        GOTO(out, rc);
-                }
-                body->valid |= OBD_MD_FLEASIZE;
-        } else if (S_ISLNK(inode->i_mode))
-                body->valid |= OBD_MD_LINKNAME;
+        rc = mds_getattr_internal(mds, de, req, offset, 0);
 
 out:
         l_dput(de);
index 152b103..7f7d995 100644 (file)
@@ -27,6 +27,7 @@
 #include <linux/init.h>
 #include <linux/lustre_ha.h>
 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
+#include <linux/lustre_lite.h> /* for ll_i2info */
 
 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
                        struct lov_stripe_md *md)
@@ -592,12 +593,12 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
                        struct lustre_handle *parent_lock, 
                        __u32 type, void *extentp, int extent_len, __u32 mode,
                        int *flags, void *callback, void *data, int datalen,
-                       struct lustre_handle *lockh)
+                       struct lustre_handle *lockhs)
 {
         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
         struct obd_device *obddev = class_conn2obd(connh);
         struct ldlm_extent *extent = extentp;
-        int rc;
+        int rc, i;
         __u32 mode2;
 
         /* Filesystem locks are given a bit of special treatment: first we
@@ -608,7 +609,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
         /* Next, search for already existing extent locks that will cover us */
         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
-                             sizeof(extent), mode, lockh);
+                             sizeof(extent), mode, lockhs);
         if (rc == 1) {
                 /* We already have a lock, and it's referenced */
                 return 0;
@@ -624,29 +625,51 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
                 mode2 = LCK_PW;
 
         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
-                             sizeof(extent), mode2, lockh);
+                             sizeof(extent), mode2, lockhs);
         if (rc == 1) {
                 int flags;
                 /* FIXME: This is not incredibly elegant, but it might
                  * be more elegant than adding another parameter to
                  * lock_match.  I want a second opinion. */
-                ldlm_lock_addref(lockh, mode);
-                ldlm_lock_decref(lockh, mode2);
+                ldlm_lock_addref(lockhs, mode);
+                ldlm_lock_decref(lockhs, mode2);
 
                 if (mode == LCK_PR)
                         return 0;
 
-                rc = ldlm_cli_convert(lockh, mode, &flags);
+                rc = ldlm_cli_convert(lockhs, mode, &flags);
                 if (rc)
                         LBUG();
 
                 return rc;
         }
 
-        rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
+        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
                               parent_lock, res_id, type, extent,
                               sizeof(extent), mode, flags, ldlm_completion_ast,
-                              callback, data, datalen, lockh);
+                              callback, data, datalen, lockhs);
+        if (rc)
+                return rc;
+
+        /* This code must change if we ever stop passing an inode in as data */
+        /* This is ldlm and llite code.  It makes me sad that it's in
+         * osc_request.c --phil */
+        l_lock(&obddev->obd_namespace->ns_lock);
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                struct ldlm_lock *lock = ldlm_handle2lock(&(lockhs[i]));
+                struct inode *inode = data;
+                struct ll_inode_info *lli = ll_i2info(inode);
+
+                if (!lock) {
+                        CERROR("invalid lock in array\n");
+                        continue;
+                }
+
+                /* Lock already has an extra ref from handle2lock */
+                list_add(&lock->l_inode_link, &lli->lli_osc_locks);
+        }
+        l_unlock(&obddev->obd_namespace->ns_lock);
+
         return rc;
 }