From: anserper Date: Tue, 15 Sep 2009 21:09:34 +0000 (+0000) Subject: b=18948 X-Git-Tag: v1_9_270~49 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=f517ec663e9c54350aa4a150bcd2016ae25ad872;ds=sidebyside b=18948 i=Nathan Rutman i=Eric Mei o=Mike Pershin use rq_refcount instead of copying during recovery attachment 25372 + attachment 25518 --- diff --git a/libcfs/include/libcfs/user-lock.h b/libcfs/include/libcfs/user-lock.h index 6d87c0d..958519e 100644 --- a/libcfs/include/libcfs/user-lock.h +++ b/libcfs/include/libcfs/user-lock.h @@ -237,6 +237,8 @@ typedef struct { volatile int counter; } atomic_t; #define atomic_sub(b,a) do {(a)->counter -= b;} while (0) #define atomic_sub_return(n,a) ((a)->counter -= n) #define atomic_dec_return(a) atomic_sub_return(1,a) +#define atomic_add_unless(v, a, u) ((v)->counter != u ? (v)->counter += a : 0) +#define atomic_inc_not_zero(v) atomic_add_unless((v), 1, 0) #ifdef HAVE_LIBPTHREAD diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 5335daf..91e5847 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -367,7 +367,6 @@ struct ptlrpc_request { rq_fake:1, /* this fake req */ /* server-side flags */ rq_packed_final:1, /* packed final reply */ - rq_sent_final:1, /* stop sending early replies */ rq_hp:1, /* high priority RPC */ rq_at_linked:1; /* link into service's srv_at_array */ @@ -492,6 +491,7 @@ struct ptlrpc_request { struct ptlrpc_request_pool *rq_pool; /* Pool if request from preallocated list */ struct lu_context rq_session; + struct lu_context rq_recov_session; /* request format */ struct req_capsule rq_pill; @@ -1065,6 +1065,7 @@ int liblustre_check_services (void *arg); void ptlrpc_daemonize(char *name); int ptlrpc_service_health_check(struct ptlrpc_service *); void ptlrpc_hpreq_reorder(struct ptlrpc_request *req); +void ptlrpc_server_drop_request(struct ptlrpc_request *req); #ifdef __KERNEL__ int ptlrpc_hr_init(void); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 54683be..56dd936 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1063,74 +1063,25 @@ void target_destroy_export(struct obd_export *exp) /* * Recovery functions */ - -struct ptlrpc_request *ptlrpc_clone_req( struct ptlrpc_request *orig_req) +static void target_request_copy_get(struct ptlrpc_request *req) { - struct ptlrpc_request *copy_req; - struct lustre_msg *copy_reqmsg; - struct ptlrpc_user_desc *udesc = NULL; - - OBD_ALLOC_PTR(copy_req); - if (!copy_req) - return NULL; - OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen); - if (!copy_reqmsg){ - OBD_FREE_PTR(copy_req); - return NULL; - } - - if (orig_req->rq_user_desc) { - int ngroups = orig_req->rq_user_desc->pud_ngroups; - - OBD_ALLOC(udesc, sptlrpc_user_desc_size(ngroups)); - if (!udesc) { - OBD_FREE(copy_reqmsg, orig_req->rq_reqlen); - OBD_FREE_PTR(copy_req); - return NULL; - } - memcpy(udesc, orig_req->rq_user_desc, - sptlrpc_user_desc_size(ngroups)); - } - - *copy_req = *orig_req; - memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen); - copy_req->rq_reqmsg = copy_reqmsg; - copy_req->rq_user_desc = udesc; - - class_export_rpc_get(copy_req->rq_export); - CFS_INIT_LIST_HEAD(©_req->rq_list); - CFS_INIT_LIST_HEAD(©_req->rq_replay_list); - sptlrpc_svc_ctx_addref(copy_req); - - if (copy_req->rq_reply_state) { - /* the copied req takes over the reply state */ - orig_req->rq_reply_state = NULL; - /* to catch further access */ - orig_req->rq_repmsg = NULL; - orig_req->rq_replen = 0; - } + class_export_rpc_get(req->rq_export); + LASSERT(list_empty(&req->rq_list)); + CFS_INIT_LIST_HEAD(&req->rq_replay_list); + /* increase refcount to keep request in queue */ + LASSERT(atomic_read(&req->rq_refcount)); + atomic_inc(&req->rq_refcount); /** let export know it has replays to be handled */ - atomic_inc(©_req->rq_export->exp_replay_count); - return copy_req; + atomic_inc(&req->rq_export->exp_replay_count); } -void ptlrpc_free_clone(struct ptlrpc_request *req) +static void target_request_copy_put(struct ptlrpc_request *req) { LASSERT(list_empty(&req->rq_replay_list)); - - ptlrpc_req_drop_rs(req); - sptlrpc_svc_ctx_decref(req); LASSERT(atomic_read(&req->rq_export->exp_replay_count) > 0); atomic_dec(&req->rq_export->exp_replay_count); class_export_rpc_put(req->rq_export); - list_del_init(&req->rq_list); - - if (req->rq_user_desc) { - int ngroups = req->rq_user_desc->pud_ngroups; - OBD_FREE(req->rq_user_desc, sptlrpc_user_desc_size(ngroups)); - } - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE_PTR(req); + ptlrpc_server_drop_request(req); } static int target_exp_enqueue_req_replay(struct ptlrpc_request *req) @@ -1226,7 +1177,7 @@ static void abort_req_replay_queue(struct obd_device *obd) "failed abort_req_reply; skipping"); } target_exp_dequeue_req_replay(req); - ptlrpc_free_clone(req); + target_request_copy_put(req); } } @@ -1246,7 +1197,7 @@ static void abort_lock_replay_queue(struct obd_device *obd) DEBUG_REQ(D_ERROR, req, "failed abort_lock_reply; skipping"); } - ptlrpc_free_clone(req); + target_request_copy_put(req); } } #endif @@ -1282,7 +1233,7 @@ void target_cleanup_recovery(struct obd_device *obd) list_for_each_entry_safe(req, n, &clean_list, rq_list) { LASSERT(req->rq_reply_state == 0); target_exp_dequeue_req_replay(req); - ptlrpc_free_clone(req); + target_request_copy_put(req); } spin_lock_bh(&obd->obd_processing_task_lock); @@ -1292,7 +1243,7 @@ void target_cleanup_recovery(struct obd_device *obd) list_for_each_entry_safe(req, n, &clean_list, rq_list){ LASSERT(req->rq_reply_state == 0); - ptlrpc_free_clone(req); + target_request_copy_put(req); } EXIT; @@ -1660,30 +1611,30 @@ static int handle_recovery_req(struct ptlrpc_thread *thread, int rc; ENTRY; - rc = lu_context_init(&req->rq_session, LCT_SESSION); + rc = lu_context_init(&req->rq_recov_session, LCT_SESSION); if (rc) { CERROR("Failure to initialize session: %d\n", rc); - GOTO(free_clone, rc); + GOTO(reqcopy_put, rc); } /** * export can be evicted during recovery, no need to handle replays for * it after that, discard such request silently */ if (req->rq_export->exp_disconnected) - GOTO(free_clone, rc); + GOTO(reqcopy_put, rc); - req->rq_session.lc_thread = thread; - lu_context_enter(&req->rq_session); + req->rq_recov_session.lc_thread = thread; + lu_context_enter(&req->rq_recov_session); req->rq_svc_thread = thread; - req->rq_svc_thread->t_env->le_ses = &req->rq_session; + req->rq_svc_thread->t_env->le_ses = &req->rq_recov_session; /* thread context */ lu_context_enter(&thread->t_env->le_ctx); (void)handler(req); lu_context_exit(&thread->t_env->le_ctx); - lu_context_exit(&req->rq_session); - lu_context_fini(&req->rq_session); + lu_context_exit(&req->rq_recov_session); + lu_context_fini(&req->rq_recov_session); /* don't reset timer for final stage */ if (!exp_finished(req->rq_export)) { /** @@ -1696,7 +1647,7 @@ static int handle_recovery_req(struct ptlrpc_thread *thread, obd_timeout : RECONNECT_DELAY_MAX, 1); } /** - * bz18031: increase next_recovery_transno before ptlrpc_free_clone() + * bz18031: increase next_recovery_transno before target_request_copy_put() * will drop exp_rpc reference */ if (req->rq_export->exp_req_replay_needed) { @@ -1705,8 +1656,8 @@ static int handle_recovery_req(struct ptlrpc_thread *thread, spin_unlock_bh(&req->rq_export->exp_obd->obd_processing_task_lock); target_exp_dequeue_req_replay(req); } -free_clone: - ptlrpc_free_clone(req); +reqcopy_put: + target_request_copy_put(req); RETURN(0); } @@ -1958,31 +1909,23 @@ int target_queue_recovery_request(struct ptlrpc_request *req, if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) { /* client declares he's ready to complete recovery * so, we put the request on th final queue */ - req = ptlrpc_clone_req(req); - if (req == NULL) - RETURN(-ENOMEM); + target_request_copy_get(req); DEBUG_REQ(D_HA, req, "queue final req"); spin_lock_bh(&obd->obd_processing_task_lock); cfs_waitq_signal(&obd->obd_next_transno_waitq); - if (obd->obd_recovering) + if (obd->obd_recovering) { list_add_tail(&req->rq_list, &obd->obd_final_req_queue); - else { + } else { spin_unlock_bh(&obd->obd_processing_task_lock); - ptlrpc_free_clone(req); - if (obd->obd_stopping) { - RETURN(-ENOTCONN); - } else { - RETURN(1); - } + target_request_copy_put(req); + RETURN(obd->obd_stopping ? -ENOTCONN : 1); } spin_unlock_bh(&obd->obd_processing_task_lock); RETURN(0); } if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) { /* client declares he's ready to replay locks */ - req = ptlrpc_clone_req(req); - if (req == NULL) - RETURN(-ENOMEM); + target_request_copy_get(req); DEBUG_REQ(D_HA, req, "queue lock replay req"); spin_lock_bh(&obd->obd_processing_task_lock); cfs_waitq_signal(&obd->obd_next_transno_waitq); @@ -1990,7 +1933,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, /* usually due to recovery abort */ if (!req->rq_export->exp_in_recovery) { spin_unlock_bh(&obd->obd_processing_task_lock); - ptlrpc_free_clone(req); + target_request_copy_put(req); RETURN(-ENOTCONN); } LASSERT(req->rq_export->exp_lock_replay_needed); @@ -2034,15 +1977,12 @@ int target_queue_recovery_request(struct ptlrpc_request *req, if (OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP)) RETURN(0); - req = ptlrpc_clone_req(req); - if (req == NULL) - RETURN(-ENOMEM); - + target_request_copy_get(req); spin_lock_bh(&obd->obd_processing_task_lock); LASSERT(obd->obd_recovering); if (!req->rq_export->exp_in_recovery) { spin_unlock_bh(&obd->obd_processing_task_lock); - ptlrpc_free_clone(req); + target_request_copy_put(req); RETURN(-ENOTCONN); } LASSERT(req->rq_export->exp_req_replay_needed); @@ -2050,7 +1990,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, if (target_exp_enqueue_req_replay(req)) { spin_unlock_bh(&obd->obd_processing_task_lock); DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); - ptlrpc_free_clone(req); + target_request_copy_put(req); RETURN(0); } @@ -2071,7 +2011,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, "has been claimed by another client"); spin_unlock_bh(&obd->obd_processing_task_lock); target_exp_dequeue_req_replay(req); - ptlrpc_free_clone(req); + target_request_copy_put(req); RETURN(0); } } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 060d741..b458a8a 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -601,7 +601,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req) * drop a reference count of the request. if it reaches 0, we either * put it into history list, or free it immediately. */ -static void ptlrpc_server_drop_request(struct ptlrpc_request *req) +void ptlrpc_server_drop_request(struct ptlrpc_request *req) { struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service *svc = rqbd->rqbd_service; @@ -612,6 +612,24 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req) if (!atomic_dec_and_test(&req->rq_refcount)) return; + spin_lock(&svc->srv_at_lock); + list_del_init(&req->rq_timed_list); + if (req->rq_at_linked) { + struct ptlrpc_at_array *array = &svc->srv_at_array; + __u32 index = req->rq_at_index; + + req->rq_at_linked = 0; + array->paa_reqs_count[index]--; + array->paa_count--; + } + spin_unlock(&svc->srv_at_lock); + + /* finalize request */ + if (req->rq_export) { + class_export_put(req->rq_export); + req->rq_export = NULL; + } + spin_lock(&svc->srv_lock); svc->srv_n_active_reqs--; @@ -685,29 +703,6 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req) */ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) { - struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; - - if (req->rq_export) { - class_export_put(req->rq_export); - req->rq_export = NULL; - } - - if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ - DEBUG_REQ(D_INFO, req, "free req"); - - spin_lock(&svc->srv_at_lock); - req->rq_sent_final = 1; - list_del_init(&req->rq_timed_list); - if (req->rq_at_linked) { - struct ptlrpc_at_array *array = &svc->srv_at_array; - __u32 index = req->rq_at_index; - - req->rq_at_linked = 0; - array->paa_reqs_count[index]--; - array->paa_count--; - } - spin_unlock(&svc->srv_at_lock); - ptlrpc_server_drop_request(req); } @@ -859,12 +854,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) return(-ENOSYS); spin_lock(&svc->srv_at_lock); - - if (unlikely(req->rq_sent_final)) { - spin_unlock(&svc->srv_at_lock); - return 0; - } - LASSERT(list_empty(&req->rq_timed_list)); index = (unsigned long)req->rq_deadline % array->paa_size; @@ -984,10 +973,12 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, reqcopy->rq_reqmsg = reqmsg; memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - if (req->rq_sent_final) { + LASSERT(atomic_read(&req->rq_refcount)); + /** if it is last refcount then early reply isn't needed */ + if (atomic_read(&req->rq_refcount) == 1) { DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, " "abort sending early reply\n"); - GOTO(out, rc = 0); + GOTO(out, rc = -EINVAL); } /* Connection ref */ @@ -1077,7 +1068,14 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index], rq_timed_list) { if (rq->rq_deadline <= now + at_early_margin) { - list_move(&rq->rq_timed_list, &work_list); + list_del(&rq->rq_timed_list); + /** + * ptlrpc_server_drop_request() may drop + * refcount to 0 already. Let's check this and + * don't add entry to work_list + */ + if (likely(atomic_inc_not_zero(&rq->rq_refcount))) + list_add(&rq->rq_timed_list, &work_list); counter++; array->paa_reqs_count[index]--; array->paa_count--; @@ -1114,25 +1112,18 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) at_get(&svc->srv_at_estimate), delay); } - /* ptlrpc_server_finish_request may delete an entry out of - * the work list */ - spin_lock(&svc->srv_at_lock); + /* we took additional refcount so entries can't be deleted from list, no + * locking is needed */ while (!list_empty(&work_list)) { rq = list_entry(work_list.next, struct ptlrpc_request, rq_timed_list); list_del_init(&rq->rq_timed_list); - /* if the entry is still in the worklist, it hasn't been - deleted, and is safe to take a ref to keep the req around */ - atomic_inc(&rq->rq_refcount); - spin_unlock(&svc->srv_at_lock); if (ptlrpc_at_send_early_reply(rq, at_extra) == 0) ptlrpc_at_add_timed(rq); ptlrpc_server_drop_request(rq); - spin_lock(&svc->srv_at_lock); } - spin_unlock(&svc->srv_at_lock); RETURN(0); }