Whamcloud - gitweb
b=7200
authoralex <alex>
Wed, 27 Jul 2005 18:54:34 +0000 (18:54 +0000)
committeralex <alex>
Wed, 27 Jul 2005 18:54:34 +0000 (18:54 +0000)
 - protect lock->l_resource from concurrent ldlm_lock_change_resource()

12 files changed:
lustre/cmobd/cm_oss_reint.c
lustre/include/linux/lustre_dlm.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c

index 81b616a..3db865a 100644 (file)
@@ -141,7 +141,7 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation!  -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
@@ -149,13 +149,13 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
          * correct blocking function anymore.  So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != cache_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
index 2ad9b21..dad6885 100644 (file)
@@ -106,9 +106,12 @@ typedef enum {
 #define LDLM_FL_CLEANED        0x800000
 
 /* optimization hint: LDLM can run blocking callback from current context
- * w/o involving separate thread. in order to decrease cs rate -bzzz */
+ * w/o involving separate thread. in order to decrease cs rate */
 #define LDLM_FL_ATOMIC_CB      0x1000000
 
+/* while this flag is set, the lock can't change resource */
+#define LDLM_FL_LOCK_PROTECT   0x4000000
+#define LDLM_FL_LOCK_PROTECT_BIT  26
 
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
@@ -307,6 +310,7 @@ struct ldlm_lock {
         unsigned long         l_callback_timeout;
 
         __u32                 l_pid;            /* pid which created this lock */
+        __u32                 l_pidb;           /* who holds LOCK_PROTECT_BIT */
 
         struct list_head      l_tmp;
 
@@ -681,5 +685,7 @@ static inline void check_res_locked(struct ldlm_resource *res)
         LASSERT_SPIN_LOCKED(&res->lr_lock);
 }
 
+struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock);
+void unlock_res_and_lock(struct ldlm_lock *lock);
 
 #endif
index 746b485..fb41ccb 100644 (file)
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_lib.h>
 
