Whamcloud - gitweb
b=605627
authorpschwan <pschwan>
Sun, 22 Sep 2002 02:47:48 +0000 (02:47 +0000)
committerpschwan <pschwan>
Sun, 22 Sep 2002 02:47:48 +0000 (02:47 +0000)
Fixes two bugs related to 605627 that have been present since we started
correctly reusing inodes.

- We were giving up locks at file close time, but not throwing away file data
pages.  Fixed, by adding a flag to the blocking callback.  It's now called
under two different circumstances: when a lock needs to be given up
(LDLM_CB_BLOCKING) and when a lock is about to be freed (LDLM_CB_DYING)

- We were not refreshing inode attributes (notably size) correctly.  I brute
force this by always calling ll_file_size() in ll_inode_revalidate, but this
needs some obvious immediate refinement.

As an aside, I noticed that the DLM API documentation gives almost no mention
to the arguments or calling conventions of the callback functions.

lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_lite.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_test.c
lustre/llite/file.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_reint.c

index 7404dda..ad6a93c 100644 (file)
@@ -39,6 +39,9 @@ typedef enum {
 #define LDLM_FL_DESTROYED      (1 << 6)
 #define LDLM_FL_WAIT_NOREPROC  (1 << 7)
 
+#define LDLM_CB_BLOCKING    1
+#define LDLM_CB_DYING       2
+
 #define L2B(c) (1 << c)
 
 /* compatibility matrix */
@@ -112,7 +115,7 @@ struct ldlm_lock;
 
 typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock,
                                       struct ldlm_lock_desc *new, void *data,
-                                      __u32 data_len);
+                                      __u32 data_len, int flag);
 
 typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); 
 
@@ -370,7 +373,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id);
 /* mds/handler.c */
 /* This has to be here because recurisve inclusion sucks. */
 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, __u32 data_len);
+                     void *data, __u32 data_len, int flag);
 
 #endif /* __KERNEL__ */
 
index 1daa659..56a05f9 100644 (file)
@@ -148,7 +148,7 @@ extern struct file_operations ll_file_operations;
 extern struct inode_operations ll_file_inode_operations;
 struct ldlm_lock;
 int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data,
-                     __u32 data_len);
+                     __u32 data_len, int flag);
 int ll_size_lock(struct inode *, struct lov_stripe_md *, __u64 start, int mode,
                  struct lustre_handle **);
 int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode,
index 0b14101..ce54c32 100644 (file)
@@ -152,6 +152,9 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 LDLM_LOCK_PUT(lock->l_parent);
 
         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+                lock->l_blocking_ast(lock, NULL, lock->l_data,
+                                     lock->l_data_len, LDLM_CB_DYING);
+
                 spin_lock(&ns->ns_counter_lock);
                 ns->ns_locks--;
                 spin_unlock(&ns->ns_counter_lock);
@@ -425,7 +428,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
 
                 /* FIXME: need a real 'desc' here */
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
-                                     lock->l_data_len);
+                                     lock->l_data_len, LDLM_CB_BLOCKING);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
@@ -741,7 +744,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                 if (w->w_blocking)
                         rc = w->w_lock->l_blocking_ast
                                 (w->w_lock, &w->w_desc, w->w_data,
-                                 w->w_datalen);
+                                 w->w_datalen, LDLM_CB_BLOCKING);
                 else
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
                 if (rc)
index a6ede2f..8d96b50 100644 (file)
@@ -115,13 +115,18 @@ static int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                                     struct ldlm_lock_desc *desc,
-                                    void *data, __u32 data_len)
+                                    void *data, __u32 data_len, int flag)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
+        if (flag == LDLM_CB_DYING) {
+                /* Don't need to do anything when the lock is freed. */
+                RETURN(0);
+        }
+
         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (!req)
