Whamcloud - gitweb
LU-1710 lvb: variable sized LVB support
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 346a6e7..7a32e1d 100644 (file)
@@ -918,10 +918,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        /* server namespace, doesn't need lock */
        lvb_len = ldlm_lvbo_size(lock);
-       if (lvb_len > 0)
-                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
-                                      lvb_len);
-
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -1235,8 +1232,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                cbs, NULL, 0);
-
+                               cbs, NULL, 0, LVB_T_NONE);
         if (!lock)
                 GOTO(out, rc = -ENOMEM);
 
@@ -1636,6 +1632,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 {
        int lvb_len;
         CFS_LIST_HEAD(ast_list);
+       int rc = 0;
         ENTRY;
 
         LDLM_DEBUG(lock, "client completion callback handler START");
@@ -1652,28 +1649,34 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
        lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
-       if (lvb_len > 0) {
+       if (lvb_len < 0) {
+               LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
+               GOTO(out, rc = lvb_len);
+       } else if (lvb_len > 0) {
                if (lock->l_lvb_len > 0) {
                        /* for extent lock, lvb contains ost_lvb{}. */
                        LASSERT(lock->l_lvb_data != NULL);
-                       LASSERTF(lock->l_lvb_len == lvb_len,
-                               "preallocated %d, actual %d.\n",
-                               lock->l_lvb_len, lvb_len);
+
+                       if (unlikely(lock->l_lvb_len < lvb_len)) {
+                               LDLM_ERROR(lock, "Replied LVB is larger than "
+                                          "expectation, expected = %d, "
+                                          "replied = %d",
+                                          lock->l_lvb_len, lvb_len);
+                               GOTO(out, rc = -EINVAL);
+                       }
                } else { /* for layout lock, lvb has variable length */
                        void *lvb_data;
 
                        OBD_ALLOC(lvb_data, lvb_len);
-                       if (lvb_data == NULL)
-                               LDLM_ERROR(lock, "no memory.\n");
-
-                       lock_res_and_lock(lock);
                        if (lvb_data == NULL) {
-                               lock->l_flags |= LDLM_FL_FAILED;
-                       } else {
-                               LASSERT(lock->l_lvb_data == NULL);
-                               lock->l_lvb_data = lvb_data;
-                               lock->l_lvb_len = lvb_len;
+                               LDLM_ERROR(lock, "No memory.\n");
+                               GOTO(out, rc = -ENOMEM);
                        }
+
+                       lock_res_and_lock(lock);
+                       LASSERT(lock->l_lvb_data == NULL);
+                       lock->l_lvb_data = lvb_data;
+                       lock->l_lvb_len = lvb_len;
                        unlock_res_and_lock(lock);
                }
        }
@@ -1684,9 +1687,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 /* bug 11300: the lock has already been granted */
                 unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "Double grant race happened");
-                LDLM_LOCK_RELEASE(lock);
-                EXIT;
-                return;
+               GOTO(out, rc = 0);
         }
 
         /* If we receive the completion AST before the actual enqueue returned,
@@ -1709,13 +1710,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                    &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) != 0) {
                 unlock_res_and_lock(lock);
-                if (ldlm_lock_change_resource(ns, lock,
-                                &dlm_req->lock_desc.l_resource.lr_name) != 0) {
-                        LDLM_ERROR(lock, "Failed to allocate resource");
-                        LDLM_LOCK_RELEASE(lock);
-                        EXIT;
-                        return;
-                }
+               rc = ldlm_lock_change_resource(ns, lock,
+                               &dlm_req->lock_desc.l_resource.lr_name);
+               if (rc < 0) {
+                       LDLM_ERROR(lock, "Failed to allocate resource");
+                       GOTO(out, rc);
+               }
                 LDLM_DEBUG(lock, "completion AST, new resource");
                 CERROR("change resource!\n");
                 lock_res_and_lock(lock);
@@ -1729,17 +1729,14 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
         }
 
-        if (lock->l_lvb_len) {
-                if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
-                                         RCL_CLIENT) < lock->l_lvb_len) {
-                        LDLM_ERROR(lock, "completion AST did not contain "
-                                   "expected LVB!");
-                } else {
-                        void *lvb = req_capsule_client_get(&req->rq_pill,
-                                                           &RMF_DLM_LVB);
-                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
-                }
-        }
+       if (lock->l_lvb_len > 0) {
+               rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
+                                  lock->l_lvb_data, lvb_len);
+               if (rc < 0) {
+                       unlock_res_and_lock(lock);
+                       GOTO(out, rc);
+               }
+       }
 
         ldlm_grant_lock(lock, &ast_list);
         unlock_res_and_lock(lock);
@@ -1754,8 +1751,15 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
-        LDLM_LOCK_RELEASE(lock);
-        EXIT;
+       GOTO(out, rc);
+
+out:
+       if (rc < 0) {
+               lock_res_and_lock(lock);
+               lock->l_flags |= LDLM_FL_FAILED;
+               unlock_res_and_lock(lock);
+       }
+       LDLM_LOCK_RELEASE(lock);
 }
 
 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,