Whamcloud - gitweb
LU-10175 ldlm: handle lock converts in cancel handler
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 92ee0c8..3273fdc 100644 (file)
@@ -1494,16 +1494,16 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
 
                if (ldlm_is_cancel(lock)) {
                        LDLM_ERROR(lock, "convert on canceled lock!");
-                       req->rq_status = LUSTRE_EINVAL;
+                       rc = ELDLM_NO_LOCK_DATA;
                } else if (dlm_req->lock_desc.l_req_mode !=
                           lock->l_granted_mode) {
                        LDLM_ERROR(lock, "lock mode differs!");
-                       req->rq_status = LUSTRE_EINVAL;
+                       rc = ELDLM_NO_LOCK_DATA;
                } else if (bits == new) {
                        /* This can be valid situation if CONVERT RPCs are
-                        * re-ordered. Just finish silently*/
+                        * re-ordered. Just finish silently */
                        LDLM_DEBUG(lock, "lock is converted already!");
-                       req->rq_status = 0;
+                       rc = ELDLM_OK;
                } else {
                        lock_res_and_lock(lock);
                        if (ldlm_is_waited(lock))
@@ -1515,21 +1515,28 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                        lock->l_bl_ast_run = 0;
                        ldlm_clear_ast_sent(lock);
                        unlock_res_and_lock(lock);
-                       req->rq_status = 0;
 
+                       ldlm_reprocess_all(lock->l_resource);
+                       rc = ELDLM_OK;
+               }
+
+               if (rc == ELDLM_OK) {
                        dlm_rep->lock_handle = lock->l_remote_handle;
                        ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
                                        &dlm_rep->lock_desc.l_policy_data);
-
-                       ldlm_reprocess_all(lock->l_resource);
                }
-               LDLM_DEBUG(lock, "server-side convert handler END");
+
+               LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
+                          rc);
                LDLM_LOCK_PUT(lock);
        } else {
-               req->rq_status = LUSTRE_EINVAL;
-               LDLM_DEBUG_NOLOCK("server-side convert handler END");
+               rc = ELDLM_NO_LOCK_DATA;
+               LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
+                                 rc);
        }
 
+       req->rq_status = rc;
+
        RETURN(0);
 }
 
@@ -2234,145 +2241,163 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
  */
 static int ldlm_cancel_handler(struct ptlrpc_request *req)
 {
-        int rc;
-        ENTRY;
+       int rc;
 
-        /* Requests arrive in sender's byte order.  The ptlrpc service
-         * handler has already checked and, if necessary, byte-swapped the
-         * incoming request message body, but I am responsible for the
-         * message buffers. */
+       ENTRY;
 
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       /* Requests arrive in sender's byte order.  The ptlrpc service
+        * handler has already checked and, if necessary, byte-swapped the
+        * incoming request message body, but I am responsible for the
+        * message buffers. */
 
-        if (req->rq_export == NULL) {
-                struct ldlm_request *dlm_req;
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
-                CERROR("%s from %s arrived at %lu with bad export cookie "
-                      "%llu\n",
-                       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
-                       libcfs_nid2str(req->rq_peer.nid),
-                       req->rq_arrival_time.tv_sec,
-                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
-                if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
-                        req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
-                        dlm_req = req_capsule_client_get(&req->rq_pill,
-                                                         &RMF_DLM_REQ);
-                        if (dlm_req != NULL)
-                                ldlm_lock_dump_handle(D_ERROR,
-                                                      &dlm_req->lock_handle[0]);
-                }
-                ldlm_callback_reply(req, -ENOTCONN);
-                RETURN(0);
-        }
+       if (req->rq_export == NULL) {
+               struct ldlm_request *dlm_req;
 
-        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+               CERROR("%s from %s arrived at %lu with bad export cookie "
+                      "%llu\n",
+                      ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+                      libcfs_nid2str(req->rq_peer.nid),
+                      req->rq_arrival_time.tv_sec,
+                      lustre_msg_get_handle(req->rq_reqmsg)->cookie);
+
+               if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
+                       req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+                       dlm_req = req_capsule_client_get(&req->rq_pill,
+                                                        &RMF_DLM_REQ);
+                       if (dlm_req != NULL)
+                               ldlm_lock_dump_handle(D_ERROR,
+                                                     &dlm_req->lock_handle[0]);
+               }
+               ldlm_callback_reply(req, -ENOTCONN);
+               RETURN(0);
+       }
 
-        /* XXX FIXME move this back to mds/handler.c, bug 249 */
-        case LDLM_CANCEL:
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
-                CDEBUG(D_INODE, "cancel\n");
+       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       /* XXX FIXME move this back to mds/handler.c, bug 249 */
+       case LDLM_CANCEL:
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+               CDEBUG(D_INODE, "cancel\n");
                if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
                    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
                    CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
                        RETURN(0);
