Whamcloud - gitweb
LU-874 ptlrpc: handle in-flight hpreq correctly
author Jinshan Xiong <jinshan.xiong@whamcloud.com>
Wed, 25 Jan 2012 19:27:55 +0000 (11:27 -0800)
committer Oleg Drokin <green@whamcloud.com>
Mon, 13 Feb 2012 17:24:35 +0000 (12:24 -0500)
If there are in-flight requests pending, we shouldn't time out the
covering dlm locks; neither should we remove the requests from the
export's exp_hp_rpcs list until the requests are handled.

In this patch, the following things are improved:
1. leave IO rpcs in the export's hp list until they are handled;
2. use an interval tree to find the locks overlapping an rpc;
3. refresh the lock again after IO rpcs are finished to leave a time
   window for clients to cancel covering dlm locks;
4. rework repbody in ost_handler.c so as not to modify the original obdo;
5. clean up the code.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I33e2d113d7929a56389741c06dffb5efb6bf28a3
Reviewed-on: http://review.whamcloud.com/1918
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: <alexander_boyko@xyratex.com>
12 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/obdclass/genops.c
lustre/ost/ost_handler.c
lustre/ptlrpc/service.c

index a9e5968..bcef3b3 100644 (file)
@@ -2204,6 +2204,12 @@ struct ldlm_res_id {
 
 extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id);
 
