Whamcloud - gitweb
LU-10175 ldlm: handle lock converts in cancel handler 14/32314/5
authorMikhail Pershin <mike.pershin@intel.com>
Mon, 7 May 2018 20:36:55 +0000 (23:36 +0300)
committerOleg Drokin <green@whamcloud.com>
Tue, 3 Jul 2018 18:05:08 +0000 (18:05 +0000)
- Use cancel portals and high-priority handling for lock
  converts. Update ldlm_cancel_handler to understand
  LDLM_CONVERT RPC for that.
- Use ns_dirty_age_limit for lock convert - don't convert too old
  locks.
- Check for empty converts and skip such

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I767626acd974ad88bbbf0bb3b0a46744c45b7897
Reviewed-on: https://review.whamcloud.com/32314
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_export.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/llite_lib.c
lustre/llite/namei.c

index 6c5a08a..5ead593 100644 (file)
@@ -450,6 +450,11 @@ static inline int exp_connect_flr(struct obd_export *exp)
        return !!(exp_connect_flags2(exp) & OBD_CONNECT2_FLR);
 }
 
+static inline int exp_connect_lock_convert(struct obd_export *exp)
+{
+       return !!(exp_connect_flags2(exp) & OBD_CONNECT2_LOCK_CONVERT);
+}
+
 extern struct obd_export *class_conn2export(struct lustre_handle *conn);
 extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
 
index 1c821a0..7999816 100644 (file)
@@ -859,7 +859,8 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT_GRANT_PARAM | \
                                OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2)
 
-#define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR)
+#define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR | \
++                               OBD_CONNECT2_LOCK_CONVERT)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
index 626c624..c5d5d5a 100644 (file)
@@ -325,7 +325,7 @@ int ldlm_inodebits_drop(struct ldlm_lock *lock, __u64 to_drop)
 
        /* Just return if there are no conflicting bits */
        if ((lock->l_policy_data.l_inodebits.bits & to_drop) == 0) {
-               LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx\n",
+               LDLM_WARN(lock, "try to drop unset bits %#llx/%#llx",
                          lock->l_policy_data.l_inodebits.bits, to_drop);
                /* nothing to do */
                RETURN(0);
@@ -356,13 +356,19 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
 
        ldlm_lock2handle(lock, &lockh);
        lock_res_and_lock(lock);
-       /* check if all bits are cancelled */
+       /* check if all bits are blocked */
        if (!(lock->l_policy_data.l_inodebits.bits & ~drop_bits)) {
                unlock_res_and_lock(lock);
                /* return error to continue with cancel */
                GOTO(exit, rc = -EINVAL);
        }
 
+       /* check if no common bits, consider this as successful convert */
+       if (!(lock->l_policy_data.l_inodebits.bits & drop_bits)) {
+               unlock_res_and_lock(lock);
+               GOTO(exit, rc = 0);
+       }
+
        /* check if there is race with cancel */
        if (ldlm_is_canceling(lock) || ldlm_is_cancel(lock)) {
                unlock_res_and_lock(lock);
@@ -408,9 +414,11 @@ int ldlm_cli_dropbits(struct ldlm_lock *lock, __u64 drop_bits)
        rc = ldlm_cli_convert(lock, &flags);
        if (rc) {
                lock_res_and_lock(lock);
-               ldlm_clear_converting(lock);
-               ldlm_set_cbpending(lock);
-               ldlm_set_bl_ast(lock);
+               if (ldlm_is_converting(lock)) {
+                       ldlm_clear_converting(lock);
+                       ldlm_set_cbpending(lock);
+                       ldlm_set_bl_ast(lock);
+               }
                unlock_res_and_lock(lock);
                GOTO(exit, rc);
        }
index 92ee0c8..3273fdc 100644 (file)
@@ -1494,16 +1494,16 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
 
                if (ldlm_is_cancel(lock)) {
                        LDLM_ERROR(lock, "convert on canceled lock!");
-                       req->rq_status = LUSTRE_EINVAL;
+                       rc = ELDLM_NO_LOCK_DATA;
                } else if (dlm_req->lock_desc.l_req_mode !=
                           lock->l_granted_mode) {
                        LDLM_ERROR(lock, "lock mode differs!");
-                       req->rq_status = LUSTRE_EINVAL;
+                       rc = ELDLM_NO_LOCK_DATA;
                } else if (bits == new) {
                        /* This can be valid situation if CONVERT RPCs are
-                        * re-ordered. Just finish silently*/
+                        * re-ordered. Just finish silently */
                        LDLM_DEBUG(lock, "lock is converted already!");
-                       req->rq_status = 0;
+                       rc = ELDLM_OK;
                } else {
                        lock_res_and_lock(lock);
                        if (ldlm_is_waited(lock))
@@ -1515,21 +1515,28 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                        lock->l_bl_ast_run = 0;
                        ldlm_clear_ast_sent(lock);
                        unlock_res_and_lock(lock);
-                       req->rq_status = 0;
 
+                       ldlm_reprocess_all(lock->l_resource);
+                       rc = ELDLM_OK;
+               }
+
+               if (rc == ELDLM_OK) {
                        dlm_rep->lock_handle = lock->l_remote_handle;
                        ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
                                        &dlm_rep->lock_desc.l_policy_data);
-
-                       ldlm_reprocess_all(lock->l_resource);
                }
-               LDLM_DEBUG(lock, "server-side convert handler END");
+
+               LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
+                          rc);
                LDLM_LOCK_PUT(lock);
        } else {
-               req->rq_status = LUSTRE_EINVAL;
-               LDLM_DEBUG_NOLOCK("server-side convert handler END");
+               rc = ELDLM_NO_LOCK_DATA;
+               LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
+                                 rc);
        }
 
