Whamcloud - gitweb
- Cleanup of the lock callback infrastructure - not quite functional
authorbraam <braam>
Tue, 25 Jun 2002 04:49:57 +0000 (04:49 +0000)
committerbraam <braam>
Tue, 25 Jun 2002 04:49:57 +0000 (04:49 +0000)
  yet.

- API improvements.
- Documentation update.

12 files changed:
lustre/Makefile.am
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_test.c
lustre/llite/dcache.c
lustre/llite/namei.c
lustre/mds/handler.c
lustre/mds/mds_reint.c
lustre/osc/osc_request.c

index e1e25cb..0a30460 100644 (file)
@@ -6,7 +6,7 @@
 AUTOMAKE_OPTIONS = foreign
 
 # NOTE: keep extN before mds
-SUBDIRS = ptlrpc llite lib ldlm obdecho mdc osc extN mds ost 
+SUBDIRS = ldlm ptlrpc llite lib obdecho mdc osc extN mds ost 
 SUBDIRS+= utils tests obdfilter obdclass obdfs demos doc scripts 
 EXTRA_DIST = BUGS FDL Rules include patches
 
index 67bd6eb..3451a66 100644 (file)
@@ -102,7 +102,7 @@ struct ldlm_lock;
 
 typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh,
                                   struct ldlm_lock_desc *new, void *data,
-                                  __u32 data_len, struct ptlrpc_request **req);
+                                  __u32 data_len);
 
 struct ldlm_lock {
         __u64                  l_random;
@@ -174,6 +174,15 @@ struct ldlm_resource {
         void                  *lr_tmp;
 };
 
+struct ldlm_ast_work { 
+        struct ldlm_lock *w_lock;
+        int               w_blocking;
+        struct ldlm_lock_desc w_desc;
+        struct list_head   w_list;
+        void *w_data;
+        int w_datalen;
+};
+        
 static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
 {
         return (struct ldlm_extent *)(res->lr_name);
@@ -213,8 +222,9 @@ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
 void ldlm_lock_put(struct ldlm_lock *lock);
 void ldlm_lock_destroy(struct ldlm_lock *lock);
 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode);
-void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode);
+void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
+void ldlm_lock_addref_internal(struct ldlm_lock* , __u32 mode);
+void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_grant_lock(struct ldlm_lock *lock);
 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
                     void *cookie, int cookielen, ldlm_mode_t mode,
@@ -269,8 +279,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh);
-int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
-                      void *data, __u32 data_len, struct ptlrpc_request **reqp);
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
+                    void *data, __u32 data_len);
 int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
                      int new_mode, int *flags);
 int ldlm_cli_cancel(struct lustre_handle *);
index 02f5e17..e3797b7 100644 (file)
@@ -101,6 +101,7 @@ struct ptlrpc_client {
         struct obd_device *cli_obd;
         __u32 cli_request_portal;
         __u32 cli_reply_portal;
+
         __u64 cli_last_rcvd;
         __u64 cli_last_committed;
         __u32 cli_target_devno;
index 8087e6a..d683992 100644 (file)
@@ -403,37 +403,41 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
 }
 
-static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        struct lustre_handle lockh;
-        struct ldlm_lock_desc desc;
-        struct ptlrpc_request *req = NULL;
+        struct ldlm_ast_work *w;
         ENTRY;
-
-
+        
+        OBD_ALLOC(w, sizeof(*w));
+        if (!w) { 
+                LBUG();
+                return;
+        }
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (lock->l_flags & LDLM_FL_AST_SENT) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
+        ldlm_lock_get(lock);
+        if (new) { 
+                w->w_blocking = 1;
+                ldlm_lock2desc(new, &w->w_desc);
         }
-
+        w->w_lock = lock;
         lock->l_flags |= LDLM_FL_AST_SENT;
-        /* FIXME: this should merely add the lock to the lr_tmp list */
-        ldlm_lock2handle(lock, &lockh);
-        ldlm_lock2desc(new, &desc);
-        lock->l_blocking_ast(&lockh, &desc, lock->l_data, lock->l_data_len,
-                             &req);
+        list_add(&w->w_list, lock->l_resource->lr_tmp);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return;
+}
 