+static inline int ldlm_res_eq(const struct ldlm_res_id *res0,
+                              const struct ldlm_res_id *res1)
+{
+        return !memcmp(res0, res1, sizeof(*res0));
+}
+
 /* lock types */
 typedef enum {
         LCK_MINMODE = 0,
@@ -2242,6 +2248,13 @@ static inline int ldlm_extent_overlap(struct ldlm_extent *ex1,
         return (ex1->start <= ex2->end) && (ex2->start <= ex1->end);
 }
 
+/* check if @ex1 contains @ex2 */
+static inline int ldlm_extent_contain(struct ldlm_extent *ex1,
+                                      struct ldlm_extent *ex2)
+{
+        return (ex1->start <= ex2->start) && (ex1->end >= ex2->end);
+}
+
 struct ldlm_inodebits {
         __u64 bits;
 };
index 1d4a2a7..3f6af4f 100644 (file)
@@ -795,6 +795,11 @@ struct ldlm_lock {
         /** referenced export object */
         struct obd_export    *l_exp_refs_target;
 #endif
+        /** export blocking dlm lock list, protected by
+         * l_export->exp_bl_list_lock.
+         * Lock order of waiting_locks_spinlock, exp_bl_list_lock and res lock
+         * is: res lock -> exp_bl_list_lock -> waiting_locks_spinlock. */
+        cfs_list_t            l_exp_list;
 };
 
 struct ldlm_resource {
index 2e0d95e..b8794ab 100644 (file)
@@ -224,8 +224,6 @@ struct obd_export {
         cfs_list_t                exp_req_replay_queue;
         /** protects exp_flags and exp_outstanding_replies */
         cfs_spinlock_t            exp_lock;
-        /** protects exp_queued_rpc */
-        cfs_spinlock_t            exp_rpc_lock;
         /** Compatibility flags for this export */
         __u64                     exp_connect_flags;
         enum obd_option           exp_flags;
@@ -246,13 +244,20 @@ struct obd_export {
                                   /* client timed out and tried to reconnect,
                                    * but couldn't because of active rpcs */
                                   exp_abort_active_req:1;
-        cfs_list_t                exp_queued_rpc;  /* RPC to be handled */
         /* also protected by exp_lock */
         enum lustre_sec_part      exp_sp_peer;
         struct sptlrpc_flavor     exp_flvr;             /* current */
         struct sptlrpc_flavor     exp_flvr_old[2];      /* about-to-expire */
         cfs_time_t                exp_flvr_expire[2];   /* seconds */
 
+        /** protects exp_hp_rpcs */
+        cfs_spinlock_t            exp_rpc_lock;
+        cfs_list_t                exp_hp_rpcs;  /* (potential) HP RPCs */
+
+        /** blocking dlm lock list, protected by exp_bl_list_lock */
+        cfs_list_t                exp_bl_list;
+        cfs_spinlock_t            exp_bl_list_lock;
+
         /** Target specific data */
         union {
                 struct tg_export_data     eu_target_data;
index 67ec415..9f520dc 100644 (file)
@@ -448,6 +448,10 @@ struct ptlrpc_hpreq_ops {
          * Check if the request is a high priority one.
          */
         int  (*hpreq_check)(struct ptlrpc_request *);
+        /**
+         * Called after the request has been handled.
+         */
+        void (*hpreq_fini)(struct ptlrpc_request *);
 };
 
 /**
index fe730f5..9117142 100644 (file)
@@ -81,11 +81,12 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
         }
 
         /* we need to ensure that the lock extent is properly aligned to what
-         * the client requested.  We align it to the lowest-common denominator
-         * of the clients requested lock start and end alignment. */
-        mask = 0x1000ULL;
+         * the client requested. We also need to make sure it is aligned to
+         * the server page size, otherwise a server page can be covered by
+         * two write locks. */
+        mask = CFS_PAGE_SIZE;
         req_align = (req_end + 1) | req_start;
-        if (req_align != 0) {
+        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                 while ((req_align & mask) == 0)
                         mask <<= 1;
         }
index aa0faef..04e98de 100644 (file)
@@ -46,12 +46,6 @@ extern cfs_list_t ldlm_srv_namespace_list;
 extern cfs_semaphore_t ldlm_cli_namespace_lock;
 extern cfs_list_t ldlm_cli_namespace_list;
 
-static inline int ldlm_res_eq(const struct ldlm_res_id *res0,
-                       const struct ldlm_res_id *res1)
-{
-        return !memcmp(res0, res1, sizeof(*res0));
-}
-
 static inline cfs_atomic_t *ldlm_namespace_nr(ldlm_side_t client)
 {
         return client == LDLM_NAMESPACE_SERVER ?
index 8e34904..f72248a 100644 (file)
@@ -437,6 +437,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         lock->l_exp_refs_nr = 0;
         lock->l_exp_refs_target = NULL;
 #endif
+        CFS_INIT_LIST_HEAD(&lock->l_exp_list);
 
         RETURN(lock);
 }
index 5de675d..dfac3e8 100644 (file)
@@ -264,7 +264,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
                 return 0;
 
         cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                 rq_exp_list) {
                 if (req->rq_ops->hpreq_lock_match) {
                         match = req->rq_ops->hpreq_lock_match(req, lock);
@@ -444,12 +444,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
         }
 
         ret = __ldlm_add_waiting_lock(lock, timeout);
-        if (ret)
+        if (ret) {
                 /* grab ref on the lock if it has been added to the
                  * waiting list */
                 LDLM_LOCK_GET(lock);
+        }
         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
+        if (ret) {
+                cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+                if (cfs_list_empty(&lock->l_exp_list))
+                        cfs_list_add(&lock->l_exp_list,
+                                     &lock->l_export->exp_bl_list);
+                cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+        }
+
         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                    ret == 0 ? "not re-" : "", timeout,
                    AT_OFF ? "off" : "on");
@@ -504,10 +513,17 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         cfs_spin_lock_bh(&waiting_locks_spinlock);
         ret = __ldlm_del_waiting_lock(lock);
         cfs_spin_unlock_bh(&waiting_locks_spinlock);
-        if (ret)
+
+        /* remove the lock out of export blocking list */
+        cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+        cfs_list_del_init(&lock->l_exp_list);
+        cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+        if (ret) {
                 /* release lock ref if it has indeed been removed
                  * from a list */
                 LDLM_LOCK_RELEASE(lock);
+        }
 
         LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
         return ret;
@@ -709,7 +725,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
         }
 
         cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                 rq_exp_list) {
                 /* Do not process requests that were not yet added to there
                  * incoming queue or were already removed from there for
index 4d6dab4..e67e39f 100644 (file)
@@ -1196,7 +1196,6 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, cfs_list_t *head,
 {
         check_res_locked(res);
 
-        ldlm_resource_dump(D_INFO, res);
         CDEBUG(D_OTHER, "About to add this lock:\n");
         ldlm_lock_dump(D_OTHER, lock, 0);
 
index 6921762..0dfaff3 100644 (file)
@@ -752,7 +752,7 @@ static void class_export_destroy(struct obd_export *exp)
         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
-        LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
+        LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
         obd_destroy_export(exp);
         class_decref(obd, "export", exp);
 
@@ -826,13 +826,15 @@ struct obd_export *class_new_export(struct obd_device *obd,
         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
-        CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
+        CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = cfs_time_current_sec();
         cfs_spin_lock_init(&export->exp_lock);
         cfs_spin_lock_init(&export->exp_rpc_lock);
         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
+        cfs_spin_lock_init(&export->exp_bl_list_lock);
+        CFS_INIT_LIST_HEAD(&export->exp_bl_list);
 
         export->exp_sp_peer = LUSTRE_SP_ANY;
         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
index ee41fe4..52769e0 100644 (file)
@@ -190,7 +190,7 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
 
         /* Do the destroy and set the reply status accordingly  */
-        req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
+        req->rq_status = obd_destroy(exp, &repbody->oa, NULL, oti, NULL, capa);
         RETURN(0);
 }
 
@@ -264,40 +264,40 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
-        if (rc)
-                RETURN(rc);
-
         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
                 if (capa == NULL) {
                         CERROR("Missing capability for OST GETATTR");
-                        GOTO(unlock, rc = -EFAULT);
+                        RETURN(-EFAULT);
                 }
         }
 
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc)
+                RETURN(rc);
+
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
+
+        rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
+        if (rc)
+                RETURN(rc);
+
         OBD_ALLOC_PTR(oinfo);
         if (!oinfo)
                 GOTO(unlock, rc = -ENOMEM);
