From 15bd23c47332e476b985b220170fd47814ab7f14 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Wed, 25 Jan 2012 11:27:55 -0800 Subject: [PATCH] LU-874 ptlrpc: handle in-flight hqreq correctly If there are in-flight requests pending, we shouldn't timeout the covering dlm locks; neither should we remove the requests from export exp_hp_rpcs list until the requests are handled. In this patch, the following things are improved: 1. leave IO rpcs in export's hp list until they are handled; 2. using interval tree to find rpc overlapped locks; 3. refresh the lock again after IO rpcs are finished to leave a time window for clients to cancel covering dlm locks; 4. rework repbody in ost_handler.c so as to not modify original obdo 5. cleanup the code. Signed-off-by: Jinshan Xiong Change-Id: I33e2d113d7929a56389741c06dffb5efb6bf28a3 Reviewed-on: http://review.whamcloud.com/1918 Tested-by: Hudson Reviewed-by: Andreas Dilger Reviewed-by: Johann Lombardi Tested-by: Maloo Reviewed-by: --- lustre/include/lustre/lustre_idl.h | 13 + lustre/include/lustre_dlm.h | 5 + lustre/include/lustre_export.h | 11 +- lustre/include/lustre_net.h | 4 + lustre/ldlm/ldlm_extent.c | 9 +- lustre/ldlm/ldlm_internal.h | 6 - lustre/ldlm/ldlm_lock.c | 1 + lustre/ldlm/ldlm_lockd.c | 24 +- lustre/ldlm/ldlm_resource.c | 1 - lustre/obdclass/genops.c | 6 +- lustre/ost/ost_handler.c | 542 +++++++++++++++++-------------------- lustre/ptlrpc/service.c | 43 +-- 12 files changed, 337 insertions(+), 328 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index a9e5968..bcef3b3 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2204,6 +2204,12 @@ struct ldlm_res_id { extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id); +static inline int ldlm_res_eq(const struct ldlm_res_id *res0, + const struct ldlm_res_id *res1) +{ + return !memcmp(res0, res1, sizeof(*res0)); +} + /* lock types */ typedef enum { LCK_MINMODE = 0, @@ -2242,6 +2248,13 @@ static inline int ldlm_extent_overlap(struct ldlm_extent *ex1, return (ex1->start <= ex2->end) && (ex2->start <= ex1->end); } +/* check if @ex1 contains @ex2 */ +static inline int ldlm_extent_contain(struct ldlm_extent *ex1, + struct ldlm_extent *ex2) +{ + return (ex1->start <= ex2->start) && (ex1->end >= ex2->end); +} + struct ldlm_inodebits { __u64 bits; }; diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 1d4a2a7..3f6af4f 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -795,6 +795,11 @@ struct ldlm_lock { /** referenced export object */ struct obd_export *l_exp_refs_target; #endif + /** export blocking dlm lock list, protected by + * l_export->exp_bl_list_lock. + * Lock order of waiting_lists_spinlock, exp_bl_list_lock and res lock + * is: res lock -> exp_bl_list_lock -> wanting_lists_spinlock. */ + cfs_list_t l_exp_list; }; struct ldlm_resource { diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 2e0d95e..b8794ab 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -224,8 +224,6 @@ struct obd_export { cfs_list_t exp_req_replay_queue; /** protects exp_flags and exp_outstanding_replies */ cfs_spinlock_t exp_lock; - /** protects exp_queued_rpc */ - cfs_spinlock_t exp_rpc_lock; /** Compatibility flags for this export */ __u64 exp_connect_flags; enum obd_option exp_flags; @@ -246,13 +244,20 @@ struct obd_export { /* client timed out and tried to reconnect, * but couldn't because of active rpcs */ exp_abort_active_req:1; - cfs_list_t exp_queued_rpc; /* RPC to be handled */ /* also protected by exp_lock */ enum lustre_sec_part exp_sp_peer; struct sptlrpc_flavor exp_flvr; /* current */ struct sptlrpc_flavor exp_flvr_old[2]; /* about-to-expire */ cfs_time_t exp_flvr_expire[2]; /* seconds */ + /** protects exp_hp_rpcs */ + cfs_spinlock_t exp_rpc_lock; + cfs_list_t exp_hp_rpcs; /* (potential) HP RPCs */ + + /** blocking dlm lock list, protected by exp_bl_list_lock */ + cfs_list_t exp_bl_list; + cfs_spinlock_t exp_bl_list_lock; + /** Target specific data */ union { struct tg_export_data eu_target_data; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 67ec415..9f520dc 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -448,6 +448,10 @@ struct ptlrpc_hpreq_ops { * Check if the request is a high priority one. */ int (*hpreq_check)(struct ptlrpc_request *); + /** + * Called after the request has been handled. + */ + void (*hpreq_fini)(struct ptlrpc_request *); }; /** diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index fe730f5..9117142 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -81,11 +81,12 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req, } /* we need to ensure that the lock extent is properly aligned to what - * the client requested. We align it to the lowest-common denominator - * of the clients requested lock start and end alignment. */ - mask = 0x1000ULL; + * the client requested. Also we need to make sure it's also server + * page size aligned otherwise a server page can be covered by two + * write locks. */ + mask = CFS_PAGE_SIZE; req_align = (req_end + 1) | req_start; - if (req_align != 0) { + if (req_align != 0 && (req_align & (mask - 1)) == 0) { while ((req_align & mask) == 0) mask <<= 1; } diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index aa0faef..04e98de 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -46,12 +46,6 @@ extern cfs_list_t ldlm_srv_namespace_list; extern cfs_semaphore_t ldlm_cli_namespace_lock; extern cfs_list_t ldlm_cli_namespace_list; -static inline int ldlm_res_eq(const struct ldlm_res_id *res0, - const struct ldlm_res_id *res1) -{ - return !memcmp(res0, res1, sizeof(*res0)); -} - static inline cfs_atomic_t *ldlm_namespace_nr(ldlm_side_t client) { return client == LDLM_NAMESPACE_SERVER ? diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 8e34904..f72248a 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -437,6 +437,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) lock->l_exp_refs_nr = 0; lock->l_exp_refs_target = NULL; #endif + CFS_INIT_LIST_HEAD(&lock->l_exp_list); RETURN(lock); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 5de675d..dfac3e8 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -264,7 +264,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) return 0; cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { if (req->rq_ops->hpreq_lock_match) { match = req->rq_ops->hpreq_lock_match(req, lock); @@ -444,12 +444,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) } ret = __ldlm_add_waiting_lock(lock, timeout); - if (ret) + if (ret) { /* grab ref on the lock if it has been added to the * waiting list */ LDLM_LOCK_GET(lock); + } cfs_spin_unlock_bh(&waiting_locks_spinlock); + if (ret) { + cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock); + if (cfs_list_empty(&lock->l_exp_list)) + cfs_list_add(&lock->l_exp_list, + &lock->l_export->exp_bl_list); + cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock); + } + LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)", ret == 0 ? "not re-" : "", timeout, AT_OFF ? "off" : "on"); @@ -504,10 +513,17 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) cfs_spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); cfs_spin_unlock_bh(&waiting_locks_spinlock); - if (ret) + + /* remove the lock out of export blocking list */ + cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock); + cfs_list_del_init(&lock->l_exp_list); + cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock); + + if (ret) { /* release lock ref if it has indeed been removed * from a list */ LDLM_LOCK_RELEASE(lock); + } LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); return ret; @@ -709,7 +725,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) } cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { /* Do not process requests that were not yet added to there * incoming queue or were already removed from there for diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 4d6dab4..e67e39f 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -1196,7 +1196,6 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, cfs_list_t *head, { check_res_locked(res); - ldlm_resource_dump(D_INFO, res); CDEBUG(D_OTHER, "About to add this lock:\n"); ldlm_lock_dump(D_OTHER, lock, 0); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 6921762..0dfaff3 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -752,7 +752,7 @@ static void class_export_destroy(struct obd_export *exp) LASSERT(cfs_list_empty(&exp->exp_outstanding_replies)); LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies)); LASSERT(cfs_list_empty(&exp->exp_req_replay_queue)); - LASSERT(cfs_list_empty(&exp->exp_queued_rpc)); + LASSERT(cfs_list_empty(&exp->exp_hp_rpcs)); obd_destroy_export(exp); class_decref(obd, "export", exp); @@ -826,13 +826,15 @@ struct obd_export *class_new_export(struct obd_device *obd, CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); - CFS_INIT_LIST_HEAD(&export->exp_queued_rpc); + CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs); class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = cfs_time_current_sec(); cfs_spin_lock_init(&export->exp_lock); cfs_spin_lock_init(&export->exp_rpc_lock); CFS_INIT_HLIST_NODE(&export->exp_uuid_hash); CFS_INIT_HLIST_NODE(&export->exp_nid_hash); + cfs_spin_lock_init(&export->exp_bl_list_lock); + CFS_INIT_LIST_HEAD(&export->exp_bl_list); export->exp_sp_peer = LUSTRE_SP_ANY; export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index ee41fe4..52769e0 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -190,7 +190,7 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); /* Do the destroy and set the reply status accordingly */ - req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa); + req->rq_status = obd_destroy(exp, &repbody->oa, NULL, oti, NULL, capa); RETURN(0); } @@ -264,40 +264,40 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req) if (rc) RETURN(rc); - rc = req_capsule_server_pack(&req->rq_pill); - if (rc) - RETURN(rc); - - rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0); - if (rc) - RETURN(rc); - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); if (capa == NULL) { CERROR("Missing capability for OST GETATTR"); - GOTO(unlock, rc = -EFAULT); + RETURN(-EFAULT); } } + rc = req_capsule_server_pack(&req->rq_pill); + if (rc) + RETURN(rc); + + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + repbody->oa = body->oa; + + rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0); + if (rc) + RETURN(rc); + OBD_ALLOC_PTR(oinfo); if (!oinfo) GOTO(unlock, rc = -ENOMEM); - oinfo->oi_oa = &body->oa; + oinfo->oi_oa = &repbody->oa; oinfo->oi_capa = capa; req->rq_status = obd_getattr(exp, oinfo); OBD_FREE_PTR(oinfo); - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - repbody->oa = body->oa; ost_drop_id(exp, &repbody->oa); unlock: ost_lock_put(exp, &lh, LCK_PR); - - RETURN(0); + RETURN(rc); } static int ost_statfs(struct ptlrpc_request *req) @@ -381,22 +381,25 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, if (body->oa.o_size == 0) flags |= LDLM_AST_DISCARD_DATA; - rc = ost_lock_get(exp, &body->oa, body->oa.o_size, body->oa.o_blocks, - &lh, LCK_PW, flags); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + repbody->oa = body->oa; + + rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size, + repbody->oa.o_blocks, &lh, LCK_PW, flags); if (rc == 0) { struct obd_info *oinfo; struct lustre_capa *capa = NULL; - if (body->oa.o_valid & OBD_MD_FLFLAGS && - body->oa.o_flags == OBD_FL_SRVLOCK) + if (repbody->oa.o_valid & OBD_MD_FLFLAGS && + repbody->oa.o_flags == OBD_FL_SRVLOCK) /* * If OBD_FL_SRVLOCK is the only bit set in * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling * through filter_setattr() to filter_iocontrol(). */ - body->oa.o_valid &= ~OBD_MD_FLFLAGS; + repbody->oa.o_valid &= ~OBD_MD_FLFLAGS; - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) { capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); if (capa == NULL) { @@ -408,7 +411,7 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, OBD_ALLOC_PTR(oinfo); if (!oinfo) GOTO(unlock, rc = -ENOMEM); - oinfo->oi_oa = &body->oa; + oinfo->oi_oa = &repbody->oa; oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size; oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks; oinfo->oi_capa = capa; @@ -420,8 +423,6 @@ unlock: ost_lock_put(exp, &lh, LCK_PW); } - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - repbody->oa = body->oa; ost_drop_id(exp, &repbody->oa); RETURN(rc); } @@ -454,18 +455,19 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req) if (rc) RETURN(rc); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + repbody->oa = body->oa; + OBD_ALLOC_PTR(oinfo); if (!oinfo) RETURN(-ENOMEM); - oinfo->oi_oa = &body->oa; + oinfo->oi_oa = &repbody->oa; oinfo->oi_capa = capa; - req->rq_status = obd_sync(exp, oinfo, body->oa.o_size, - body->oa.o_blocks, NULL); + req->rq_status = obd_sync(exp, oinfo, repbody->oa.o_size, + repbody->oa.o_blocks, NULL); OBD_FREE_PTR(oinfo); - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - repbody->oa = body->oa; ost_drop_id(exp, &repbody->oa); RETURN(0); } @@ -499,18 +501,19 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, } } + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + repbody->oa = body->oa; + OBD_ALLOC_PTR(oinfo); if (!oinfo) RETURN(-ENOMEM); - oinfo->oi_oa = &body->oa; + oinfo->oi_oa = &repbody->oa; oinfo->oi_capa = capa; req->rq_status = obd_setattr(exp, oinfo, oti); OBD_FREE_PTR(oinfo); - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - repbody->oa = body->oa; ost_drop_id(exp, &repbody->oa); RETURN(0); } @@ -594,124 +597,6 @@ static void ost_brw_lock_put(int mode, EXIT; } -struct ost_prolong_data { - struct obd_export *opd_exp; - ldlm_policy_data_t opd_policy; - struct obdo *opd_oa; - ldlm_mode_t opd_mode; - int opd_lock_match; - int opd_timeout; -}; - -static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) -{ - struct ost_prolong_data *opd = data; - - LASSERT(lock->l_resource->lr_type == LDLM_EXTENT); - - if (lock->l_req_mode != lock->l_granted_mode) { - /* scan granted locks only */ - return LDLM_ITER_STOP; - } - - if (lock->l_export != opd->opd_exp) { - /* prolong locks only for given client */ - return LDLM_ITER_CONTINUE; - } - - if (!(lock->l_granted_mode & opd->opd_mode)) { - /* we aren't interesting in all type of locks */ - return LDLM_ITER_CONTINUE; - } - - if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start || - lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) { - /* the request doesn't cross the lock, skip it */ - return LDLM_ITER_CONTINUE; - } - - /* Fill the obdo with the matched lock handle. - * XXX: it is possible in some cases the IO RPC is covered by several - * locks, even for the write case, so it may need to be a lock list. */ - if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) { - opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie; - opd->opd_oa->o_valid |= OBD_MD_FLHANDLE; - } - - if (!(lock->l_flags & LDLM_FL_AST_SENT)) { - /* ignore locks not being cancelled */ - return LDLM_ITER_CONTINUE; - } - - CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", - lock->l_resource->lr_name.name[0], - lock->l_resource->lr_name.name[1], - opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end); - /* OK. this is a possible lock the user holds doing I/O - * let's refresh eviction timer for it */ - ldlm_refresh_waiting_lock(lock, opd->opd_timeout); - opd->opd_lock_match = 1; - - return LDLM_ITER_CONTINUE; -} - -static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj, - struct niobuf_remote *nb, struct obdo *oa, - ldlm_mode_t mode) -{ - struct ldlm_res_id res_id; - int nrbufs = obj->ioo_bufcnt; - struct ost_prolong_data opd = { 0 }; - ENTRY; - - osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id); - - opd.opd_mode = mode; - opd.opd_exp = req->rq_export; - opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK; - opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset + - nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK; - - /* prolong locks for the current service time of the corresponding - * portal (= OST_IO_PORTAL) */ - opd.opd_timeout = AT_OFF ? obd_timeout / 2: - max(at_est2timeout(at_get(&req->rq_rqbd-> - rqbd_service->srv_at_estimate)), ldlm_timeout); - - CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", - res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, - opd.opd_policy.l_extent.end); - - if (oa->o_valid & OBD_MD_FLHANDLE) { - struct ldlm_lock *lock; - - lock = ldlm_handle2lock(&oa->o_handle); - if (lock != NULL) { - ost_prolong_locks_iter(lock, &opd); - if (opd.opd_lock_match) { - LDLM_LOCK_PUT(lock); - RETURN(1); - } - - /* Check if the lock covers the whole IO region, - * otherwise iterate through the resource. */ - if (lock->l_policy_data.l_extent.end >= - opd.opd_policy.l_extent.end && - lock->l_policy_data.l_extent.start <= - opd.opd_policy.l_extent.start) { - LDLM_LOCK_PUT(lock); - RETURN(0); - } - LDLM_LOCK_PUT(lock); - } - } - - opd.opd_oa = oa; - ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id, - ost_prolong_locks_iter, &opd); - RETURN(opd.opd_lock_match); -} - /* Allocate thread local buffers if needed */ static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r) { @@ -843,8 +728,11 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); + npages = OST_THREAD_POOL_SIZE; - rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo, + rc = obd_preprw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo, remote_nb, &npages, local_nb, oti, capa); if (rc != 0) GOTO(out_lock, rc); @@ -854,12 +742,6 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (desc == NULL) GOTO(out_commitrw, rc = -ENOMEM); - if (!lustre_handle_is_used(&lockh)) - /* no needs to try to prolong lock if server is asked - * to handle locking (= OBD_BRW_SRVLOCK) */ - ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa, - LCK_PW | LCK_PR); - nob = 0; for (i = 0; i < npages; i++) { int page_rc = local_nb[i].rc; @@ -887,14 +769,15 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (body->oa.o_valid & OBD_MD_FLCKSUM) { cksum_type_t cksum_type = - cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ? - body->oa.o_flags : 0); - body->oa.o_flags = cksum_type_pack(cksum_type); - body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type); - CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum); + cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ? + repbody->oa.o_flags : 0); + repbody->oa.o_flags = cksum_type_pack(cksum_type); + repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type); + CDEBUG(D_PAGE, "checksum at read origin: %x\n", + repbody->oa.o_cksum); } else { - body->oa.o_valid = 0; + repbody->oa.o_valid = 0; } /* We're finishing using body->oa as an input variable */ @@ -907,14 +790,11 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) out_commitrw: /* Must commit after prep above in all cases */ - rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo, + rc = obd_commitrw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo, remote_nb, npages, local_nb, oti, rc); - if (rc == 0) { - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); + if (rc == 0) ost_drop_id(exp, &repbody->oa); - } out_lock: ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh); @@ -1053,11 +933,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - if (!lustre_handle_is_used(&lockh)) - /* no needs to try to prolong lock if server is asked - * to handle locking (= OBD_BRW_SRVLOCK) */ - ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa, LCK_PW); - /* obd_preprw clobbers oa->valid, so save what we need */ if (body->oa.o_valid & OBD_MD_FLCKSUM) { client_cksum = body->oa.o_cksum; @@ -1079,8 +954,12 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) o_uid = body->oa.o_uid; o_gid = body->oa.o_gid; } + + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); + npages = OST_THREAD_POOL_SIZE; - rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount, + rc = obd_preprw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo, remote_nb, &npages, local_nb, oti, capa); if (rc != 0) GOTO(out_lock, rc); @@ -1105,9 +984,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) no_reply = rc != 0; skip_transfer: - repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); - if (unlikely(client_cksum != 0 && rc == 0)) { static int cksum_counter; repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; @@ -1769,6 +1645,101 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } +struct ost_prolong_data { + struct ptlrpc_request *opd_req; + struct obd_export *opd_exp; + struct obdo *opd_oa; + struct ldlm_res_id opd_resid; + struct ldlm_extent opd_extent; + ldlm_mode_t opd_mode; + unsigned int opd_locks; + int opd_timeout; +}; + +/* prolong locks for the current service time of the corresponding + * portal (= OST_IO_PORTAL) + */ +static inline int prolong_timeout(struct ptlrpc_request *req) +{ + struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + + if (AT_OFF) + return obd_timeout / 2; + + return max(at_est2timeout(at_get(&svc->srv_at_estimate)), ldlm_timeout); +} + +static void ost_prolong_lock_one(struct ost_prolong_data *opd, + struct ldlm_lock *lock) +{ + LASSERT(lock->l_req_mode == lock->l_granted_mode); + LASSERT(lock->l_export == opd->opd_exp); + + /* XXX: never try to grab resource lock here because we're inside + * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take + * res lock and then exp_bl_list_lock. */ + + if (!(lock->l_flags & LDLM_FL_AST_SENT)) + /* ignore locks not being cancelled */ + return; + + LDLM_DEBUG(lock, + "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n", + opd->opd_req->rq_xid, opd->opd_extent.start, + opd->opd_extent.end, opd->opd_timeout); + + /* OK. this is a possible lock the user holds doing I/O + * let's refresh eviction timer for it */ + ldlm_refresh_waiting_lock(lock, opd->opd_timeout); + ++opd->opd_locks; +} + +static void ost_prolong_locks(struct ost_prolong_data *data) +{ + struct obd_export *exp = data->opd_exp; + struct obdo *oa = data->opd_oa; + struct ldlm_lock *lock; + ENTRY; + + if (oa->o_valid & OBD_MD_FLHANDLE) { + /* mostly a request should be covered by only one lock, try + * fast path. */ + lock = ldlm_handle2lock(&oa->o_handle); + if (lock != NULL) { + /* Fast path to check if the lock covers the whole IO + * region exclusively. */ + if (lock->l_granted_mode == LCK_PW && + ldlm_extent_contain(&lock->l_policy_data.l_extent, + &data->opd_extent)) { + /* bingo */ + ost_prolong_lock_one(data, lock); + LDLM_LOCK_PUT(lock); + RETURN_EXIT; + } + LDLM_LOCK_PUT(lock); + } + } + + + cfs_spin_lock_bh(&exp->exp_bl_list_lock); + cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) { + LASSERT(lock->l_flags & LDLM_FL_AST_SENT); + LASSERT(lock->l_resource->lr_type == LDLM_EXTENT); + + if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name)) + continue; + + if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent, + &data->opd_extent)) + continue; + + ost_prolong_lock_one(data, lock); + } + cfs_spin_unlock_bh(&exp->exp_bl_list_lock); + + EXIT; +} + /** * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does * not. @@ -1778,61 +1749,35 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, { struct niobuf_remote *nb; struct obd_ioobj *ioo; - struct ost_body *body; - int objcount, niocount; - int mode, opc, i, rc; - __u64 start, end; + int mode, opc; + struct ldlm_extent ext; ENTRY; opc = lustre_msg_get_opc(req->rq_reqmsg); LASSERT(opc == OST_READ || opc == OST_WRITE); - /* As the request may be covered by several locks, do not look at - * o_handle, look at the RPC IO region. */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(0); - - objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, - RCL_CLIENT) / sizeof(*ioo); ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); - if (ioo == NULL) - RETURN(0); - - rc = ost_validate_obdo(req->rq_export, &body->oa, ioo); - if (rc) - RETURN(rc); - - for (niocount = i = 0; i < objcount; i++) - niocount += ioo[i].ioo_bufcnt; + LASSERT(ioo != NULL); nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); - if (nb == NULL || - niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE, - RCL_CLIENT) / sizeof(*nb))) - RETURN(0); - - mode = LCK_PW; - if (opc == OST_READ) - mode |= LCK_PR; + LASSERT(nb != NULL); - start = nb[0].offset & CFS_PAGE_MASK; - end = (nb[ioo->ioo_bufcnt - 1].offset + - nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK; + ext.start = nb->offset; + nb += ioo->ioo_bufcnt - 1; + ext.end = nb->offset + nb->len - 1; LASSERT(lock->l_resource != NULL); if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq, &lock->l_resource->lr_name)) RETURN(0); + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; if (!(lock->l_granted_mode & mode)) RETURN(0); - if (lock->l_policy_data.l_extent.end < start || - lock->l_policy_data.l_extent.start > end) - RETURN(0); - - RETURN(1); + RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext)); } /** @@ -1847,78 +1792,62 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, */ static int ost_rw_hpreq_check(struct ptlrpc_request *req) { - struct niobuf_remote *nb; - struct obd_ioobj *ioo; + struct obd_device *obd = req->rq_export->exp_obd; struct ost_body *body; - int objcount, niocount; - int mode, opc, i, rc; + struct obd_ioobj *ioo; + struct niobuf_remote *nb; + struct ost_prolong_data opd = { 0 }; + int mode, opc; ENTRY; + /* + * Use LASSERT to do sanity check because malformed RPCs should have + * been filtered out in ost_hpreq_handler(). + */ opc = lustre_msg_get_opc(req->rq_reqmsg); LASSERT(opc == OST_READ || opc == OST_WRITE); body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(-EFAULT); + LASSERT(body != NULL); - objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, - RCL_CLIENT) / sizeof(*ioo); ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); - if (ioo == NULL) - RETURN(-EFAULT); - - rc = ost_validate_obdo(req->rq_export, &body->oa, ioo); - if (rc) - RETURN(rc); + LASSERT(ioo != NULL); - for (niocount = i = 0; i < objcount; i++) - niocount += ioo[i].ioo_bufcnt; nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); - if (nb == NULL || - niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE, - RCL_CLIENT) / sizeof(*nb))) - RETURN(-EFAULT); - if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK)) - RETURN(-EFAULT); + LASSERT(nb != NULL); + LASSERT(!(nb->flags & OBD_BRW_SRVLOCK)); + osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid); + + opd.opd_req = req; mode = LCK_PW; if (opc == OST_READ) mode |= LCK_PR; - RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode)); -} + opd.opd_mode = mode; + opd.opd_exp = req->rq_export; + opd.opd_oa = &body->oa; + opd.opd_extent.start = nb->offset; + nb += ioo->ioo_bufcnt - 1; + opd.opd_extent.end = nb->offset + nb->len - 1; + opd.opd_timeout = prolong_timeout(req); -static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa) -{ - struct ldlm_res_id res_id = { .name = { oa->o_id } }; - struct ost_prolong_data opd = { 0 }; - __u64 start, end; - ENTRY; + DEBUG_REQ(D_RPCTRACE, req, + "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n", + obd->obd_name, cfs_current()->comm, + opd.opd_resid.name[0], opd.opd_resid.name[1], + opd.opd_extent.start, opd.opd_extent.end); - start = oa->o_size; - end = start + oa->o_blocks; + ost_prolong_locks(&opd); - opd.opd_mode = LCK_PW; - opd.opd_exp = req->rq_export; - opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK; - if (oa->o_blocks == OBD_OBJECT_EOF || end < start) - opd.opd_policy.l_extent.end = OBD_OBJECT_EOF; - else - opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK; - - /* prolong locks for the current service time of the corresponding - * portal (= OST_IO_PORTAL) */ - opd.opd_timeout = AT_OFF ? obd_timeout / 2: - max(at_est2timeout(at_get(&req->rq_rqbd-> - rqbd_service->srv_at_estimate)), ldlm_timeout); - - CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", - res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, - opd.opd_policy.l_extent.end); - - opd.opd_oa = oa; - ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id, - ost_prolong_locks_iter, &opd); - RETURN(opd.opd_lock_match); + CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n", + obd->obd_name, opd.opd_locks, req); + + RETURN(opd.opd_locks); +} + +static void ost_rw_hpreq_fini(struct ptlrpc_request *req) +{ + (void)ost_rw_hpreq_check(req); } /** @@ -1928,20 +1857,15 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, struct ldlm_lock *lock) { struct ost_body *body; - int rc; ENTRY; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(0); /* can't return -EFAULT here */ - - rc = ost_validate_obdo(req->rq_export, &body->oa, NULL); - if (rc) - RETURN(rc); + LASSERT(body != NULL); if (body->oa.o_valid & OBD_MD_FLHANDLE && body->oa.o_handle.cookie == lock->l_handle.h_cookie) RETURN(1); + RETURN(0); } @@ -1950,31 +1874,64 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, */ static int ost_punch_hpreq_check(struct ptlrpc_request *req) { + struct obd_device *obd = req->rq_export->exp_obd; struct ost_body *body; - int rc; + struct obdo *oa; + struct ost_prolong_data opd = { 0 }; + __u64 start, end; + ENTRY; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(-EFAULT); + LASSERT(body != NULL); - rc = ost_validate_obdo(req->rq_export, &body->oa, NULL); - if (rc) - RETURN(rc); + oa = &body->oa; + LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) || + !(oa->o_flags & OBD_FL_SRVLOCK)); - LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) || - !(body->oa.o_flags & OBD_FL_SRVLOCK)); + start = oa->o_size; + end = start + oa->o_blocks; + + opd.opd_req = req; + opd.opd_mode = LCK_PW; + opd.opd_exp = req->rq_export; + opd.opd_oa = oa; + opd.opd_extent.start = start; + opd.opd_extent.end = end; + if (oa->o_blocks == OBD_OBJECT_EOF) + opd.opd_extent.end = OBD_OBJECT_EOF; + opd.opd_timeout = prolong_timeout(req); + + osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid); + + CDEBUG(D_DLMTRACE, + "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", + obd->obd_name, + opd.opd_resid.name[0], opd.opd_resid.name[1], + opd.opd_extent.start, opd.opd_extent.end); + + ost_prolong_locks(&opd); - RETURN(ost_punch_prolong_locks(req, &body->oa)); + CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n", + obd->obd_name, opd.opd_locks, req); + + RETURN(opd.opd_locks > 0); +} + +static void ost_punch_hpreq_fini(struct ptlrpc_request *req) +{ + (void)ost_punch_hpreq_check(req); } struct ptlrpc_hpreq_ops ost_hpreq_rw = { - .hpreq_lock_match = ost_rw_hpreq_lock_match, - .hpreq_check = ost_rw_hpreq_check, + .hpreq_lock_match = ost_rw_hpreq_lock_match, + .hpreq_check = ost_rw_hpreq_check, + .hpreq_fini = ost_rw_hpreq_fini }; struct ptlrpc_hpreq_ops ost_hpreq_punch = { - .hpreq_lock_match = ost_punch_hpreq_lock_match, - .hpreq_check = ost_punch_hpreq_check, + .hpreq_lock_match = ost_punch_hpreq_lock_match, + .hpreq_check = ost_punch_hpreq_check, + .hpreq_fini = ost_punch_hpreq_fini }; /** Assign high priority operations to the request if needed. */ @@ -1989,6 +1946,7 @@ static int ost_hpreq_handler(struct ptlrpc_request *req) struct niobuf_remote *nb; struct obd_ioobj *ioo; int objcount, niocount; + int rc; int i; /* RPCs on the H-P queue can be inspected before @@ -2032,6 +1990,12 @@ static int ost_hpreq_handler(struct ptlrpc_request *req) RETURN(-EFAULT); } + rc = ost_validate_obdo(req->rq_export, &body->oa, ioo); + if (rc) { + CERROR("invalid object ids\n"); + RETURN(rc); + } + for (niocount = i = 0; i < objcount; i++) { if (ioo[i].ioo_bufcnt == 0) { CERROR("ioo[%d] has zero bufcnt\n", i); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index dd702eb..66ff9fc 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -67,6 +67,7 @@ CFS_MODULE_PARM(at_extra, "i", int, 0644, /* forward ref */ static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc); +static void ptlrpc_hpreq_fini(struct ptlrpc_request *req); static CFS_LIST_HEAD(ptlrpc_all_services); cfs_spinlock_t ptlrpc_all_services_lock; @@ -735,6 +736,8 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req) static void ptlrpc_server_finish_request(struct ptlrpc_service *svc, struct ptlrpc_request *req) { + ptlrpc_hpreq_fini(req); + cfs_spin_lock(&svc->srv_rq_lock); svc->srv_n_active_reqs--; if (req->rq_hp) @@ -1209,7 +1212,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) static int ptlrpc_hpreq_init(struct ptlrpc_service *svc, struct ptlrpc_request *req) { - int rc; + int rc = 0; ENTRY; if (svc->srv_hpreq_handler) { @@ -1218,13 +1221,19 @@ static int ptlrpc_hpreq_init(struct ptlrpc_service *svc, RETURN(rc); } if (req->rq_export && req->rq_ops) { + /* Perform request specific check. We should do this check + * before the request is added into exp_hp_rpcs list otherwise + * it may hit swab race at LU-1044. */ + if (req->rq_ops->hpreq_check) + rc = req->rq_ops->hpreq_check(req); + cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock); cfs_list_add(&req->rq_exp_list, - &req->rq_export->exp_queued_rpc); + &req->rq_export->exp_hp_rpcs); cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock); } - RETURN(0); + RETURN(rc); } /** Remove the request from the export list. */ @@ -1232,6 +1241,11 @@ static void ptlrpc_hpreq_fini(struct ptlrpc_request *req) { ENTRY; if (req->rq_export && req->rq_ops) { + /* refresh lock timeout again so that client has more + * room to send lock cancel RPC. */ + if (req->rq_ops->hpreq_fini) + req->rq_ops->hpreq_fini(req); + cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock); cfs_list_del_init(&req->rq_exp_list); cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock); @@ -1262,7 +1276,7 @@ static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc, cfs_list_move_tail(&req->rq_list, &svc->srv_request_hpq); req->rq_hp = 1; if (opc != OBD_PING) - DEBUG_REQ(D_NET, req, "high priority req"); + DEBUG_REQ(D_RPCTRACE, req, "high priority req"); } cfs_spin_unlock(&req->rq_lock); EXIT; @@ -1288,20 +1302,16 @@ void ptlrpc_hpreq_reorder(struct ptlrpc_request *req) } /** Check if the request is a high priority one. */ -static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req) +static int ptlrpc_server_hpreq_check(struct ptlrpc_service *svc, + struct ptlrpc_request *req) { - int opc, rc = 0; ENTRY; /* Check by request opc. */ - opc = lustre_msg_get_opc(req->rq_reqmsg); - if (opc == OBD_PING) + if (OBD_PING == lustre_msg_get_opc(req->rq_reqmsg)) RETURN(1); - /* Perform request specific check. */ - if (req->rq_ops && req->rq_ops->hpreq_check) - rc = req->rq_ops->hpreq_check(req); - RETURN(rc); + RETURN(ptlrpc_hpreq_init(svc, req)); } /** Check if a request is a high priority one. */ @@ -1311,7 +1321,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc, int rc; ENTRY; - rc = ptlrpc_server_hpreq_check(req); + rc = ptlrpc_server_hpreq_check(svc, req); if (rc < 0) RETURN(rc); @@ -1518,7 +1528,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) break; } - CDEBUG(D_NET, "got req "LPU64"\n", req->rq_xid); + CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid); req->rq_export = class_conn2export( lustre_msg_get_handle(req->rq_reqmsg)); @@ -1554,9 +1564,6 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) } ptlrpc_at_add_timed(req); - rc = ptlrpc_hpreq_init(svc, req); - if (rc) - GOTO(err_req, rc); /* Move it over to the request processing queue */ rc = ptlrpc_server_request_add(svc, req); @@ -1635,7 +1642,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, cfs_spin_unlock(&svc->srv_rq_lock); ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); - ptlrpc_hpreq_fini(request); if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG)) libcfs_debug_dumplog(); @@ -2686,7 +2692,6 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) req = ptlrpc_server_request_get(service, 1); cfs_list_del(&req->rq_list); service->srv_n_active_reqs++; - ptlrpc_hpreq_fini(req); ptlrpc_server_finish_request(service, req); } LASSERT(service->srv_n_queued_reqs == 0); -- 1.8.3.1