-        if (req != NULL) {
-                struct list_head *list = lock->l_resource->lr_tmp;
-                list_add(&req->rq_multi, list);
-        }
-        RETURN(1);
+void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
+{
+                struct ldlm_lock *lock;
+
+                lock = ldlm_handle2lock(lockh);
+                ldlm_lock_addref_internal(lock, mode);
+                ldlm_lock_put(lock);
 }
 
-/* Args: unlocked lock */
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
+/* only called for local locks */
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
 {
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
@@ -445,13 +449,15 @@ void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
 }
 
 /* Args: unlocked lock */
-void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
 {
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
         ENTRY;
 
         if (lock == NULL)
                 LBUG();
 
+        LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
@@ -475,7 +481,7 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
 
                 ldlm_lock2handle(lock, &lockh);
                 lock->l_blocking_ast(&lockh, NULL, lock->l_data,
-                                     lock->l_data_len, NULL);
+                                     lock->l_data_len);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
@@ -510,16 +516,10 @@ static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
 
                 rc = 0;
 
-                CDEBUG(D_OTHER, "compat function failed and lock modes "
-                       "incompat\n");
                 if (send_cbs && child->l_blocking_ast != NULL) {
                         CDEBUG(D_OTHER, "incompatible; sending blocking "
                                "AST.\n");
-                        /* It's very difficult to actually send the AST from
-                         * here, because we'd have to drop the lock before going
-                         * to sleep to wait for the reply.  Instead we build the
-                         * packet and send it later. */
-                        ldlm_send_blocking_ast(child, lock);
+                        ldlm_add_ast_work_item(child, lock);
                 }
         }
 
@@ -548,7 +548,6 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
 void ldlm_grant_lock(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res = lock->l_resource;
-        struct ptlrpc_request *req = NULL;
         ENTRY;
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
@@ -559,20 +558,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock)
                 res->lr_most_restr = lock->l_granted_mode;
 
         if (lock->l_completion_ast) {
-                struct lustre_handle lockh;
-
-                /* FIXME: this should merely add lock to lr_tmp list */
-                ldlm_lock2handle(lock, &lockh);
-                lock->l_completion_ast(&lockh, NULL, lock->l_data,
-                                       lock->l_data_len, &req);
-                if (req != NULL) {
-                        struct list_head *list = res->lr_tmp;
-                        if (list == NULL) {
-                                LBUG();
-                                return;
-                        }
-                        list_add(&req->rq_multi, list);
-                }
+                ldlm_add_ast_work_item(lock, NULL);
         }
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         EXIT;
@@ -600,7 +586,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                      lock->l_extent.end < extent->end))
                         continue;
 
-                ldlm_lock_addref(lock, mode);
+                ldlm_lock_addref_internal(lock, mode);
                 return lock;
         }
 
@@ -642,7 +628,10 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
         if (lock)
                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
                                          lock->l_granted_mode);
-
+        if (rc) 
+                LDLM_DEBUG(lock, "matched");
+        else
+                LDLM_DEBUG(lock, "not matched");
         return rc;
 }
 
@@ -781,31 +770,34 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
 
                 list_del_init(&pending->l_res_link);
                 ldlm_grant_lock(pending);
-
-                ldlm_lock_addref(pending, pending->l_req_mode);
-                ldlm_lock_decref(pending, pending->l_granted_mode);
         }
 
         RETURN(0);
 }
 
-static void ldlm_send_delayed_asts(struct list_head *rpc_list)
+static void ldlm_run_ast_work(struct list_head *rpc_list)
 {
         struct list_head *tmp, *pos;
+        int rc;
         ENTRY;
 
         list_for_each_safe(tmp, pos, rpc_list) {
-                int rc;
-                struct ptlrpc_request *req =
-                        list_entry(tmp, struct ptlrpc_request, rq_multi);
-
-                CDEBUG(D_INFO, "Sending callback.\n");
-
-                rc = ptlrpc_queue_wait(req);
-                rc = ptlrpc_check_status(req, rc);
-                ptlrpc_free_req(req);
-                if (rc)
-                        CERROR("Callback send failed: %d\n", rc);
+                struct ldlm_ast_work *w =   
+                        list_entry(tmp, struct ldlm_ast_work, w_list);
+                struct lustre_handle lockh;
+                
+                ldlm_lock2handle(w->w_lock, &lockh); 
+                if (w->w_blocking) { 
+                        rc = w->w_lock->l_blocking_ast(&lockh, &w->w_desc, w->w_data, w->w_datalen); 
+                } else { 
+                        rc = w->w_lock->l_completion_ast(&lockh, NULL, w->w_data, w->w_datalen); 
+                }
+                if (rc) { 
+                        CERROR("Failed AST - should clean & disconnect client\n");
+                }
+                ldlm_lock_put(w->w_lock);
+                list_del(&w->w_list);
+                OBD_FREE(w, sizeof(*w));
         }
         EXIT;
 }