-        oinfo->oi_oa = &body->oa;
+        oinfo->oi_oa = &repbody->oa;
         oinfo->oi_capa = capa;
 
         req->rq_status = obd_getattr(exp, oinfo);
 
         OBD_FREE_PTR(oinfo);
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
 
 unlock:
         ost_lock_put(exp, &lh, LCK_PR);
-
-        RETURN(0);
+        RETURN(rc);
 }
 
 static int ost_statfs(struct ptlrpc_request *req)
@@ -381,22 +381,25 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
         if (body->oa.o_size == 0)
                 flags |= LDLM_AST_DISCARD_DATA;
 
-        rc = ost_lock_get(exp, &body->oa, body->oa.o_size, body->oa.o_blocks,
-                          &lh, LCK_PW, flags);
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
+
+        rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
+                          repbody->oa.o_blocks, &lh, LCK_PW, flags);
         if (rc == 0) {
                 struct obd_info *oinfo;
                 struct lustre_capa *capa = NULL;
 
-                if (body->oa.o_valid & OBD_MD_FLFLAGS &&
-                    body->oa.o_flags == OBD_FL_SRVLOCK)
+                if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
+                    repbody->oa.o_flags == OBD_FL_SRVLOCK)
                         /*
                          * If OBD_FL_SRVLOCK is the only bit set in
                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
                          * through filter_setattr() to filter_iocontrol().
                          */
-                        body->oa.o_valid &= ~OBD_MD_FLFLAGS;
+                        repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
 
-                if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
                         capa = req_capsule_client_get(&req->rq_pill,
                                                       &RMF_CAPA1);
                         if (capa == NULL) {
@@ -408,7 +411,7 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
                 OBD_ALLOC_PTR(oinfo);
                 if (!oinfo)
                         GOTO(unlock, rc = -ENOMEM);
-                oinfo->oi_oa = &body->oa;
+                oinfo->oi_oa = &repbody->oa;
                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
                 oinfo->oi_capa = capa;
@@ -420,8 +423,6 @@ unlock:
                 ost_lock_put(exp, &lh, LCK_PW);
         }
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(rc);
 }
@@ -454,18 +455,19 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
+
         OBD_ALLOC_PTR(oinfo);
         if (!oinfo)
                 RETURN(-ENOMEM);
 
-        oinfo->oi_oa = &body->oa;
+        oinfo->oi_oa = &repbody->oa;
         oinfo->oi_capa = capa;
-        req->rq_status = obd_sync(exp, oinfo, body->oa.o_size,
-                                  body->oa.o_blocks, NULL);
+        req->rq_status = obd_sync(exp, oinfo, repbody->oa.o_size,
+                                  repbody->oa.o_blocks, NULL);
         OBD_FREE_PTR(oinfo);
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
@@ -499,18 +501,19 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
                 }
         }
 
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
+
         OBD_ALLOC_PTR(oinfo);
         if (!oinfo)
                 RETURN(-ENOMEM);
-        oinfo->oi_oa = &body->oa;
+        oinfo->oi_oa = &repbody->oa;
         oinfo->oi_capa = capa;
 
         req->rq_status = obd_setattr(exp, oinfo, oti);
 
         OBD_FREE_PTR(oinfo);
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
@@ -594,124 +597,6 @@ static void ost_brw_lock_put(int mode,
         EXIT;
 }
 
