Whamcloud - gitweb
The old patch was failing because sometimes the inodes were gone by the time
authorpschwan <pschwan>
Tue, 24 Sep 2002 20:04:00 +0000 (20:04 +0000)
committerpschwan <pschwan>
Tue, 24 Sep 2002 20:04:00 +0000 (20:04 +0000)
the callback was called with CB_DYING.  I also realized that the client would
believe that it had valid data for too long.

Instead of an extra callback when the lock is freed, I moved that extra
callback to just before we cancel the lock.  Because there's two paths for that
now (cancelling with server notification and cancelling without), I had to
introduce _another_ l_flag, which indicates whether we've done this call.  Oh
well.  It works much better now.

lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_lite.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_test.c
lustre/llite/file.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c

index 1efcdca..9f6c747 100644 (file)
@@ -38,6 +38,10 @@ typedef enum {
 #define LDLM_FL_AST_SENT       (1 << 5)
 #define LDLM_FL_DESTROYED      (1 << 6)
 #define LDLM_FL_WAIT_NOREPROC  (1 << 7)
+#define LDLM_FL_CANCEL         (1 << 8)
+
+#define LDLM_CB_BLOCKING    1
+#define LDLM_CB_CANCELING   2
 
 #define L2B(c) (1 << c)
 
@@ -112,7 +116,7 @@ struct ldlm_lock;
 
 typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock,
                                       struct ldlm_lock_desc *new, void *data,
-                                      __u32 data_len);
+                                      __u32 data_len, int flag);
 
 typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); 
 
@@ -259,6 +263,7 @@ void ldlm_unregister_intent(void);
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle);
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
+void ldlm_cancel_callback(struct ldlm_lock *lock);
 
 #define LDLM_LOCK_PUT(lock)                     \
 do {                                            \
@@ -370,7 +375,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *, __u64 *, int local_only);
 /* mds/handler.c */
 /* This has to be here because recurisve inclusion sucks. */
 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, __u32 data_len);
+                     void *data, __u32 data_len, int flag);
 
 #endif /* __KERNEL__ */
 
index c853fee..23912d5 100644 (file)
@@ -148,7 +148,7 @@ extern struct file_operations ll_file_operations;
 extern struct inode_operations ll_file_inode_operations;
 struct ldlm_lock;
 int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data,
-                     __u32 data_len);
+                     __u32 data_len, int flag);
 int ll_size_lock(struct inode *, struct lov_stripe_md *, __u64 start, int mode,
                  struct lustre_handle **);
 int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode,
index 0b14101..d3e2f47 100644 (file)
@@ -425,7 +425,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
 
                 /* FIXME: need a real 'desc' here */
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
-                                     lock->l_data_len);
+                                     lock->l_data_len, LDLM_CB_BLOCKING);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
@@ -741,7 +741,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                 if (w->w_blocking)
                         rc = w->w_lock->l_blocking_ast
                                 (w->w_lock, &w->w_desc, w->w_data,
-                                 w->w_datalen);
+                                 w->w_datalen, LDLM_CB_BLOCKING);
                 else
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
                 if (rc)
@@ -780,6 +780,17 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         EXIT;
 }
 
+void ldlm_cancel_callback(struct ldlm_lock *lock)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
+                lock->l_flags |= LDLM_FL_CANCEL;
+                lock->l_blocking_ast(lock, NULL, lock->l_data,
+                                     lock->l_data_len, LDLM_CB_CANCELING);
+        }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+}
+
 void ldlm_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
@@ -794,6 +805,8 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                        "writers)\n", lock->l_readers, lock->l_writers);
 
+        ldlm_cancel_callback(lock);
+
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy(lock);
         l_unlock(&ns->ns_lock);
index a6ede2f..a370571 100644 (file)
@@ -115,13 +115,18 @@ static int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                                     struct ldlm_lock_desc *desc,
-                                    void *data, __u32 data_len)
+                                    void *data, __u32 data_len, int flag)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
+        if (flag == LDLM_CB_CANCELING) {
+                /* Don't need to do anything here. */
+                RETURN(0);
+        }
+
         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (!req)