+       req->rq_status = rc;
+
        RETURN(0);
 }
 
@@ -2234,145 +2241,163 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
  */
 static int ldlm_cancel_handler(struct ptlrpc_request *req)
 {
-        int rc;
-        ENTRY;
+       int rc;
 
-        /* Requests arrive in sender's byte order.  The ptlrpc service
-         * handler has already checked and, if necessary, byte-swapped the
-         * incoming request message body, but I am responsible for the
-         * message buffers. */
+       ENTRY;
 
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       /* Requests arrive in sender's byte order.  The ptlrpc service
+        * handler has already checked and, if necessary, byte-swapped the
+        * incoming request message body, but I am responsible for the
+        * message buffers. */
 
-        if (req->rq_export == NULL) {
-                struct ldlm_request *dlm_req;
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
-                CERROR("%s from %s arrived at %lu with bad export cookie "
-                      "%llu\n",
-                       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
-                       libcfs_nid2str(req->rq_peer.nid),
-                       req->rq_arrival_time.tv_sec,
-                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
-                if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
-                        req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
-                        dlm_req = req_capsule_client_get(&req->rq_pill,
-                                                         &RMF_DLM_REQ);
-                        if (dlm_req != NULL)
-                                ldlm_lock_dump_handle(D_ERROR,
-                                                      &dlm_req->lock_handle[0]);
-                }
-                ldlm_callback_reply(req, -ENOTCONN);
-                RETURN(0);
-        }
+       if (req->rq_export == NULL) {
+               struct ldlm_request *dlm_req;
 
-        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+               CERROR("%s from %s arrived at %lu with bad export cookie "
+                      "%llu\n",
+                      ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+                      libcfs_nid2str(req->rq_peer.nid),
+                      req->rq_arrival_time.tv_sec,
+                      lustre_msg_get_handle(req->rq_reqmsg)->cookie);
+
+               if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
+                       req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+                       dlm_req = req_capsule_client_get(&req->rq_pill,
+                                                        &RMF_DLM_REQ);
+                       if (dlm_req != NULL)
+                               ldlm_lock_dump_handle(D_ERROR,
+                                                     &dlm_req->lock_handle[0]);
+               }
+               ldlm_callback_reply(req, -ENOTCONN);
+               RETURN(0);
+       }
 
-        /* XXX FIXME move this back to mds/handler.c, bug 249 */
-        case LDLM_CANCEL:
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
-                CDEBUG(D_INODE, "cancel\n");
+       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       /* XXX FIXME move this back to mds/handler.c, bug 249 */
+       case LDLM_CANCEL:
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+               CDEBUG(D_INODE, "cancel\n");
                if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
                    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
                    CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
                        RETURN(0);