-struct ost_prolong_data {
-        struct obd_export *opd_exp;
-        ldlm_policy_data_t opd_policy;
-        struct obdo *opd_oa;
-        ldlm_mode_t opd_mode;
-        int opd_lock_match;
-        int opd_timeout;
-};
-
-static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
-{
-        struct ost_prolong_data *opd = data;
-
-        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
-        if (lock->l_req_mode != lock->l_granted_mode) {
-                /* scan granted locks only */
-                return LDLM_ITER_STOP;
-        }
-
-        if (lock->l_export != opd->opd_exp) {
-                /* prolong locks only for given client */
-                return LDLM_ITER_CONTINUE;
-        }
-
-        if (!(lock->l_granted_mode & opd->opd_mode)) {
-                /* we aren't interesting in all type of locks */
-                return LDLM_ITER_CONTINUE;
-        }
-
-        if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
-            lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
-                /* the request doesn't cross the lock, skip it */
-                return LDLM_ITER_CONTINUE;
-        }
-
-        /* Fill the obdo with the matched lock handle.
-         * XXX: it is possible in some cases the IO RPC is covered by several
-         * locks, even for the write case, so it may need to be a lock list. */
-        if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
-                opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
-                opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
-        }
-
-        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
-                /* ignore locks not being cancelled */
-                return LDLM_ITER_CONTINUE;
-        }
-
-        CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-               lock->l_resource->lr_name.name[0],
-               lock->l_resource->lr_name.name[1],
-               opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
-        /* OK. this is a possible lock the user holds doing I/O
-         * let's refresh eviction timer for it */
-        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
-        opd->opd_lock_match = 1;
-
-        return LDLM_ITER_CONTINUE;
-}
-
-static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
-                                struct niobuf_remote *nb, struct obdo *oa,
-                                ldlm_mode_t mode)
-{
-        struct ldlm_res_id res_id;
-        int nrbufs = obj->ioo_bufcnt;
-        struct ost_prolong_data opd = { 0 };
-        ENTRY;
-
-        osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
-
-        opd.opd_mode = mode;
-        opd.opd_exp = req->rq_export;
-        opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
-        opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
-                                       nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
-
-        /* prolong locks for the current service time of the corresponding
-         * portal (= OST_IO_PORTAL) */
-        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
-                          max(at_est2timeout(at_get(&req->rq_rqbd->
-                              rqbd_service->srv_at_estimate)), ldlm_timeout);
-
-        CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-               res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
-               opd.opd_policy.l_extent.end);
-
-        if (oa->o_valid & OBD_MD_FLHANDLE) {
-                struct ldlm_lock *lock;
-
-                lock = ldlm_handle2lock(&oa->o_handle);
-                if (lock != NULL) {
-                        ost_prolong_locks_iter(lock, &opd);
-                        if (opd.opd_lock_match) {
-                                LDLM_LOCK_PUT(lock);
-                                RETURN(1);
-                        }
-
-                        /* Check if the lock covers the whole IO region,
-                         * otherwise iterate through the resource. */
-                        if (lock->l_policy_data.l_extent.end >=
-                            opd.opd_policy.l_extent.end &&
-                            lock->l_policy_data.l_extent.start <=
-                            opd.opd_policy.l_extent.start) {
-                                LDLM_LOCK_PUT(lock);
-                                RETURN(0);
-                        }
-                        LDLM_LOCK_PUT(lock);
-                }
-        }
-
-        opd.opd_oa = oa;
-        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
-                              ost_prolong_locks_iter, &opd);
-        RETURN(opd.opd_lock_match);
-}
-
 /* Allocate thread local buffers if needed */
 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
 {
@@ -843,8 +728,11 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+
         npages = OST_THREAD_POOL_SIZE;
-        rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+        rc = obd_preprw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
                         remote_nb, &npages, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
@@ -854,12 +742,6 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL)
                 GOTO(out_commitrw, rc = -ENOMEM);
 
-        if (!lustre_handle_is_used(&lockh))
-                /* no needs to try to prolong lock if server is asked
-                 * to handle locking (= OBD_BRW_SRVLOCK) */
-                ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
-                                     LCK_PW | LCK_PR);
-
         nob = 0;
         for (i = 0; i < npages; i++) {
                 int page_rc = local_nb[i].rc;
@@ -887,14 +769,15 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 cksum_type_t cksum_type =
-                        cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
-                                          body->oa.o_flags : 0);
-                body->oa.o_flags = cksum_type_pack(cksum_type);
-                body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
-                CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
+                        cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
+                                          repbody->oa.o_flags : 0);
+                repbody->oa.o_flags = cksum_type_pack(cksum_type);
+                repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
+                CDEBUG(D_PAGE, "checksum at read origin: %x\n",
+                       repbody->oa.o_cksum);
         } else {
-                body->oa.o_valid = 0;
+                repbody->oa.o_valid = 0;
         }
         /* We're finishing using body->oa as an input variable */
 
