LU-398 ptlrpc: NRS framework follow-up patch
[fs/lustre-release.git] lustre/ldlm/ldlm_lockd.c
index 4010c45..31925ff 100644
@@ -124,7 +124,7 @@ struct ldlm_bl_work_item {
         cfs_list_t              blwi_head;
         int                     blwi_count;
        struct completion        blwi_comp;
-        int                     blwi_mode;
+       ldlm_cancel_flags_t     blwi_flags;
         int                     blwi_mem_pressure;
 };
 
@@ -810,8 +810,8 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
                                rq_exp_list) {
                /* Do not process requests that were not yet added to the
                 * incoming queue or were already removed from it for
-                * processing. We evaluate ptlrpc_request_reorderable() without
-                * holding svcpt->scp_req_lock, and then redo the checks with
+                * processing. We evaluate ptlrpc_nrs_req_can_move() without
+                * holding svcpt->scp_req_lock, and then redo the check with
                 * the lock held once we need to obtain a reliable result.
                 */
                if (ptlrpc_nrs_req_can_move(req) &&
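
The comment above relies on an optimistic check-then-recheck pattern: ptlrpc_nrs_req_can_move() is first evaluated without svcpt->scp_req_lock as a cheap filter, and the check is repeated with the lock held before the request is actually moved. A minimal, self-contained sketch of that idiom follows; the names are illustrative stand-ins, not the Lustre types or API.

#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the service partition and request. */
struct fake_req {
	bool movable;                   /* may be changed by other threads */
};

struct fake_svcpt {
	pthread_mutex_t lock;           /* plays the role of scp_req_lock */
};

/* Cheap lockless pre-check, then an authoritative re-check under the lock;
 * only the result obtained with the lock held is acted upon. */
static bool try_move_req(struct fake_svcpt *svcpt, struct fake_req *req)
{
	bool moved = false;

	if (!req->movable)              /* unreliable fast path */
		return false;

	pthread_mutex_lock(&svcpt->lock);
	if (req->movable) {             /* reliable result, lock held */
		/* ... actually reorder/move the request here ... */
		moved = true;
	}
	pthread_mutex_unlock(&svcpt->lock);

	return moved;
}
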
@@ -976,8 +976,20 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
                lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
-               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                  lvb_len, RCL_CLIENT);
+               if (lvb_len < 0) {
+                       /* We still need to send the RPC to wake up the blocked
+                        * enqueue thread on the client.
+                        *
+                        * For an old client there is no better way to notify
+                        * it of the failure, so send a zero-sized LVB; the
+                        * client will then fail out with -EPROTO. */
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
+                                          RCL_CLIENT);
+                       instant_cancel = 1;
+               } else {
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
+                                          RCL_CLIENT);
+               }
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -1038,7 +1050,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
-        RETURN(rc);
+       RETURN(lvb_len < 0 ? lvb_len : rc);
 }
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 
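The hunks above change ldlm_server_completion_ast() so that a failed ldlm_lvbo_fill() is no longer silently ignored: the LVB is shrunk to zero bytes so that an old client at least fails its enqueue with -EPROTO, instant_cancel is forced, and the negative lvb_len takes precedence over the AST send status in the return value. A condensed sketch of that error-handling shape, using hypothetical helpers rather than the real RPC/capsule API:

#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helpers, not the Lustre API. */
static int fill_payload(char *buf, int buflen)
{
	const char val[] = "lvb-contents";

	if (buflen < (int)sizeof(val))
		return -ERANGE;         /* stands in for an ldlm_lvbo_fill() error */
	memcpy(buf, val, sizeof(val));
	return (int)sizeof(val);
}

static int send_notification(const char *buf, int buflen)
{
	(void)buf;
	printf("sending %d payload bytes\n", buflen);
	return 0;
}

/* Shape of the new error path: if filling the payload fails, still send a
 * zero-sized one so the waiting peer is woken up, then report the fill
 * error in preference to the send status. */
static int notify_with_payload(char *buf, int buflen)
{
	int filled = fill_payload(buf, buflen);
	int rc;

	rc = send_notification(buf, filled < 0 ? 0 : filled);

	return filled < 0 ? filled : rc;
}
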
@@ -1224,7 +1236,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 GOTO(out, rc = -EFAULT);
         }
 
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
+       if (exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) {
                 if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                              LDLM_PLAIN)) {
                         DEBUG_REQ(D_ERROR, req,
@@ -1245,7 +1257,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 
         /* INODEBITS_INTEROP: Perform conversion from plain lock to
          * inodebits lock if client does not support them. */
-        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
+       if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
             (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                 dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                 dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
@@ -1427,8 +1439,12 @@ existing_lock:
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
                                buflen = ldlm_lvbo_fill(lock, buf, buflen);
-                               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                                  buflen, RCL_SERVER);
+                               if (buflen >= 0)
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          buflen, RCL_SERVER);
+                               else
+                                       rc = buflen;
                        }
                 } else {
                         lock_res_and_lock(lock);
@@ -1898,7 +1914,8 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 #ifdef __KERNEL__
-static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
+                              ldlm_cancel_flags_t cancel_flags)
 {
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        ENTRY;
@@ -1916,20 +1933,20 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
 
        cfs_waitq_signal(&blp->blp_waitq);
 
-       /* can not use blwi->blwi_mode as blwi could be already freed in
-          LDLM_ASYNC mode */
-       if (mode == LDLM_SYNC)
+       /* cannot check blwi->blwi_flags as blwi could already have been
+          freed in LCF_ASYNC mode */
+       if (!(cancel_flags & LCF_ASYNC))
                wait_for_completion(&blwi->blwi_comp);
 
        RETURN(0);
 }
 
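The rewritten comment in __ldlm_bl_to_thread() encodes an ownership rule: once the work item has been queued and the blocking thread woken, an LCF_ASYNC item may already have been consumed and freed, so the decision to wait must be based on the cancel_flags argument passed by value, never on blwi->blwi_flags. A small sketch of that hand-off pattern, assuming hypothetical names and plain pthreads in place of the libcfs primitives:

#include <pthread.h>
#include <stdbool.h>

/* Hypothetical work item: once queued, an asynchronous item is owned (and
 * eventually freed) by the consumer, mirroring the ldlm_bl_work_item rules. */
struct work_item {
	bool            async;
	pthread_mutex_t done_lock;
	pthread_cond_t  done_cond;
	bool            done;
};

static void queue_to_worker(struct work_item *item)
{
	/* Stub: enqueue 'item' and wake a worker thread.  For an async item
	 * the worker may free it before this function even returns. */
	(void)item;
}

static int submit_work(struct work_item *item, bool async)
{
	queue_to_worker(item);

	/* Decide whether to wait using the caller's own 'async' argument:
	 * 'item' must not be dereferenced here, because an async item may
	 * already have been freed by the worker. */
	if (!async) {
		pthread_mutex_lock(&item->done_lock);
		while (!item->done)
			pthread_cond_wait(&item->done_cond, &item->done_lock);
		pthread_mutex_unlock(&item->done_lock);
	}
	return 0;
}
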
 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
-                             struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld,
-                             cfs_list_t *cancels, int count,
-                             struct ldlm_lock *lock,
-                             int mode)
+                            struct ldlm_namespace *ns,
+                            struct ldlm_lock_desc *ld,
+                            cfs_list_t *cancels, int count,
+                            struct ldlm_lock *lock,
+                            ldlm_cancel_flags_t cancel_flags)
 {
        init_completion(&blwi->blwi_comp);
         CFS_INIT_LIST_HEAD(&blwi->blwi_head);
@@ -1938,7 +1955,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
                 blwi->blwi_mem_pressure = 1;
 
         blwi->blwi_ns = ns;
-        blwi->blwi_mode = mode;
+       blwi->blwi_flags = cancel_flags;
         if (ld != NULL)
                 blwi->blwi_ld = *ld;
         if (count) {
@@ -1960,52 +1977,57 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
  * call ->l_blocking_ast itself.
  */
 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
-                             cfs_list_t *cancels, int count, int mode)
+                            struct ldlm_lock_desc *ld,
+                            struct ldlm_lock *lock,
+                            cfs_list_t *cancels, int count,
+                            ldlm_cancel_flags_t cancel_flags)
 {
-        ENTRY;
+       ENTRY;
 
-        if (cancels && count == 0)
-                RETURN(0);
+       if (cancels && count == 0)
+               RETURN(0);
 
-        if (mode == LDLM_SYNC) {
-                /* if it is synchronous call do minimum mem alloc, as it could
-                 * be triggered from kernel shrinker
-                 */
-                struct ldlm_bl_work_item blwi;
-                memset(&blwi, 0, sizeof(blwi));
-                init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
-                RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
-        } else {
-                struct ldlm_bl_work_item *blwi;
-                OBD_ALLOC(blwi, sizeof(*blwi));
-                if (blwi == NULL)
-                        RETURN(-ENOMEM);
-                init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
+       if (cancel_flags & LCF_ASYNC) {
+               struct ldlm_bl_work_item *blwi;
 
-                RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
-        }
+               OBD_ALLOC(blwi, sizeof(*blwi));
+               if (blwi == NULL)
+                       RETURN(-ENOMEM);
+               init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
+
+               RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
+       } else {
+               /* if it is a synchronous call, do minimal memory allocation,
+                * as it could be triggered from the kernel shrinker
+                */
+               struct ldlm_bl_work_item blwi;
+
+               memset(&blwi, 0, sizeof(blwi));
+               init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
+               RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
+       }
 }
 
 #endif
 
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           struct ldlm_lock *lock)
+                          struct ldlm_lock *lock)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
+       return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
 #else
-        RETURN(-ENOSYS);
+       return -ENOSYS;
 #endif
 }
 
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           cfs_list_t *cancels, int count, int mode)
+                          cfs_list_t *cancels, int count,
+                          ldlm_cancel_flags_t cancel_flags)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
+       return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
 #else
-        RETURN(-ENOSYS);
+       return -ENOSYS;
 #endif
 }
 
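The reorganised ldlm_bl_to_thread() above keeps the earlier policy while switching to ldlm_cancel_flags_t: an LCF_ASYNC request heap-allocates its work item, because the caller returns immediately and the blocking thread frees it, whereas a synchronous request builds the item on the stack, since it may be issued from the kernel shrinker where allocation should be minimised and the caller waits for completion anyway. A condensed sketch of that split, with hypothetical names standing in for the Lustre helpers:

#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

struct work_item {
	bool async;
	/* ... request payload ... */
};

/* Hypothetical hand-off: a real version would queue the item and, for a
 * synchronous caller, wait for its completion; for an asynchronous caller
 * the worker frees the item once it has been processed. */
static int hand_off(struct work_item *item, bool async)
{
	(void)item;
	(void)async;
	return 0;
}

static int submit(bool async)
{
	if (async) {
		/* The caller returns immediately, so the item must outlive
		 * this function: allocate it and let the worker free it. */
		struct work_item *item = calloc(1, sizeof(*item));

		if (item == NULL)
			return -ENOMEM;
		item->async = true;
		return hand_off(item, true);
	} else {
		/* The caller blocks until the work completes, and this path
		 * may run from a memory shrinker, so avoid allocating and
		 * keep the item on the stack. */
		struct work_item item;

		memset(&item, 0, sizeof(item));
		return hand_off(&item, false);
	}
}
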
@@ -2199,7 +2221,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * which the server has already started a blocking callback on. */
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
+               rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
                 if (rc < 0)
                         CERROR("ldlm_cli_cancel: %d\n", rc);
         }
@@ -2631,7 +2653,8 @@ static int ldlm_bl_thread_main(void *arg)
                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                            blwi->blwi_count,
                                                            LCF_BL_AST);
-                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
+                       ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+                                            blwi->blwi_flags);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
@@ -2639,9 +2662,9 @@ static int ldlm_bl_thread_main(void *arg)
                 if (blwi->blwi_mem_pressure)
                         cfs_memory_pressure_clr();
 
-                if (blwi->blwi_mode == LDLM_ASYNC)
-                        OBD_FREE(blwi, sizeof(*blwi));
-                else
+               if (blwi->blwi_flags & LCF_ASYNC)
+                       OBD_FREE(blwi, sizeof(*blwi));
+               else
                        complete(&blwi->blwi_comp);
         }
 
@@ -2819,7 +2842,7 @@ static int ldlm_setup(void)
                .psc_name               = "ldlm_cbd",
                .psc_watchdog_factor    = 2,
                .psc_buf                = {
-                       .bc_nbufs               = LDLM_NBUFS,
+                       .bc_nbufs               = LDLM_CLIENT_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
@@ -2858,7 +2881,7 @@ static int ldlm_setup(void)
                .psc_name               = "ldlm_canceld",
                .psc_watchdog_factor    = 6,
                .psc_buf                = {
-                       .bc_nbufs               = LDLM_NBUFS,
+                       .bc_nbufs               = LDLM_SERVER_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,