-                rc = ldlm_handle_cancel(req);
-                if (rc)
-                        break;
-                RETURN(0);
-        default:
-                CERROR("invalid opcode %d\n",
-                       lustre_msg_get_opc(req->rq_reqmsg));
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
-                ldlm_callback_reply(req, -EINVAL);
-        }
+               rc = ldlm_handle_cancel(req);
+               break;
+       case LDLM_CONVERT:
+       {
+               struct ldlm_request *dlm_req;
 
-        RETURN(0);
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
+               CDEBUG(D_INODE, "convert\n");
+
+               dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+               if (dlm_req == NULL) {
+                       CDEBUG(D_INFO, "bad request buffer for cancel\n");
+                       rc = ldlm_callback_reply(req, -EPROTO);
+               } else {
+                       req->rq_status = ldlm_handle_convert0(req, dlm_req);
+                       rc = ptlrpc_reply(req);
+               }
+               break;
+       }
+       default:
+               CERROR("invalid opcode %d\n",
+                      lustre_msg_get_opc(req->rq_reqmsg));
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+               rc = ldlm_callback_reply(req, -EINVAL);
+       }
+
+       RETURN(rc);
 }
 
 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
-                                        struct ldlm_lock *lock)
+                                       struct ldlm_lock *lock)
 {
-        struct ldlm_request *dlm_req;
-        struct lustre_handle lockh;
-        int rc = 0;
-        int i;
-        ENTRY;
-
-        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-        if (dlm_req == NULL)
-                RETURN(0);
+       struct ldlm_request *dlm_req;
+       struct lustre_handle lockh;
+       int rc = 0;
+       int i;
 
-        ldlm_lock2handle(lock, &lockh);
-        for (i = 0; i < dlm_req->lock_count; i++) {
-                if (lustre_handle_equal(&dlm_req->lock_handle[i],
-                                        &lockh)) {
-                        DEBUG_REQ(D_RPCTRACE, req,
-                                 "Prio raised by lock %#llx.", lockh.cookie);
+       ENTRY;
 
-                        rc = 1;
-                        break;
-                }
-        }
+       dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       if (dlm_req == NULL)
+               RETURN(0);
 
-        RETURN(rc);
+       ldlm_lock2handle(lock, &lockh);
+       for (i = 0; i < dlm_req->lock_count; i++) {
+               if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                       &lockh)) {
+                       DEBUG_REQ(D_RPCTRACE, req,
+                                 "Prio raised by lock %#llx.", lockh.cookie);
+                       rc = 1;
+                       break;
+               }
+       }
 
+       RETURN(rc);
 }
 
 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
 {
-        struct ldlm_request *dlm_req;
-        int rc = 0;
-        int i;
-        ENTRY;
+       struct ldlm_request *dlm_req;
+       int rc = 0;
+       int i;
 
-        /* no prolong in recovery */
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
-                RETURN(0);
+       ENTRY;
 
-        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-        if (dlm_req == NULL)
-                RETURN(-EFAULT);
+       /* no prolong in recovery */
+       if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               RETURN(0);
 
-        for (i = 0; i < dlm_req->lock_count; i++) {
-                struct ldlm_lock *lock;
+       dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       if (dlm_req == NULL)
+               RETURN(-EFAULT);
 
-                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
-                if (lock == NULL)
-                        continue;
+       for (i = 0; i < dlm_req->lock_count; i++) {
+               struct ldlm_lock *lock;
+
+               lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+               if (lock == NULL)
+                       continue;
 
                rc = ldlm_is_ast_sent(lock) ? 1 : 0;
-                if (rc)
-                        LDLM_DEBUG(lock, "hpreq cancel lock");
-                LDLM_LOCK_PUT(lock);
+               if (rc)
+                       LDLM_DEBUG(lock, "hpreq cancel/convert lock");
+               LDLM_LOCK_PUT(lock);
 
-                if (rc)
-                        break;
-        }
+               if (rc)
+                       break;
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
-        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+       .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
        .hpreq_check      = ldlm_cancel_hpreq_check,
        .hpreq_fini       = NULL,
 };
 
 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
 {
-        ENTRY;
+       ENTRY;
 
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
-        if (req->rq_export == NULL)
-                RETURN(0);
+       if (req->rq_export == NULL)
+               RETURN(0);
 
-        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
-                req->rq_ops = &ldlm_cancel_hpreq_ops;
-        }
-        RETURN(0);
+       if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+               req->rq_ops = &ldlm_cancel_hpreq_ops;
+       } else if (LDLM_CONVERT == lustre_msg_get_opc(req->rq_reqmsg)) {
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
+               req->rq_ops = &ldlm_cancel_hpreq_ops;
+       }
+       RETURN(0);
 }
 
 static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,