@@ -907,14 +790,11 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 out_commitrw:
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+        rc = obd_commitrw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
                           remote_nb, npages, local_nb, oti, rc);
 
-        if (rc == 0) {
-                repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-                memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+        if (rc == 0)
                 ost_drop_id(exp, &repbody->oa);
-        }
 
 out_lock:
         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
@@ -1053,11 +933,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        if (!lustre_handle_is_used(&lockh))
-                /* no needs to try to prolong lock if server is asked
-                 * to handle locking (= OBD_BRW_SRVLOCK) */
-                ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
-
         /* obd_preprw clobbers oa->valid, so save what we need */
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 client_cksum = body->oa.o_cksum;
@@ -1079,8 +954,12 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 o_uid = body->oa.o_uid;
                 o_gid = body->oa.o_gid;
         }
+
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+
         npages = OST_THREAD_POOL_SIZE;
-        rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
+        rc = obd_preprw(OBD_BRW_WRITE, exp, &repbody->oa, objcount,
                         ioo, remote_nb, &npages, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
@@ -1105,9 +984,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         no_reply = rc != 0;
 
 skip_transfer:
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
         if (unlikely(client_cksum != 0 && rc == 0)) {
                 static int cksum_counter;
                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
@@ -1769,6 +1645,101 @@ int ost_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
+struct ost_prolong_data {
+        struct ptlrpc_request *opd_req;
+        struct obd_export     *opd_exp;
+        struct obdo           *opd_oa;
+        struct ldlm_res_id     opd_resid;
+        struct ldlm_extent     opd_extent;
+        ldlm_mode_t            opd_mode;
+        unsigned int           opd_locks;
+        int                    opd_timeout;
+};
+
+/* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL)
+ */
+static inline int prolong_timeout(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+        if (AT_OFF)
+                return obd_timeout / 2;
+
+        return max(at_est2timeout(at_get(&svc->srv_at_estimate)), ldlm_timeout);
+}
+
+static void ost_prolong_lock_one(struct ost_prolong_data *opd,
+                                 struct ldlm_lock *lock)
+{
+        LASSERT(lock->l_req_mode == lock->l_granted_mode);
+        LASSERT(lock->l_export == opd->opd_exp);
+
+        /* XXX: never try to grab resource lock here because we're inside
+         * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
+         * res lock and then exp_bl_list_lock. */
+
+        if (!(lock->l_flags & LDLM_FL_AST_SENT))
+                /* ignore locks not being cancelled */
+                return;
+
+        LDLM_DEBUG(lock,
+                   "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
+                   opd->opd_req->rq_xid, opd->opd_extent.start,
+                   opd->opd_extent.end, opd->opd_timeout);
+
+        /* OK. this is a possible lock the user holds doing I/O
+         * let's refresh eviction timer for it */
+        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
+        ++opd->opd_locks;
+}
+
+static void ost_prolong_locks(struct ost_prolong_data *data)
+{
+        struct obd_export *exp = data->opd_exp;
+        struct obdo       *oa  = data->opd_oa;
+        struct ldlm_lock  *lock;
+        ENTRY;
+
+        if (oa->o_valid & OBD_MD_FLHANDLE) {
+                /* mostly a request should be covered by only one lock, try
+                 * fast path. */
+                lock = ldlm_handle2lock(&oa->o_handle);
+                if (lock != NULL) {
+                        /* Fast path to check if the lock covers the whole IO
+                         * region exclusively. */
+                        if (lock->l_granted_mode == LCK_PW &&
+                            ldlm_extent_contain(&lock->l_policy_data.l_extent,
+                                                &data->opd_extent)) {
+                                /* bingo */
+                                ost_prolong_lock_one(data, lock);
+                                LDLM_LOCK_PUT(lock);
+                                RETURN_EXIT;
+                        }
+                        LDLM_LOCK_PUT(lock);
+                }
+        }
+
+
+        cfs_spin_lock_bh(&exp->exp_bl_list_lock);
+        cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
+                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+                LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+
+                if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
+                        continue;
+
+                if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
+                                         &data->opd_extent))
+                        continue;
+
+                ost_prolong_lock_one(data, lock);
+        }
+        cfs_spin_unlock_bh(&exp->exp_bl_list_lock);
+
+        EXIT;
+}
+
 /**
  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
  * not.
@@ -1778,61 +1749,35 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
 {
         struct niobuf_remote *nb;
         struct obd_ioobj *ioo;
-        struct ost_body *body;
-        int objcount, niocount;
-        int mode, opc, i, rc;
-        __u64 start, end;
+        int mode, opc;
+        struct ldlm_extent ext;
         ENTRY;
 
         opc = lustre_msg_get_opc(req->rq_reqmsg);
         LASSERT(opc == OST_READ || opc == OST_WRITE);
 
-        /* As the request may be covered by several locks, do not look at
-         * o_handle, look at the RPC IO region. */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(0);
-
-        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
-                                        RCL_CLIENT) / sizeof(*ioo);
         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        if (ioo == NULL)