@@ -832,7 +824,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         res->lr_tmp = NULL;
         l_unlock(&res->lr_namespace->ns_lock);
 
-        ldlm_send_delayed_asts(&rpc_list);
+        ldlm_run_ast_work(&rpc_list);
         EXIT;
 }
 
index 3b4370b..00a2039 100644 (file)
@@ -68,7 +68,7 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service
 
         flags = dlm_req->lock_flags;
         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_cli_callback, ldlm_cli_callback);
+                                ldlm_server_ast, ldlm_server_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -226,7 +226,7 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc,
                                 ldlm_lock2handle(lock, &lockh);
                                 lock->l_blocking_ast(&lockh, descp,
                                                      lock->l_data,
-                                                     lock->l_data_len, NULL);
+                                                     lock->l_data_len);
                         }
                 } else {
                         LDLM_DEBUG(lock, "Lock still has references, will be"
index c7d44e3..099bd76 100644 (file)
@@ -39,7 +39,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (lock == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* for the local lock, add the reference */
-        ldlm_lock_addref(lock, mode);
+        ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
 
         LDLM_DEBUG(lock, "client-side enqueue START");
@@ -140,21 +140,20 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         return rc;
 }
 
-int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                      void *data, __u32 data_len, struct ptlrpc_request **reqp)
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
+                      void *data, __u32 data_len)
 {
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
-        struct ptlrpc_client *cl =
-                &lock->l_resource->lr_namespace->ns_rpc_client;
+        struct ptlrpc_client *cl;
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
         if (lock == NULL)
                 LBUG();
-
+        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
                               &size, NULL);
         if (!req)
@@ -177,14 +176,9 @@ int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
 
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        if (reqp == NULL) {
-                LBUG();
-                rc = ptlrpc_queue_wait(req);
-                rc = ptlrpc_check_status(req, rc);
-                ptlrpc_free_req(req);
-        } else
-                *reqp = req;
-
+        rc = ptlrpc_queue_wait(req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
 
         EXIT;
  out:
@@ -281,7 +275,6 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 GOTO(out, rc);
 
         ldlm_lock_cancel(lock);
-        ldlm_reprocess_all(lock->l_resource);
         ldlm_lock_put(lock); 
         EXIT;
  out:
index 02ffca2..efe314e 100644 (file)
@@ -26,8 +26,7 @@ static int regression_running = 0;
 
 static int ldlm_test_callback(struct lustre_handle *lockh,
                               struct ldlm_lock_desc *new,
-                              void *data, __u32 data_len,
-                              struct ptlrpc_request **reqp)
+                              void *data, __u32 data_len)
 {
         printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new);
         return 0;
index 3a6d230..5d1c29d 100644 (file)
@@ -21,7 +21,6 @@ extern struct address_space_operations ll_aops;
 
 void ll_intent_release(struct dentry *de)
 {
-        struct ldlm_lock *lock;
         struct lustre_handle *handle;
         ENTRY;
 
@@ -32,10 +31,7 @@ void ll_intent_release(struct dentry *de)
 
         if (de->d_it->it_lock_mode) {
                 handle = (struct lustre_handle *)de->d_it->it_lock_handle;
-                lock = lustre_handle2object(handle);
-                LDLM_DEBUG(lock, "calling ldlm_lock_decref(%d)",
-                           de->d_it->it_lock_mode);
-                ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+                ldlm_lock_decref(handle, de->d_it->it_lock_mode);
         }
         de->d_it = NULL;
         EXIT;
index e367bbc..ac813a4 100644 (file)
@@ -120,11 +120,9 @@ int ll_lock(struct inode *dir, struct dentry *dentry,
 
 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
 {
-        struct ldlm_lock *lock;
         ENTRY;
 
-        lock = lustre_handle2object(lockh);
-        ldlm_lock_decref(lock, mode);
+        ldlm_lock_decref(lockh, mode);
 
         RETURN(0);
 }
index cfe6f6c..b1ae5c4 100644 (file)
@@ -279,7 +279,6 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         struct mds_body *body;
         struct dentry *de = NULL, *dchild = NULL;
         struct inode *dir;
-        struct ldlm_lock *lock;
         struct lustre_handle lockh;
         char *name;
         int namelen, flags, lock_mode, rc = 0;
@@ -327,9 +326,6 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_create_de, rc = -EIO);
                 }