@@ -391,7 +396,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
                            "callback (%p)", lock->l_blocking_ast);
                 if (lock->l_blocking_ast != NULL) {
                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
-                                             lock->l_data, lock->l_data_len);
+                                             lock->l_data, lock->l_data_len,
+                                             LDLM_CB_BLOCKING);
                 }
         } else
                 LDLM_DEBUG(lock, "Lock still has references, will be"
index 2676301..b5f5e74 100644 (file)
@@ -75,17 +75,26 @@ static int ldlm_do_convert(void);
  */
 static int ldlm_test_blocking_ast(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new,
-                                  void *data, __u32 data_len)
+                                  void *data, __u32 data_len, int flag)
 {
         int rc;
         struct lustre_handle lockh;
         ENTRY;
 
-        LDLM_DEBUG(lock, "We're blocking. Cancelling lock");
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc < 0) {
-                CERROR("ldlm_cli_cancel: %d\n", rc);
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                LDLM_DEBUG(lock, "We're blocking. Cancelling lock");
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0) {
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+                        LBUG();
+                }
+                break;
+        case LDLM_CB_DYING:
+                LDLM_DEBUG(lock, "this lock is being freed");
+                break;
+        default:
                 LBUG();
         }
 
@@ -95,10 +104,11 @@ static int ldlm_test_blocking_ast(struct ldlm_lock *lock,
 /* blocking ast for basic tests. noop */
 static int ldlm_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *new,
-                             void *data, __u32 data_len)
+                             void *data, __u32 data_len, int flag)
 {
         ENTRY;
-        CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new);
+        CERROR("ldlm_blocking_ast: lock=%p, new=%p, flag=%d\n", lock, new,
+               flag);
         RETURN(0);
 }
 
@@ -111,13 +121,12 @@ static int ldlm_test_completion_ast(struct ldlm_lock *lock, int flags)
         ENTRY;
 
         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                      LDLM_FL_BLOCK_CONV)) {
-
+                     LDLM_FL_BLOCK_CONV)) {
                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock");
                 RETURN(0);
-        } 
+        }
 
-        if (lock->l_granted_mode != lock->l_req_mode) 
+        if (lock->l_granted_mode != lock->l_req_mode)
                 CERROR("completion ast called with non-granted lock\n");
 
         /* add to list of granted locks */
@@ -302,21 +311,21 @@ static int ldlm_do_decrement(void)
         struct ldlm_lock *lock;
         int rc = 0;
         ENTRY;
+
         spin_lock(&ctl_lock);
         if(list_empty(&lock_list)) {
                 CERROR("lock_list is empty\n");
                 spin_unlock(&ctl_lock);
-                RETURN(0); 
-        } 
-                
+                RETURN(0);
+        }
+
         /* delete from list */
-        lock_info = list_entry(lock_list.next, 
+        lock_info = list_entry(lock_list.next,
                         struct ldlm_test_lock, l_link);
         list_del(lock_list.next);
         num_locks--;
         spin_unlock(&ctl_lock);
+
         /* decrement and free the info */
         lock = ldlm_handle2lock(&lock_info->l_lockh);
         ldlm_lock_decref(&lock_info->l_lockh, lock->l_granted_mode);
@@ -328,7 +337,7 @@ static int ldlm_do_decrement(void)
 }
 
 static int ldlm_do_enqueue(struct ldlm_test_thread *thread)