-                RETURN(0);
-
-        rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
-        if (rc)
-                RETURN(rc);
-
-        for (niocount = i = 0; i < objcount; i++)
-                niocount += ioo[i].ioo_bufcnt;
+        LASSERT(ioo != NULL);
 
         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        if (nb == NULL ||
-            niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
-            RCL_CLIENT) / sizeof(*nb)))
-                RETURN(0);
-
-        mode = LCK_PW;
-        if (opc == OST_READ)
-                mode |= LCK_PR;
+        LASSERT(nb != NULL);
 
-        start = nb[0].offset & CFS_PAGE_MASK;
-        end = (nb[ioo->ioo_bufcnt - 1].offset +
-               nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
+        ext.start = nb->offset;
+        nb += ioo->ioo_bufcnt - 1;
+        ext.end = nb->offset + nb->len - 1;
 
         LASSERT(lock->l_resource != NULL);
         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
                              &lock->l_resource->lr_name))
                 RETURN(0);
 
+        mode = LCK_PW;
+        if (opc == OST_READ)
+                mode |= LCK_PR;
         if (!(lock->l_granted_mode & mode))
                 RETURN(0);
 
-        if (lock->l_policy_data.l_extent.end < start ||
-            lock->l_policy_data.l_extent.start > end)
-                RETURN(0);
-
-        RETURN(1);
+        RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
 }
 
 /**
@@ -1847,78 +1792,62 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
  */
 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
 {
-        struct niobuf_remote *nb;
-        struct obd_ioobj *ioo;
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct ost_body *body;
-        int objcount, niocount;
-        int mode, opc, i, rc;
+        struct obd_ioobj *ioo;
+        struct niobuf_remote *nb;
+        struct ost_prolong_data opd = { 0 };
+        int mode, opc;
         ENTRY;
 
+        /*
+         * Use LASSERT to do sanity check because malformed RPCs should have
+         * been filtered out in ost_hpreq_handler().
+         */
         opc = lustre_msg_get_opc(req->rq_reqmsg);
         LASSERT(opc == OST_READ || opc == OST_WRITE);
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+        LASSERT(body != NULL);
 
-        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
-                                        RCL_CLIENT) / sizeof(*ioo);
         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        if (ioo == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
-        if (rc)
-                RETURN(rc);
+        LASSERT(ioo != NULL);
 
-        for (niocount = i = 0; i < objcount; i++)
-                niocount += ioo[i].ioo_bufcnt;
         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        if (nb == NULL ||
-            niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
-            RCL_CLIENT) / sizeof(*nb)))
-                RETURN(-EFAULT);
-        if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
-                RETURN(-EFAULT);
+        LASSERT(nb != NULL);
+        LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
 
+        osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid);
+
+        opd.opd_req = req;
         mode = LCK_PW;
         if (opc == OST_READ)
                 mode |= LCK_PR;
-        RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
-}
+        opd.opd_mode = mode;
+        opd.opd_exp = req->rq_export;
+        opd.opd_oa  = &body->oa;
+        opd.opd_extent.start = nb->offset;
+        nb += ioo->ioo_bufcnt - 1;
+        opd.opd_extent.end = nb->offset + nb->len - 1;
+        opd.opd_timeout = prolong_timeout(req);
 