@@ -391,7 +396,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
                            "callback (%p)", lock->l_blocking_ast);
                 if (lock->l_blocking_ast != NULL) {
                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
-                                             lock->l_data, lock->l_data_len);
+                                             lock->l_data, lock->l_data_len,
+                                             LDLM_CB_BLOCKING);
                 }
         } else
                 LDLM_DEBUG(lock, "Lock still has references, will be"
index bdde5c4..ee4594a 100644 (file)
@@ -363,6 +363,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 /* Set this flag to prevent others from getting new references*/
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
+                ldlm_cancel_callback(lock);
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
index d445cad..d28558f 100644 (file)
@@ -75,17 +75,26 @@ static int ldlm_do_convert(void);
  */
 static int ldlm_test_blocking_ast(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new,
-                                  void *data, __u32 data_len)
+                                  void *data, __u32 data_len, int flag)
 {
         int rc;
         struct lustre_handle lockh;
         ENTRY;
 
-        LDLM_DEBUG(lock, "We're blocking. Cancelling lock");
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc < 0) {
-                CERROR("ldlm_cli_cancel: %d\n", rc);
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                LDLM_DEBUG(lock, "We're blocking. Cancelling lock");
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0) {
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+                        LBUG();
+                }
+                break;
+        case LDLM_CB_CANCELING:
+                LDLM_DEBUG(lock, "this lock is being cancelled");
+                break;
+        default:
                 LBUG();
         }
 
@@ -95,10 +104,11 @@ static int ldlm_test_blocking_ast(struct ldlm_lock *lock,
 /* blocking ast for basic tests. noop */
 static int ldlm_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *new,
-                             void *data, __u32 data_len)
+                             void *data, __u32 data_len, int flag)
 {
         ENTRY;
-        CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new);
+        CERROR("ldlm_blocking_ast: lock=%p, new=%p, flag=%d\n", lock, new,
+               flag);
         RETURN(0);
 }
 
index fc19186..6548237 100644 (file)
@@ -340,7 +340,7 @@ static void ll_update_atime(struct inode *inode)
 }
 
 int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
-                     void *data, __u32 data_len)
+                     void *data, __u32 data_len, int flag)
 {
         struct inode *inode = data;
         struct lustre_handle lockh;
@@ -353,16 +353,24 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
         if (inode == NULL)
                 LBUG();
 
-        down(&inode->i_sem);
-        CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino);
-        /* FIXME: do something better than throwing away everything */
-        invalidate_inode_pages(inode);
-        up(&inode->i_sem);
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_CANCELING:
+                down(&inode->i_sem);
+                CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino);
+                /* FIXME: do something better than throwing away everything */
+                invalidate_inode_pages(inode);
+                up(&inode->i_sem);
+                break;
+        default:
+                LBUG();
+        }
 
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc != ELDLM_OK)
-                CERROR("ldlm_cli_cancel failed: %d\n", rc);
         RETURN(0);
 }
 
index 4b2ab91..da291bc 100644 (file)
@@ -153,7 +153,7 @@ int mdc_getattr(struct lustre_handle *conn,
 }
 
 static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                            void *data, __u32 data_len)
+                            void *data, __u32 data_len, int flag)
 {
         int rc;
         struct inode *inode = data;
@@ -167,22 +167,29 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 RETURN(-EINVAL);
         }
 
-        if (inode == NULL)
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0) {
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+                        LBUG();
+                }
+                break;
+        case LDLM_CB_CANCELING:
+                /* FIXME: do something better than throwing away everything */
+                if (inode == NULL)
+                        LBUG();
+                if (S_ISDIR(inode->i_mode)) {
+                        CDEBUG(D_INODE, "invalidating inode %ld\n",
+                               inode->i_ino);
+                        invalidate_inode_pages(inode);
+                }
+                break;
+        default:
                 LBUG();
-
-        /* FIXME: do something better than throwing away everything */
-        if (S_ISDIR(inode->i_mode)) {
-                CDEBUG(D_INODE, "invalidating inode %ld\n",
-                       inode->i_ino);
-                invalidate_inode_pages(inode);
         }
 
-        ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
-        if (rc < 0) {
-                CERROR("ldlm_cli_cancel: %d\n", rc);
-                LBUG();
-        }
         RETURN(0);
 }
 
index 539e50b..83a544c 100644 (file)
@@ -414,11 +414,16 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
 }
 
 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, __u32 data_len)
+                     void *data, __u32 data_len, int flag)
 {
         int do_ast;
         ENTRY;
 
+        if (flag == LDLM_CB_CANCELING) {
+                /* Don't need to do anything here. */
+                RETURN(0);
+        }
+
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);