-{                
+{
         struct lustre_handle lockh;
         __u64 res_id[3] = {0};
         __u32 lock_mode;
@@ -349,23 +358,19 @@ static int ldlm_do_enqueue(struct ldlm_test_thread *thread)
         get_random_bytes(&random, sizeof(random));
         ext.start = random % num_extents;
         get_random_bytes(&random, sizeof(random));
-        ext.end = random % 
+        ext.end = random %
                 (num_extents - (int)ext.start) + ext.start;
 
-        LDLM_DEBUG_NOLOCK("about to enqueue with resource %d, mode %d,"
-                          " extent %d -> %d",
-                          (int)res_id[0], 
-                          lock_mode, 
-                          (int)ext.start, 
-                          (int)ext.end);
-
-        rc = ldlm_match_or_enqueue(&regress_connh, 
-                                   NULL, 
-                                   thread->obddev->obd_namespace, 
-                                   NULL, res_id, LDLM_EXTENT, &ext, 
-                                   sizeof(ext), lock_mode, &flags, 
-                                   ldlm_test_completion_ast, 
-                                   ldlm_test_blocking_ast, 
+        LDLM_DEBUG_NOLOCK("about to enqueue with resource "LPX64", mode %d,"
+                          " extent "LPX64" -> "LPX64, res_id[0], lock_mode,
+                          ext.start, ext.end);
+
+        rc = ldlm_match_or_enqueue(&regress_connh, NULL,
+                                   thread->obddev->obd_namespace,
+                                   NULL, res_id, LDLM_EXTENT, &ext,
+                                   sizeof(ext), lock_mode, &flags,
+                                   ldlm_test_completion_ast,
+                                   ldlm_test_blocking_ast,
                                    NULL, 0, &lockh);
 
         atomic_inc(&locks_requested);
@@ -379,7 +384,7 @@ static int ldlm_do_enqueue(struct ldlm_test_thread *thread)
 }
 
 static int ldlm_do_convert(void)
-{                
+{
         __u32 lock_mode;
         unsigned char random;
         int flags = 0, rc = 0;
@@ -408,7 +413,7 @@ static int ldlm_do_convert(void)
         }
 
         /*
-         *  Adjust reference counts. 
+         *  Adjust reference counts.
          *  FIXME: This is technically a bit... wrong,
          *  since we don't know when/if the convert succeeded
          */
@@ -454,7 +459,7 @@ static int ldlm_test_main(void *data)
                  */
                 dec_chance = chance_left * num_locks / max_locks;
                 chance_left -= dec_chance;
+
                 /* FIXME: conversions temporarily disabled
                  * until they are working correctly.
                  */
@@ -483,11 +488,11 @@ static int ldlm_test_main(void *data)
                                   atomic_read(&locks_matched));
 
                 spin_lock(&ctl_lock);
-                LDLM_DEBUG_NOLOCK("lock references currently held: %d, ", 
+                LDLM_DEBUG_NOLOCK("lock references currently held: %d, ",
                                   num_locks);
                 spin_unlock(&ctl_lock);
 
-                /* 
+                /*
                  * We don't sleep after a lock being blocked, so let's
                  * make sure other things can run.
                  */
@@ -531,10 +536,10 @@ static int ldlm_start_thread(struct obd_device *obddev,
         RETURN(0);
 }
 
-int ldlm_regression_start(struct obd_device *obddev, 
-                          struct lustre_handle *connh, 
-                          unsigned int threads, unsigned int max_locks_in, 
-                          unsigned int num_resources_in, 
+int ldlm_regression_start(struct obd_device *obddev,
+                          struct lustre_handle *connh,
+                          unsigned int threads, unsigned int max_locks_in,
+                          unsigned int num_resources_in,
                           unsigned int num_extents_in)
 {
         int i, rc = 0;
@@ -549,7 +554,7 @@ int ldlm_regression_start(struct obd_device *obddev,
         regression_running = 1;
         spin_unlock(&ctl_lock);
 
-        regress_connh = *connh; 
+        regress_connh = *connh;
         max_locks = max_locks_in;
         num_resources = num_resources_in;
         num_extents = num_extents_in;
@@ -601,8 +606,8 @@ int ldlm_regression_stop(void)
         /* decrement all held locks */
         while (!list_empty(&lock_list)) {
                 struct ldlm_lock *lock;
-                struct ldlm_test_lock *lock_info = 
-                       list_entry(lock_list.next, struct ldlm_test_lock, 
+                struct ldlm_test_lock *lock_info =
+                       list_entry(lock_list.next, struct ldlm_test_lock,
                                    l_link);
                 list_del(lock_list.next);
                 num_locks--;
index b5188eb..9183410 100644 (file)
@@ -339,7 +339,7 @@ static void ll_update_atime(struct inode *inode)
 }
 
 int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
-                     void *data, __u32 data_len)
+                     void *data, __u32 data_len, int flag)
 {
         struct inode *inode = data;
         struct lustre_handle lockh;
@@ -351,16 +351,25 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
 
         if (inode == NULL)
                 LBUG();
-        down(&inode->i_sem);
-        CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino);
-        /* FIXME: do something better than throwing away everything */
-        invalidate_inode_pages(inode);
-        up(&inode->i_sem);
-
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc != ELDLM_OK)
-                CERROR("ldlm_cli_cancel failed: %d\n", rc);
+
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_DYING:
+                down(&inode->i_sem);
+                CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino);
+                /* FIXME: do something better than throwing away everything */
+                invalidate_inode_pages(inode);
+                up(&inode->i_sem);
+                break;
+        default:
+                LBUG();
+        }
+
         RETURN(0);
 }
 
@@ -584,6 +593,17 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         return 0;
 }
 