-static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
-{
-        struct ldlm_res_id res_id = { .name = { oa->o_id } };
-        struct ost_prolong_data opd = { 0 };
-        __u64 start, end;
-        ENTRY;
+        DEBUG_REQ(D_RPCTRACE, req,
+               "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+               obd->obd_name, cfs_current()->comm,
+               opd.opd_resid.name[0], opd.opd_resid.name[1],
+               opd.opd_extent.start, opd.opd_extent.end);
 
-        start = oa->o_size;
-        end = start + oa->o_blocks;
+        ost_prolong_locks(&opd);
 
-        opd.opd_mode = LCK_PW;
-        opd.opd_exp = req->rq_export;
-        opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
-        if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
-                opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
-        else
-                opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
-
-        /* prolong locks for the current service time of the corresponding
-         * portal (= OST_IO_PORTAL) */
-        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
-                          max(at_est2timeout(at_get(&req->rq_rqbd->
-                              rqbd_service->srv_at_estimate)), ldlm_timeout);
-
-        CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-               res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
-               opd.opd_policy.l_extent.end);
-
-        opd.opd_oa = oa;
-        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
-                              ost_prolong_locks_iter, &opd);
-        RETURN(opd.opd_lock_match);
+        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+               obd->obd_name, opd.opd_locks, req);
+
+        RETURN(opd.opd_locks);
+}
+
+static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
+{
+        (void)ost_rw_hpreq_check(req);
 }
 
 /**
@@ -1928,20 +1857,15 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
                                       struct ldlm_lock *lock)
 {
         struct ost_body *body;
-        int rc;
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(0);  /* can't return -EFAULT here */
-
-        rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
+        LASSERT(body != NULL);
 
         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
                 RETURN(1);
+
         RETURN(0);
 }
 
@@ -1950,31 +1874,64 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
  */
 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
 {
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct ost_body *body;
-        int rc;
+        struct obdo *oa;
+        struct ost_prolong_data opd = { 0 };
+        __u64 start, end;
+        ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+        LASSERT(body != NULL);
 
-        rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
+        oa = &body->oa;
+        LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
+                !(oa->o_flags & OBD_FL_SRVLOCK));
 
-        LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
-                !(body->oa.o_flags & OBD_FL_SRVLOCK));
+        start = oa->o_size;
+        end = start + oa->o_blocks;
+
+        opd.opd_req = req;
+        opd.opd_mode = LCK_PW;
+        opd.opd_exp = req->rq_export;
+        opd.opd_oa  = oa;
+        opd.opd_extent.start = start;
+        opd.opd_extent.end   = end;
+        if (oa->o_blocks == OBD_OBJECT_EOF)
+                opd.opd_extent.end = OBD_OBJECT_EOF;
+        opd.opd_timeout = prolong_timeout(req);
+
+        osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid);
+
+        CDEBUG(D_DLMTRACE,
+               "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+               obd->obd_name,
+               opd.opd_resid.name[0], opd.opd_resid.name[1],
+               opd.opd_extent.start, opd.opd_extent.end);
+
+        ost_prolong_locks(&opd);
 
-        RETURN(ost_punch_prolong_locks(req, &body->oa));
+        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+               obd->obd_name, opd.opd_locks, req);
+
+        RETURN(opd.opd_locks > 0);
+}
+
+static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
+{
+        (void)ost_punch_hpreq_check(req);
 }
 
 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
-        .hpreq_lock_match  = ost_rw_hpreq_lock_match,
-        .hpreq_check       = ost_rw_hpreq_check,
+        .hpreq_lock_match = ost_rw_hpreq_lock_match,
+        .hpreq_check      = ost_rw_hpreq_check,
+        .hpreq_fini       = ost_rw_hpreq_fini
 };
 
 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
-        .hpreq_lock_match  = ost_punch_hpreq_lock_match,
-        .hpreq_check       = ost_punch_hpreq_check,
+        .hpreq_lock_match = ost_punch_hpreq_lock_match,
+        .hpreq_check      = ost_punch_hpreq_check,
+        .hpreq_fini       = ost_punch_hpreq_fini
 };
 
 /** Assign high priority operations to the request if needed. */
@@ -1989,6 +1946,7 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                         struct niobuf_remote *nb;
                         struct obd_ioobj *ioo;
                         int objcount, niocount;
+                        int rc;
                         int i;
 
                         /* RPCs on the H-P queue can be inspected before
@@ -2032,6 +1990,12 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                                 RETURN(-EFAULT);
                         }
 
+                        rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
+                        if (rc) {
+                                CERROR("invalid object ids\n");
+                                RETURN(rc);
+                        }
+
                         for (niocount = i = 0; i < objcount; i++) {
                                 if (ioo[i].ioo_bufcnt == 0) {
                                         CERROR("ioo[%d] has zero bufcnt\n", i);
index dd702eb..66ff9fc 100644 (file)
@@ -67,6 +67,7 @@ CFS_MODULE_PARM(at_extra, "i", int, 0644,
 
 /* forward ref */
 static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req);
 
 static CFS_LIST_HEAD(ptlrpc_all_services);
 cfs_spinlock_t ptlrpc_all_services_lock;