+/*
+ * ldlm locking uses resource to serialize access to locks
+ * but there is a case when we change resource of lock upon
+ * enqueue reply. we rely on that lock->l_resource = new_res
+ * is atomic
+ */
+struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+
+        if (!res->lr_namespace->ns_client) {
+                /* on server-side resource of lock doesn't change */
+                lock_res(res);
+                return res;
+        } 
+
+        bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+        LASSERT(lock->l_pidb == 0);
+        res = lock->l_resource;
+        lock->l_pidb = current->pid;
+        lock_res(res);
+        return res;
+}
+
+void unlock_bitlock(struct ldlm_lock *lock)
+{
+        LASSERT(lock->l_pidb == current->pid);
+        lock->l_pidb = 0;
+        bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+}
+
+void unlock_res_and_lock(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+
+        if (!res->lr_namespace->ns_client) {
+                /* on server-side resource of lock doesn't change */
+                unlock_res(res);
+                return;
+        }
+
+        unlock_res(res);
+        unlock_bitlock(lock);
+}
+
index d73b52a..c545c89 100644 (file)
@@ -129,25 +129,24 @@ void ldlm_lock_put(struct ldlm_lock *lock)
         LASSERT(lock->l_resource != LP_POISON);
         LASSERT(atomic_read(&lock->l_refc) > 0);
         if (atomic_dec_and_test(&lock->l_refc)) {
-                struct ldlm_resource *res = lock->l_resource;
-                struct ldlm_namespace *ns = res->lr_namespace;
+                struct ldlm_resource *res;
 
                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
 
-                LASSERT(lock->l_resource != LP_POISON);
-                lock_res(res);
+                lock_res_and_lock(lock);
+                res = lock->l_resource;
                 LASSERT(lock->l_destroyed);
                 LASSERT(list_empty(&lock->l_res_link));
 
                 if (lock->l_parent)
                         LDLM_LOCK_PUT(lock->l_parent);
-                unlock_res(res);
+                unlock_res_and_lock(lock);
 
-                ldlm_resource_putref(lock->l_resource);
+                atomic_dec(&res->lr_namespace->ns_locks);
+                ldlm_resource_putref(res);
                 lock->l_resource = NULL;
                 if (lock->l_export)
                         class_export_put(lock->l_export);
-                atomic_dec(&ns->ns_locks);
 
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
@@ -181,7 +180,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         if (!list_empty(&lock->l_children)) {
                 LDLM_ERROR(lock, "still has children (%p)!",
@@ -203,7 +202,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         if (lock->l_destroyed) {
                 LASSERT(list_empty(&lock->l_lru));
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 EXIT;
                 return;
         }
@@ -232,7 +231,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
                 lock->l_completion_ast(lock, 0);
 #endif
 
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -275,6 +274,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_cp_ast);
         init_waitqueue_head(&lock->l_waitq);
         lock->l_blocking_lock = NULL;
+        lock->l_pidb = 0;
 
         atomic_inc(&resource->lr_namespace->ns_locks);
 
@@ -291,17 +291,23 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         RETURN(lock);
 }
 
+void unlock_bitlock(struct ldlm_lock *lock);
+
 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                               struct ldlm_res_id new_resid)
 {
         struct ldlm_resource *oldres = lock->l_resource;
+        struct ldlm_resource *newres;
+        int type;
         ENTRY;
 
-        lock_res(oldres);
+        LASSERT(ns->ns_client != 0);
+
+        lock_res_and_lock(lock);
         if (memcmp(&new_resid, &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) == 0) {
                 /* Nothing to do */
-                unlock_res(oldres);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
@@ -310,15 +316,18 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         /* This function assumes that the lock isn't on any lists */
         LASSERT(list_empty(&lock->l_res_link));
 
-        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
-                                             lock->l_resource->lr_type, 
-                                            1);
-        if (lock->l_resource == NULL) {
+        type = oldres->lr_type;
+        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+        if (newres == NULL) {
                 LBUG();
                 RETURN(-ENOMEM);
         }
 
+        lock_res(newres);
+        lock->l_resource = newres;
+        unlock_res(newres);
         unlock_res(oldres);
+        unlock_bitlock(lock);
 
         /* ...and the flowers are still standing! */
         ldlm_resource_putref(oldres);
@@ -355,19 +364,19 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         ns = lock->l_resource->lr_namespace;
         LASSERT(ns != NULL);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /* It's unlikely but possible that someone marked the lock as
          * destroyed after we did handle2object on it */
         if (lock->l_destroyed) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
 
         if (flags && (lock->l_flags & flags)) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
@@ -375,7 +384,7 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         if (flags)
                 lock->l_flags |= flags;
 
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         retval = lock;
         EXIT;
  out:
@@ -465,9 +474,9 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
 /* only called for local locks */
 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
 {
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         ldlm_lock_addref_internal_nolock(lock, mode);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 }
 
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
@@ -475,9 +484,9 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
         struct ldlm_namespace *ns;
         ENTRY;
 
-        ns = lock->l_resource->lr_namespace;
+        lock_res_and_lock(lock);
 
-        lock_res(lock->l_resource);
+        ns = lock->l_resource->lr_namespace;
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
@@ -509,7 +518,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                                 ldlm_bl_to_thread(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
@@ -523,10 +532,10 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                 ns->ns_nr_unused++;
                 spin_unlock(&ns->ns_unused_lock);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 ldlm_cancel_lru(ns, LDLM_ASYNC);
         } else {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
         }
 
         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
@@ -552,9 +561,9 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
         LASSERT(lock != NULL);
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
@@ -654,10 +663,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 {
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CAN_MATCH;
         wake_up(&lock->l_waitq);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 }
 
 /* Can be called in two ways:
@@ -859,7 +868,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 LASSERT(rc == ELDLM_OK);
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
@@ -911,7 +920,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
         policy(lock, flags, 1, &rc, NULL);
         EXIT;
 out:
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         return rc;
 }
 
@@ -958,14 +967,14 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list)
                         list_entry(tmp, struct ldlm_lock, l_bl_ast);
 
                 /* nobody should touch l_bl_ast */
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 list_del_init(&lock->l_bl_ast);
 
                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                 LASSERT(lock->l_bl_ast_run == 0);
                 LASSERT(lock->l_blocking_lock);
                 lock->l_bl_ast_run++;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 ldlm_lock2desc(lock->l_blocking_lock, &d);
 
@@ -1005,11 +1014,11 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
 
                 /* nobody should touch l_cp_ast */
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 list_del_init(&lock->l_cp_ast);
                 LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                 lock->l_flags &= ~LDLM_FL_CP_REQD;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 if (lock->l_completion_ast != NULL)
                         rc = lock->l_completion_ast(lock, 0, 0);
@@ -1094,10 +1103,10 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                 lock->l_flags |= LDLM_FL_CANCEL;
                 if (lock->l_blocking_ast) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                              LDLM_CB_CANCELING);
-                        lock_res(lock->l_resource);
+                        lock_res_and_lock(lock);
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
@@ -1110,12 +1119,12 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         struct ldlm_namespace *ns;
         ENTRY;
 
+        ldlm_del_waiting_lock(lock);
+        lock_res_and_lock(lock);
+        
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        ldlm_del_waiting_lock(lock);
-        lock_res(res);
-        
         /* Please do not, no matter how tempting, remove this LBUG without
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
@@ -1126,7 +1135,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_cancel_callback(lock);
 
         ldlm_resource_unlink_lock(lock);
-        unlock_res(res);
+        unlock_res_and_lock(lock);
         
         ldlm_lock_destroy(lock);
 
@@ -1189,11 +1198,11 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
 
+        lock_res_and_lock(lock);
+
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        lock_res(res);
-
         old_mode = lock->l_req_mode;
         lock->l_req_mode = new_mode;
         ldlm_resource_unlink_lock(lock);
@@ -1229,7 +1238,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         granted = 1;
                 }
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (granted)
                 ldlm_run_cp_ast_work(&rpc_list);
index a7275d0..f4fae3c 100644 (file)
@@ -416,11 +416,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_granted_mode != lock->l_req_mode) {
                 /* this blocking AST will be communicated as part of the
                  * completion AST instead */
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                 ptlrpc_req_finished(req);
                 RETURN(0);
@@ -428,7 +428,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         if (lock->l_destroyed) {
                 /* What's the point? */
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 ptlrpc_req_finished(req);
                 RETURN(0);
         }
@@ -444,7 +444,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         if (lock->l_granted_mode == lock->l_req_mode)
                 ldlm_add_waiting_lock(lock);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
@@ -481,12 +481,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_resource->lr_lvb_len) {
                 buffers = 2;
                 size[1] = lock->l_resource->lr_lvb_len;
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
@@ -505,10 +505,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 
                 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                      lock->l_resource->lr_lvb_len);
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
@@ -519,12 +519,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
 
         /* We only send real blocking ASTs after the lock is granted */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 body->lock_flags |= LDLM_FL_AST_SENT;
                 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