-                rc = ldlm_handle_cancel(req);
-                if (rc)
-                        break;
-                RETURN(0);
-        default:
-                CERROR("invalid opcode %d\n",
-                       lustre_msg_get_opc(req->rq_reqmsg));
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
-                ldlm_callback_reply(req, -EINVAL);
-        }
+               rc = ldlm_handle_cancel(req);
+               break;
+       case LDLM_CONVERT:
+       {
+               struct ldlm_request *dlm_req;
 
-        RETURN(0);
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
+               CDEBUG(D_INODE, "convert\n");
+
+               dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+               if (dlm_req == NULL) {
+                       CDEBUG(D_INFO, "bad request buffer for cancel\n");
+                       rc = ldlm_callback_reply(req, -EPROTO);
+               } else {
+                       req->rq_status = ldlm_handle_convert0(req, dlm_req);
+                       rc = ptlrpc_reply(req);
+               }
+               break;
+       }
+       default:
+               CERROR("invalid opcode %d\n",
+                      lustre_msg_get_opc(req->rq_reqmsg));
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+               rc = ldlm_callback_reply(req, -EINVAL);
+       }
+
+       RETURN(rc);
 }
 
 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
-                                        struct ldlm_lock *lock)
+                                       struct ldlm_lock *lock)
 {
-        struct ldlm_request *dlm_req;
-        struct lustre_handle lockh;
-        int rc = 0;
-        int i;
-        ENTRY;
-
-        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-        if (dlm_req == NULL)
-                RETURN(0);
+       struct ldlm_request *dlm_req;
+       struct lustre_handle lockh;
+       int rc = 0;
+       int i;
 
-        ldlm_lock2handle(lock, &lockh);
-        for (i = 0; i < dlm_req->lock_count; i++) {
-                if (lustre_handle_equal(&dlm_req->lock_handle[i],
-                                        &lockh)) {
-                        DEBUG_REQ(D_RPCTRACE, req,
-                                 "Prio raised by lock %#llx.", lockh.cookie);
+       ENTRY;
 
-                        rc = 1;
-                        break;
-                }
-        }
+       dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       if (dlm_req == NULL)
+               RETURN(0);
 
-        RETURN(rc);
+       ldlm_lock2handle(lock, &lockh);
+       for (i = 0; i < dlm_req->lock_count; i++) {
+               if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                       &lockh)) {
+                       DEBUG_REQ(D_RPCTRACE, req,
+                                 "Prio raised by lock %#llx.", lockh.cookie);
+                       rc = 1;
+                       break;
+               }
+       }
 
+       RETURN(rc);
 }
 
 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
 {
-        struct ldlm_request *dlm_req;
-        int rc = 0;
-        int i;
-        ENTRY;
+       struct ldlm_request *dlm_req;
+       int rc = 0;
+       int i;
 
-        /* no prolong in recovery */
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
-                RETURN(0);
+       ENTRY;
 
-        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-        if (dlm_req == NULL)
-                RETURN(-EFAULT);
+       /* no prolong in recovery */
+       if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               RETURN(0);
 
-        for (i = 0; i < dlm_req->lock_count; i++) {
-                struct ldlm_lock *lock;
+       dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+       if (dlm_req == NULL)
+               RETURN(-EFAULT);
 
-                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
-                if (lock == NULL)
-                        continue;
+       for (i = 0; i < dlm_req->lock_count; i++) {
+               struct ldlm_lock *lock;
+
+               lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+               if (lock == NULL)
+                       continue;
 
                rc = ldlm_is_ast_sent(lock) ? 1 : 0;
-                if (rc)
-                        LDLM_DEBUG(lock, "hpreq cancel lock");
-                LDLM_LOCK_PUT(lock);
+               if (rc)
+                       LDLM_DEBUG(lock, "hpreq cancel/convert lock");
+               LDLM_LOCK_PUT(lock);
 
-                if (rc)
-                        break;
-        }
+               if (rc)
+                       break;
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
-        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+       .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
        .hpreq_check      = ldlm_cancel_hpreq_check,
        .hpreq_fini       = NULL,
 };
 
 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
 {
-        ENTRY;
+       ENTRY;
 
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
-        if (req->rq_export == NULL)
-                RETURN(0);
+       if (req->rq_export == NULL)
+               RETURN(0);
 
-        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
-                req->rq_ops = &ldlm_cancel_hpreq_ops;
-        }
-        RETURN(0);
+       if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+               req->rq_ops = &ldlm_cancel_hpreq_ops;
+       } else if (LDLM_CONVERT == lustre_msg_get_opc(req->rq_reqmsg)) {
+               req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
+               req->rq_ops = &ldlm_cancel_hpreq_ops;
+       }
+       RETURN(0);
 }
 
 static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