@@ -735,6 +736,8 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
                                          struct ptlrpc_request *req)
 {
+        ptlrpc_hpreq_fini(req);
+
         cfs_spin_lock(&svc->srv_rq_lock);
         svc->srv_n_active_reqs--;
         if (req->rq_hp)
@@ -1209,7 +1212,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
 static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                              struct ptlrpc_request *req)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
 
         if (svc->srv_hpreq_handler) {
@@ -1218,13 +1221,19 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
                         RETURN(rc);
         }
         if (req->rq_export && req->rq_ops) {
+                /* Perform the request-specific check. This must be done
+                 * before the request is added to the exp_hp_rpcs list;
+                 * otherwise it may hit the swab race in LU-1044. */
+                if (req->rq_ops->hpreq_check)
+                        rc = req->rq_ops->hpreq_check(req);
+
                 cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_add(&req->rq_exp_list,
-                             &req->rq_export->exp_queued_rpc);
+                             &req->rq_export->exp_hp_rpcs);
                 cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
         }
 
-        RETURN(0);
+        RETURN(rc);
 }
 
 /** Remove the request from the export list. */
@@ -1232,6 +1241,11 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
 {
         ENTRY;
         if (req->rq_export && req->rq_ops) {
+                /* Refresh the lock timeout again so that the client has
+                 * more time to send the lock cancel RPC. */
+                if (req->rq_ops->hpreq_fini)
+                        req->rq_ops->hpreq_fini(req);
+
                 cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
                 cfs_list_del_init(&req->rq_exp_list);
                 cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
@@ -1262,7 +1276,7 @@ static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
                 cfs_list_move_tail(&req->rq_list, &svc->srv_request_hpq);
                 req->rq_hp = 1;
                 if (opc != OBD_PING)
-                        DEBUG_REQ(D_NET, req, "high priority req");
+                        DEBUG_REQ(D_RPCTRACE, req, "high priority req");
         }
         cfs_spin_unlock(&req->rq_lock);
         EXIT;
@@ -1288,20 +1302,16 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
 }
 
 /** Check if the request is a high priority one. */
-static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+static int ptlrpc_server_hpreq_check(struct ptlrpc_service *svc,
+                                     struct ptlrpc_request *req)
 {
-        int opc, rc = 0;
         ENTRY;
 
         /* Check by request opc. */
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-        if (opc == OBD_PING)
+        if (OBD_PING == lustre_msg_get_opc(req->rq_reqmsg))
                 RETURN(1);
 
-        /* Perform request specific check. */
-        if (req->rq_ops && req->rq_ops->hpreq_check)
-                rc = req->rq_ops->hpreq_check(req);
-        RETURN(rc);
+        RETURN(ptlrpc_hpreq_init(svc, req));
 }
 
 /** Check if a request is a high priority one. */
@@ -1311,7 +1321,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
         int rc;
         ENTRY;
 
-        rc = ptlrpc_server_hpreq_check(req);
+        rc = ptlrpc_server_hpreq_check(svc, req);
         if (rc < 0)
                 RETURN(rc);
 
@@ -1518,7 +1528,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 break;
         }
 
-        CDEBUG(D_NET, "got req "LPU64"\n", req->rq_xid);
+        CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid);
 
         req->rq_export = class_conn2export(
                 lustre_msg_get_handle(req->rq_reqmsg));
@@ -1554,9 +1564,6 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         ptlrpc_at_add_timed(req);
-        rc = ptlrpc_hpreq_init(svc, req);
-        if (rc)
-                GOTO(err_req, rc);
 
         /* Move it over to the request processing queue */
         rc = ptlrpc_server_request_add(svc, req);
@@ -1635,7 +1642,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         cfs_spin_unlock(&svc->srv_rq_lock);
 
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-        ptlrpc_hpreq_fini(request);
 
         if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
                 libcfs_debug_dumplog();
@@ -2686,7 +2692,6 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 req = ptlrpc_server_request_get(service, 1);
                 cfs_list_del(&req->rq_list);
                 service->srv_n_active_reqs++;
-                ptlrpc_hpreq_fini(req);
                 ptlrpc_server_finish_request(service, req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);