@@ -537,7 +537,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_resource *res;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int rc = 0, size = sizeof(*body);
@@ -556,9 +556,10 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                sizeof(body->lock_handle1));
         ldlm_lock2desc(lock, &body->lock_desc);
 
-       lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         size = lock->l_resource->lr_lvb_len;
-       unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
+        res = lock->l_resource;
         req->rq_replen = lustre_msg_size(1, &size);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
@@ -671,12 +672,12 @@ existing_lock:
                 cookie = req;
         } else {
                 int buffers = 1;
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 if (lock->l_resource->lr_lvb_len) {
                         size[1] = lock->l_resource->lr_lvb_len;
                         buffers = 2;
                 }
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
 
@@ -705,13 +706,13 @@ existing_lock:
 
         /* We never send a blocking AST until the lock is granted, but
          * we can tell it right now */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                 if (lock->l_granted_mode == lock->l_req_mode)
                         ldlm_add_waiting_lock(lock);
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         EXIT;
  out:
@@ -730,7 +731,7 @@ existing_lock:
                            "(err=%d, rc=%d)", err, rc);
 
                 if (rc == 0) {
-                        lock_res(lock->l_resource);
+                        lock_res_and_lock(lock);
                         size[1] = lock->l_resource->lr_lvb_len;
                         if (size[1] > 0) {
                                 void *lvb = lustre_msg_buf(req->rq_repmsg,
@@ -741,7 +742,7 @@ existing_lock:
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        size[1]);
                         }
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                 } else {
                         ldlm_lock_destroy(lock);
                 }
@@ -878,10 +879,10 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
 
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
         
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 LDLM_DEBUG(lock, "already unused, calling "
@@ -904,13 +905,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res = lock->l_resource;
         LIST_HEAD(ast_list);
         ENTRY;
 
         LDLM_DEBUG(lock, "client completion callback handler START");
 
-        lock_res(res);
+        lock_res_and_lock(lock);
 
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
@@ -929,11 +929,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                    &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) != 0) {
-                unlock_res(res);
+                unlock_res_and_lock(lock);
                 ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
-                lock_res(res);
+                CERROR("change resource!\n");
+                lock_res_and_lock(lock);
         }
 
         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
