Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index a24ce4f..a9bd553 100644 (file)
@@ -54,6 +54,9 @@ int ldlm_expired_completion_wait(void *data)
         if (lock->l_conn_export == NULL) {
                 static cfs_time_t next_dump = 0, last_dump = 0;
 
+                if (ptlrpc_check_suspend())
+                        RETURN(0);
+
                 LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); "
                            "not entering recovery in server code, just going "
                            "back to sleep", lock->l_enqueued_time.tv_sec,
@@ -223,7 +226,8 @@ int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
         return -ELDLM_NO_LOCK_DATA;
 }
 
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id,
+int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+                           const struct ldlm_res_id *res_id,
                            ldlm_type_t type, ldlm_policy_data_t *policy,
                            ldlm_mode_t mode, int *flags,
                            ldlm_blocking_callback blocking,
@@ -237,14 +241,14 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id,
         ENTRY;
 
         LASSERT(!(*flags & LDLM_FL_REPLAY));
-        if (ns->ns_client) {
+        if (unlikely(ns->ns_client)) {
                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
                 LBUG();
         }
 
         lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
                                 completion, glimpse, data, lvb_len);
-        if (!lock)
+        if (unlikely(!lock))
                 GOTO(out_nolock, err = -ENOMEM);
         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
 
@@ -262,13 +266,11 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id,
                 lock->l_req_extent = policy->l_extent;
 
         err = ldlm_lock_enqueue(ns, &lock, policy, flags);
-        if (err != ELDLM_OK)
+        if (unlikely(err != ELDLM_OK))
                 GOTO(out, err);
 
         if (policy != NULL)
                 *policy = lock->l_policy_data;
-        if ((*flags) & LDLM_FL_LOCK_CHANGED)
-                res_id = lock->l_resource->lr_name;
 
         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
                           lock);
@@ -380,15 +382,21 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                         lock->l_req_mode = newmode;
                 }
 
-                if (reply->lock_desc.l_resource.lr_name.name[0] !=
-                    lock->l_resource->lr_name.name[0]) {
-                        CDEBUG(D_INFO, "remote intent success, locking %ld "
-                               "instead of %ld\n",
+                if (memcmp(reply->lock_desc.l_resource.lr_name.name,
+                          lock->l_resource->lr_name.name,
+                          sizeof(struct ldlm_res_id))) {
+                        CDEBUG(D_INFO, "remote intent success, locking "
+                                        "(%ld,%ld,%ld) instead of "
+                                        "(%ld,%ld,%ld)\n",
                               (long)reply->lock_desc.l_resource.lr_name.name[0],
-                               (long)lock->l_resource->lr_name.name[0]);
+                              (long)reply->lock_desc.l_resource.lr_name.name[1],
+                              (long)reply->lock_desc.l_resource.lr_name.name[2],
+                              (long)lock->l_resource->lr_name.name[0],
+                              (long)lock->l_resource->lr_name.name[1],
+                              (long)lock->l_resource->lr_name.name[2]);
 
                         ldlm_lock_change_resource(ns, lock,
-                                           reply->lock_desc.l_resource.lr_name);
+                                          &reply->lock_desc.l_resource.lr_name);
                         if (lock->l_resource == NULL) {
                                 LBUG();
                                 GOTO(cleanup, rc = -ENOMEM);
@@ -461,8 +469,9 @@ cleanup:
  * request was created in ldlm_cli_enqueue and it is the async request,
  * pass it to the caller in @reqp. */
 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
-                     struct ldlm_res_id res_id, ldlm_type_t type,
-                     ldlm_policy_data_t *policy, ldlm_mode_t mode, int *flags,
+                     const struct ldlm_res_id *res_id,
+                     ldlm_type_t type, ldlm_policy_data_t *policy,
+                     ldlm_mode_t mode, int *flags,
                      ldlm_blocking_callback blocking,
                      ldlm_completion_callback completion,
                      ldlm_glimpse_callback glimpse,
@@ -700,7 +709,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
         if (lock == NULL)
                 RETURN(0);
-
+        
         if (lock->l_conn_export) {
                 int local_only;
                 struct obd_import *imp;
@@ -713,7 +722,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                               (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
                 ldlm_cancel_callback(lock);
                 unlock_res_and_lock(lock);
-
+                
                 if (local_only) {
                         CDEBUG(D_INFO, "not sending request (at caller's "
                                "instruction)\n");
@@ -866,8 +875,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 }
 
 static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
-                                           struct ldlm_res_id res_id, int flags,
-                                           void *opaque)
+                                           const struct ldlm_res_id *res_id,
+                                           int flags, void *opaque)
 {
         struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list);
         struct ldlm_resource *res;
@@ -877,7 +886,7 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
         if (res == NULL) {
                 /* This is not a problem. */
-                CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]);
+                CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
                 RETURN(0);
         }
 
@@ -950,7 +959,8 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns)
  * to notify the server.
  * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
-                           struct ldlm_res_id *res_id, int flags, void *opaque)
+                           const struct ldlm_res_id *res_id,
+                           int flags, void *opaque)
 {
         int i;
         ENTRY;
@@ -959,7 +969,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                 RETURN(ELDLM_OK);
 
         if (res_id)
-                RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
+                RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags,
                                                        opaque));
 
         spin_lock(&ns->ns_hash_lock);
@@ -974,7 +984,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                         ldlm_resource_getref(res);
                         spin_unlock(&ns->ns_hash_lock);
 
-                        rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
+                        rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name,
                                                              flags, opaque);
 
                         if (rc)
@@ -993,7 +1003,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
 
 /* join/split resource locks to/from lru list */
 int ldlm_cli_join_lru(struct ldlm_namespace *ns,
-                      struct ldlm_res_id *res_id, int join)
+                      const struct ldlm_res_id *res_id, int join)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *n;
@@ -1002,7 +1012,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns,
 
         LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
 
-        res = ldlm_resource_get(ns, NULL, *res_id, LDLM_EXTENT, 0);
+        res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
         if (res == NULL)
                 RETURN(count);
         LASSERT(res->lr_type == LDLM_EXTENT);
@@ -1138,7 +1148,8 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
 }
 
 /* non-blocking function to manipulate a lock whose cb_data is being put away.*/
-void ldlm_resource_iterate(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+void ldlm_resource_iterate(struct ldlm_namespace *ns,
+                           const struct ldlm_res_id *res_id,
                            ldlm_iterator_t iter, void *data)
 {
         struct ldlm_resource *res;
@@ -1149,7 +1160,7 @@ void ldlm_resource_iterate(struct ldlm_namespace *ns, struct ldlm_res_id *res_id
                 LBUG();
         }
 
-        res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
+        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
         if (res == NULL) {
                 EXIT;
                 return;
@@ -1268,6 +1279,11 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
         }
         ptlrpc_req_set_repsize(req, buffers, size);
+        /* notify the server we've replayed all requests.
+         * also, we mark the request to be put on a dedicated
+         * queue to be processed after all request replayes.
+         * bug 6063 */
+        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
 
         LDLM_DEBUG(lock, "replaying lock:");