+static int ll_inode_revalidate(struct dentry *dentry)
+{
+        struct inode *inode = dentry->d_inode;
+        ENTRY;
+
+        if (!inode)
+                RETURN(0);
+
+        RETURN(ll_file_size(inode, ll_i2info(inode)->lli_smd));
+}
+
 struct file_operations ll_file_operations = {
         read:           ll_file_read,
         write:          ll_file_write,
@@ -596,6 +616,7 @@ struct file_operations ll_file_operations = {
 };
 
 struct inode_operations ll_file_inode_operations = {
-        truncate: ll_truncate,
-        setattr: ll_setattr
+        truncate:   ll_truncate,
+        setattr:    ll_setattr,
+        revalidate: ll_inode_revalidate
 };
index da991bd..1ee50ab 100644 (file)
@@ -152,7 +152,7 @@ int mdc_getattr(struct lustre_handle *conn,
 }
 
 static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, __u32 data_len)
+                            void *data, __u32 data_len, int flag)
 {
         int rc;
         struct inode *inode = data;
@@ -166,20 +166,29 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 RETURN(-EINVAL);
         }
 
-        /* FIXME: do something better than throwing away everything */
-        if (inode == NULL)
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0) {
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+                        LBUG();
+                }
+                break;
+        case LDLM_CB_DYING:
+                /* FIXME: do something better than throwing away everything */
+                if (inode == NULL)
+                        LBUG();
+                if (S_ISDIR(inode->i_mode)) {
+                        CDEBUG(D_INODE, "invalidating inode %ld\n",
+                               inode->i_ino);
+                        invalidate_inode_pages(inode);
+                }
+                break;
+        default:
                 LBUG();
-        if (S_ISDIR(inode->i_mode)) {
-                CDEBUG(D_INODE, "invalidating inode %ld\n", inode->i_ino);
-                invalidate_inode_pages(inode);
         }
 
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc < 0) {
-                CERROR("ldlm_cli_cancel: %d\n", rc);
-                LBUG();
-        }
         RETURN(0);
 }
 
index 539e50b..0edd444 100644 (file)
@@ -414,11 +414,16 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
 }
 
 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, __u32 data_len)
+                     void *data, __u32 data_len, int flag)
 {
         int do_ast;
         ENTRY;
 
+        if (flag == LDLM_CB_DYING) {
+                /* Don't need to do anything when the lock is freed. */
+                RETURN(0);
+        }
+
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
index 69c1af4..33462f0 100644 (file)
@@ -433,6 +433,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
+#warning FIXME: if mds_name2locked_dentry decrefs this lock, we must not
+        memcpy(&child_lockh, &lockh, sizeof(child_lockh));
         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
                                         LCK_EX, &child_lockh, lock_mode);