/*
* Recovery functions
*/
-
-struct ptlrpc_request *ptlrpc_clone_req( struct ptlrpc_request *orig_req)
+static void target_request_copy_get(struct ptlrpc_request *req)
{
- struct ptlrpc_request *copy_req;
- struct lustre_msg *copy_reqmsg;
- struct ptlrpc_user_desc *udesc = NULL;
-
- OBD_ALLOC_PTR(copy_req);
- if (!copy_req)
- return NULL;
- OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen);
- if (!copy_reqmsg){
- OBD_FREE_PTR(copy_req);
- return NULL;
- }
-
- if (orig_req->rq_user_desc) {
- int ngroups = orig_req->rq_user_desc->pud_ngroups;
-
- OBD_ALLOC(udesc, sptlrpc_user_desc_size(ngroups));
- if (!udesc) {
- OBD_FREE(copy_reqmsg, orig_req->rq_reqlen);
- OBD_FREE_PTR(copy_req);
- return NULL;
- }
- memcpy(udesc, orig_req->rq_user_desc,
- sptlrpc_user_desc_size(ngroups));
- }
-
- *copy_req = *orig_req;
- memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen);
- copy_req->rq_reqmsg = copy_reqmsg;
- copy_req->rq_user_desc = udesc;
-
- class_export_rpc_get(copy_req->rq_export);
- CFS_INIT_LIST_HEAD(©_req->rq_list);
- CFS_INIT_LIST_HEAD(©_req->rq_replay_list);
- sptlrpc_svc_ctx_addref(copy_req);
-
- if (copy_req->rq_reply_state) {
- /* the copied req takes over the reply state */
- orig_req->rq_reply_state = NULL;
- /* to catch further access */
- orig_req->rq_repmsg = NULL;
- orig_req->rq_replen = 0;
- }
+ class_export_rpc_get(req->rq_export);
+ LASSERT(list_empty(&req->rq_list));
+ CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+ /* increase refcount to keep request in queue */
+ LASSERT(atomic_read(&req->rq_refcount));
+ atomic_inc(&req->rq_refcount);
/** let export know it has replays to be handled */
- atomic_inc(©_req->rq_export->exp_replay_count);
- return copy_req;
+ atomic_inc(&req->rq_export->exp_replay_count);
}
-void ptlrpc_free_clone(struct ptlrpc_request *req)
+static void target_request_copy_put(struct ptlrpc_request *req)
{
LASSERT(list_empty(&req->rq_replay_list));
-
- ptlrpc_req_drop_rs(req);
- sptlrpc_svc_ctx_decref(req);
LASSERT(atomic_read(&req->rq_export->exp_replay_count) > 0);
atomic_dec(&req->rq_export->exp_replay_count);
class_export_rpc_put(req->rq_export);
- list_del_init(&req->rq_list);
-
- if (req->rq_user_desc) {
- int ngroups = req->rq_user_desc->pud_ngroups;
- OBD_FREE(req->rq_user_desc, sptlrpc_user_desc_size(ngroups));
- }
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE_PTR(req);
+ ptlrpc_server_drop_request(req);
}
static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
"failed abort_req_reply; skipping");
}
target_exp_dequeue_req_replay(req);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
}
}
DEBUG_REQ(D_ERROR, req,
"failed abort_lock_reply; skipping");
}
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
}
}
#endif
list_for_each_entry_safe(req, n, &clean_list, rq_list) {
LASSERT(req->rq_reply_state == 0);
target_exp_dequeue_req_replay(req);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
}
spin_lock_bh(&obd->obd_processing_task_lock);
list_for_each_entry_safe(req, n, &clean_list, rq_list){
LASSERT(req->rq_reply_state == 0);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
}
EXIT;
int rc;
ENTRY;
- rc = lu_context_init(&req->rq_session, LCT_SESSION);
+ rc = lu_context_init(&req->rq_recov_session, LCT_SESSION);
if (rc) {
CERROR("Failure to initialize session: %d\n", rc);
- GOTO(free_clone, rc);
+ GOTO(reqcopy_put, rc);
}
/**
* export can be evicted during recovery, no need to handle replays for
* it after that, discard such request silently
*/
if (req->rq_export->exp_disconnected)
- GOTO(free_clone, rc);
+ GOTO(reqcopy_put, rc);
- req->rq_session.lc_thread = thread;
- lu_context_enter(&req->rq_session);
+ req->rq_recov_session.lc_thread = thread;
+ lu_context_enter(&req->rq_recov_session);
req->rq_svc_thread = thread;
- req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+ req->rq_svc_thread->t_env->le_ses = &req->rq_recov_session;
/* thread context */
lu_context_enter(&thread->t_env->le_ctx);
(void)handler(req);
lu_context_exit(&thread->t_env->le_ctx);
- lu_context_exit(&req->rq_session);
- lu_context_fini(&req->rq_session);
+ lu_context_exit(&req->rq_recov_session);
+ lu_context_fini(&req->rq_recov_session);
/* don't reset timer for final stage */
if (!exp_finished(req->rq_export)) {
/**
obd_timeout : RECONNECT_DELAY_MAX, 1);
}
/**
- * bz18031: increase next_recovery_transno before ptlrpc_free_clone()
+ * bz18031: increase next_recovery_transno before target_request_copy_put()
* will drop exp_rpc reference
*/
if (req->rq_export->exp_req_replay_needed) {
spin_unlock_bh(&req->rq_export->exp_obd->obd_processing_task_lock);
target_exp_dequeue_req_replay(req);
}
-free_clone:
- ptlrpc_free_clone(req);
+reqcopy_put:
+ target_request_copy_put(req);
RETURN(0);
}
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
/* client declares he's ready to complete recovery
* so, we put the request on th final queue */
- req = ptlrpc_clone_req(req);
- if (req == NULL)
- RETURN(-ENOMEM);
+ target_request_copy_get(req);
DEBUG_REQ(D_HA, req, "queue final req");
spin_lock_bh(&obd->obd_processing_task_lock);
cfs_waitq_signal(&obd->obd_next_transno_waitq);
- if (obd->obd_recovering)
+ if (obd->obd_recovering) {
list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
- else {
+ } else {
spin_unlock_bh(&obd->obd_processing_task_lock);
- ptlrpc_free_clone(req);
- if (obd->obd_stopping) {
- RETURN(-ENOTCONN);
- } else {
- RETURN(1);
- }
+ target_request_copy_put(req);
+ RETURN(obd->obd_stopping ? -ENOTCONN : 1);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
RETURN(0);
}
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
/* client declares he's ready to replay locks */
- req = ptlrpc_clone_req(req);
- if (req == NULL)
- RETURN(-ENOMEM);
+ target_request_copy_get(req);
DEBUG_REQ(D_HA, req, "queue lock replay req");
spin_lock_bh(&obd->obd_processing_task_lock);
cfs_waitq_signal(&obd->obd_next_transno_waitq);
/* usually due to recovery abort */
if (!req->rq_export->exp_in_recovery) {
spin_unlock_bh(&obd->obd_processing_task_lock);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
RETURN(-ENOTCONN);
}
LASSERT(req->rq_export->exp_lock_replay_needed);
if (OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))
RETURN(0);
- req = ptlrpc_clone_req(req);
- if (req == NULL)
- RETURN(-ENOMEM);
-
+ target_request_copy_get(req);
spin_lock_bh(&obd->obd_processing_task_lock);
LASSERT(obd->obd_recovering);
if (!req->rq_export->exp_in_recovery) {
spin_unlock_bh(&obd->obd_processing_task_lock);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
RETURN(-ENOTCONN);
}
LASSERT(req->rq_export->exp_req_replay_needed);
if (target_exp_enqueue_req_replay(req)) {
spin_unlock_bh(&obd->obd_processing_task_lock);
DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
RETURN(0);
}
"has been claimed by another client");
spin_unlock_bh(&obd->obd_processing_task_lock);
target_exp_dequeue_req_replay(req);
- ptlrpc_free_clone(req);
+ target_request_copy_put(req);
RETURN(0);
}
}
* drop a reference count of the request. if it reaches 0, we either
* put it into history list, or free it immediately.
*/
-static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
+void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
if (!atomic_dec_and_test(&req->rq_refcount))
return;
+ spin_lock(&svc->srv_at_lock);
+ list_del_init(&req->rq_timed_list);
+ if (req->rq_at_linked) {
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
+ __u32 index = req->rq_at_index;
+
+ req->rq_at_linked = 0;
+ array->paa_reqs_count[index]--;
+ array->paa_count--;
+ }
+ spin_unlock(&svc->srv_at_lock);
+
+ /* finalize request */
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
spin_lock(&svc->srv_lock);
svc->srv_n_active_reqs--;
*/
static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
{
- struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
-
- if (req->rq_export) {
- class_export_put(req->rq_export);
- req->rq_export = NULL;
- }
-
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
-
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- if (req->rq_at_linked) {
- struct ptlrpc_at_array *array = &svc->srv_at_array;
- __u32 index = req->rq_at_index;
-
- req->rq_at_linked = 0;
- array->paa_reqs_count[index]--;
- array->paa_count--;
- }
- spin_unlock(&svc->srv_at_lock);
-
ptlrpc_server_drop_request(req);
}
return(-ENOSYS);
spin_lock(&svc->srv_at_lock);
-
- if (unlikely(req->rq_sent_final)) {
- spin_unlock(&svc->srv_at_lock);
- return 0;
- }
-
LASSERT(list_empty(&req->rq_timed_list));
index = (unsigned long)req->rq_deadline % array->paa_size;
reqcopy->rq_reqmsg = reqmsg;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- if (req->rq_sent_final) {
+ LASSERT(atomic_read(&req->rq_refcount));
+ /** if it is last refcount then early reply isn't needed */
+ if (atomic_read(&req->rq_refcount) == 1) {
DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
"abort sending early reply\n");
- GOTO(out, rc = 0);
+ GOTO(out, rc = -EINVAL);
}
/* Connection ref */
list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
rq_timed_list) {
if (rq->rq_deadline <= now + at_early_margin) {
- list_move(&rq->rq_timed_list, &work_list);
+ list_del(&rq->rq_timed_list);
+ /**
+ * ptlrpc_server_drop_request() may drop
+ * refcount to 0 already. Let's check this and
+ * don't add entry to work_list
+ */
+ if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
+ list_add(&rq->rq_timed_list, &work_list);
counter++;
array->paa_reqs_count[index]--;
array->paa_count--;
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_finish_request may delete an entry out of
- * the work list */
- spin_lock(&svc->srv_at_lock);
+ /* we took additional refcount so entries can't be deleted from list, no
+ * locking is needed */
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
rq_timed_list);
list_del_init(&rq->rq_timed_list);
- /* if the entry is still in the worklist, it hasn't been
- deleted, and is safe to take a ref to keep the req around */
- atomic_inc(&rq->rq_refcount);
- spin_unlock(&svc->srv_at_lock);
if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
ptlrpc_at_add_timed(rq);
ptlrpc_server_drop_request(rq);
- spin_lock(&svc->srv_at_lock);
}
- spin_unlock(&svc->srv_at_lock);
RETURN(0);
}