@@ -954,7 +955,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         ldlm_grant_lock(lock, &ast_list);
-        unlock_res(res);
+        unlock_res_and_lock(lock);
 
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         LDLM_LOCK_PUT(lock);
@@ -986,18 +987,18 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                 ptlrpc_error(req);
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 if (ldlm_bl_to_thread(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
 
                 EXIT;
                 return;
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -1186,7 +1187,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         }
 
         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
+        lock_res_and_lock(lock);
         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+        unlock_res_and_lock(lock);
 
         /* We want the ost thread to get this reply so that it can respond
          * to ost requests (write cache writeback) that might be triggered
@@ -1654,3 +1657,9 @@ EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
+
+/* l_lock.c */
+EXPORT_SYMBOL(lock_res_and_lock);
+EXPORT_SYMBOL(unlock_res_and_lock);
+
+
index 2a9d8a8..b57f1f7 100644 (file)
@@ -214,9 +214,9 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                 struct lustre_handle *lockh, int mode)
 {
         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
 
         ldlm_lock_decref_and_cancel(lockh, mode);
@@ -402,9 +402,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         }
 
         if ((*flags) & LDLM_FL_AST_SENT) {
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
         }
 
@@ -573,11 +573,11 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
                 LDLM_DEBUG(lock, "client-side cancel");
                 /* Set this flag to prevent others from getting new references*/
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
                 local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY;
                 ldlm_cancel_callback(lock);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 if (local_only) {
                         CDEBUG(D_INFO, "not sending request (at caller's "
@@ -676,7 +676,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 spin_unlock(&ns->ns_unused_lock);
 
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 ldlm_lock_remove_from_lru(lock);
 
                 /* Setting the CBPENDING flag is a little misleading, but
@@ -694,7 +694,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))                        
                         list_add(&lock->l_tmp, &cblist);
 
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 spin_lock(&ns->ns_unused_lock);
 
index 54233fc..1f9673c 100644 (file)
@@ -806,7 +806,7 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
 
                 down(&lli->lli_size_sem);
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 kms = ldlm_extent_shift_kms(lock,
                                             lsm->lsm_oinfo[stripe].loi_kms);
                
@@ -814,7 +814,7 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 up(&lli->lli_size_sem);
                 //ll_try_done_writing(inode);
         iput:
@@ -862,14 +862,14 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
 
                 down(&inode->i_sem);
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                 kms = ldlm_extent_shift_kms(NULL, kms);
                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 up(&inode->i_sem);
         }
 
index ac174a5..380c4db 100644 (file)
@@ -1013,7 +1013,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
         struct inode *inode = NULL;
 
         /* NOTE: we depend on atomic igrab() -bzzz */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_ast_data) {
                 struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
                 if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
@@ -1026,7 +1026,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
                         inode = NULL;
                 }
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         return inode;
 }
 
index 96b87f0..7b08a84 100644 (file)
@@ -138,7 +138,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         lock = ldlm_handle2lock(lockh);
 
         LASSERT(lock != NULL);
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -152,7 +152,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 
         EXIT;
index ef135b5..63ba972 100644 (file)
@@ -805,7 +805,7 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         }
 
         /* XXX layering violation!  -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         
         /*
          * get this: if mds_blocking_ast is racing with mds_intent_policy, such
@@ -814,13 +814,13 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
          * blocking function anymore.  So check, and return early, if so.
          */
         if (lock->l_blocking_ast != mds_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -2698,26 +2698,26 @@ static void mds_revoke_export_locks(struct obd_export *exp)
         spin_lock(&exp->exp_ldlm_data.led_lock);
         list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
 
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 if (lock->l_req_mode != lock->l_granted_mode) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 LASSERT(lock->l_resource);
                 if (lock->l_resource->lr_type != LDLM_IBITS &&
                     lock->l_resource->lr_type != LDLM_PLAIN) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 if (lock->l_flags & LDLM_FL_AST_SENT) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 lock->l_flags |= LDLM_FL_AST_SENT;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 /* the desc just pretend to exclusive */
                 ldlm_lock2desc(lock, &desc);
@@ -4140,7 +4140,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         }
 
         /* Fixup the lock to be given to the client */
-        lock_res(new_lock->l_resource);
+        lock_res_and_lock(new_lock);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
@@ -4159,7 +4159,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
-        unlock_res(new_lock->l_resource);
+        unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
 
         RETURN(ELDLM_LOCK_REPLACED);
index f236898..1652e69 100644 (file)
@@ -1068,20 +1068,20 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation!  -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != filter_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -1308,7 +1308,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
 
-        lock_res(res);
+        lock_res_and_lock(lock);
+        res = lock->l_resource;
         rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
 
         /* FIXME: we should change the policy function slightly, to not make
@@ -1325,7 +1326,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         if (rc == LDLM_ITER_CONTINUE) {
                 /* The lock met with no resistance; we're finished. */
-                unlock_res(res);
+                unlock_res_and_lock(lock);
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
index a0d4323..9dc9d44 100644 (file)
@@ -2377,7 +2377,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
                 return;
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2393,7 +2393,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 }