-        } else {
-                lock = lustre_handle2object(&lockh);
-                LDLM_DEBUG(lock, "matched");
         }
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
@@ -366,8 +362,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
 out_create_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-        lock = lustre_handle2object(&lockh);
-        ldlm_lock_decref(lock, lock_mode);
+        ldlm_lock_decref(&lockh, lock_mode);
 out_create_de:
         l_dput(de);
 out_pre_de:
index 0b536c6..449d594 100644 (file)
@@ -190,7 +190,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         struct dentry *dchild = NULL;
         struct inode *dir;
         void *handle;
-        struct ldlm_lock *lock;
         struct lustre_handle lockh;
         int rc = 0, err, flags, lock_mode, type = rec->ur_mode & S_IFMT;
         __u64 res_id[3] = {0,0,0};
@@ -228,9 +227,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_create_de, rc = -EIO);
                 }
-        } else {
-                lock = ldlm_handle2lock(&lockh);
-                LDLM_DEBUG(lock, "matched");
         }
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
@@ -370,8 +366,7 @@ out_create_commit:
 out_create_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-        lock = lustre_handle2object(&lockh);
-        ldlm_lock_decref(lock, lock_mode);
+        ldlm_lock_decref(&lockh, lock_mode);
 out_create_de:
         l_dput(de);
         req->rq_status = rc;
@@ -408,7 +403,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         int lock_mode, flags;
         __u64 res_id[3] = {0};
         struct lustre_handle lockh;
-        struct ldlm_lock *lock;
         void *handle;
         int rc = 0;
         int err;
@@ -438,10 +432,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         CERROR("lock enqueue: err: %d\n", rc);
                         GOTO(out_unlink_de, rc = -EIO);
                 }
-        } else {
-                lock = lustre_handle2object(&lockh);
-                LDLM_DEBUG(lock, "matched");
-        }
+        } else
+
         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
 
         down(&dir->i_sem);
@@ -515,8 +507,8 @@ out_unlink_dchild:
         l_dput(dchild);
 out_unlink_de:
         up(&dir->i_sem);
-        lock = lustre_handle2object(&lockh);
-        ldlm_lock_decref(lock, lock_mode);
+        ldlm_lock_decref(&lockh
+, lock_mode);
         if (!rc) { 
                 /* Take an exclusive lock on the resource that we're
                  * about to free, to force everyone to drop their
@@ -537,8 +529,7 @@ out_unlink_de:
         l_dput(de);
 
         if (!rc) { 
-                lock = lustre_handle2object(&lockh);
-                ldlm_lock_decref(lock, LCK_EX);
+                ldlm_lock_decref(&lockh, LCK_EX);
                 rc = ldlm_cli_cancel(&lockh);
                 if (rc < 0)
                         CERROR("failed to cancel child inode lock ino "
index a1db49a..da29a96 100644 (file)
@@ -683,12 +683,11 @@ static int osc_enqueue(struct obd_conn *oconn,
                              sizeof(extent), mode2, lockh);
         if (rc == 1) {
                 int flags;
-                struct ldlm_lock *lock = lustre_handle2object(lockh);
                 /* FIXME: This is not incredibly elegant, but it might
                  * be more elegant than adding another parameter to
                  * lock_match.  I want a second opinion. */
-                ldlm_lock_addref(lock, mode);
-                ldlm_lock_decref(lock, mode2);
+                ldlm_lock_addref(lockh, mode);
+                ldlm_lock_decref(lockh, mode2);
 
                 if (mode == LCK_PR)
                         return 0;
@@ -709,11 +708,9 @@ static int osc_enqueue(struct obd_conn *oconn,
 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
                       struct lustre_handle *lockh)
 {
-        struct ldlm_lock *lock;
         ENTRY;
 
-        lock = lustre_handle2object(lockh);
-        ldlm_lock_decref(lock, mode);
+        ldlm_lock_decref(lockh, mode);
 
         RETURN(0);
 }