index 07520c3..36bc26e 100644 (file)
@@ -1073,7 +1073,7 @@ static int lock_convert_interpret(const struct lu_env *env,
                           aa->lock_handle.cookie, reply->lock_handle.cookie,
                           req->rq_export->exp_client_uuid.uuid,
                           libcfs_id2str(req->rq_peer));
-               GOTO(out, rc = -ESTALE);
+               GOTO(out, rc = ELDLM_NO_LOCK_DATA);
        }
 
        lock_res_and_lock(lock);
@@ -1122,15 +1122,29 @@ static int lock_convert_interpret(const struct lu_env *env,
        unlock_res_and_lock(lock);
 out:
        if (rc) {
+               int flag;
+
                lock_res_and_lock(lock);
                if (ldlm_is_converting(lock)) {
                        ldlm_clear_converting(lock);
                        ldlm_set_cbpending(lock);
                        ldlm_set_bl_ast(lock);
+                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
                }
                unlock_res_and_lock(lock);
-       }
 
+               /* fallback to normal lock cancel. If rc means there is no
+                * valid lock on server, do only local cancel */
+               if (rc == ELDLM_NO_LOCK_DATA)
+                       flag = LCF_LOCAL;
+               else
+                       flag = LCF_ASYNC;
+
+               rc = ldlm_cli_cancel(&aa->lock_handle, flag);
+               if (rc < 0)
+                       LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
+                                  rc);
+       }
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
 }
@@ -1161,6 +1175,15 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
                RETURN(-EINVAL);
        }
 
+       /* this is better to check earlier and it is done so already,
+        * but this check is kept too as final one to issue an error
+        * if any new code will miss such check.
+        */
+       if (!exp_connect_lock_convert(exp)) {
+               LDLM_ERROR(lock, "server doesn't support lock convert\n");
+               RETURN(-EPROTO);
+       }
+
        if (lock->l_resource->lr_type != LDLM_IBITS) {
                LDLM_ERROR(lock, "convert works with IBITS locks only.");
                RETURN(-EINVAL);
@@ -1189,13 +1212,12 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
 
        ptlrpc_request_set_replen(req);
 
-       /* That could be useful to use cancel portals for convert as well
-        * as high-priority handling. This will require changes in
-        * ldlm_cancel_handler to understand convert RPC as well.
-        *
-        * req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
-        * req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+       /*
+        * Use cancel portals for convert as well as high-priority handling.
         */
+       req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+       req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+
        ptlrpc_at_set_req_timeout(req);
 
        if (exp->exp_obd->obd_svc_stats != NULL)
index a56ffb5..5d06edc 100644 (file)
@@ -224,7 +224,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_GRANT_PARAM |
                                  OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2;
 
-       data->ocd_connect_flags2 = OBD_CONNECT2_FLR;
+       data->ocd_connect_flags2 = OBD_CONNECT2_FLR | OBD_CONNECT2_LOCK_CONVERT;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
index 0e098a3..8118a92 100644 (file)
@@ -374,11 +374,16 @@ void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
  * that convert is really needed. */
 int ll_md_need_convert(struct ldlm_lock *lock)
 {
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        struct inode *inode;
        __u64 wanted = lock->l_policy_data.l_inodebits.cancel_bits;
        __u64 bits = lock->l_policy_data.l_inodebits.bits & ~wanted;
        enum ldlm_mode mode = LCK_MINMODE;
 
+       if (!lock->l_conn_export ||
+           !exp_connect_lock_convert(lock->l_conn_export))
+               return 0;
+
        if (!wanted || !bits || ldlm_is_cancel(lock))
                return 0;
 
@@ -411,7 +416,7 @@ int ll_md_need_convert(struct ldlm_lock *lock)
        lock_res_and_lock(lock);
        if (ktime_after(ktime_get(),
                        ktime_add(lock->l_last_used,
-                                 ktime_set(10, 0)))) {
+                                 ktime_set(ns->ns_dirty_age_limit, 0)))) {
                unlock_res_and_lock(lock);
                return 0;
        }