From 78293f0f6b8fff9df824c53c949ef904c8e0ddc2 Mon Sep 17 00:00:00 2001 From: ZhangHongChao Date: Fri, 4 Jun 2010 23:28:13 +0200 Subject: [PATCH] b=21877 protect bitfield access to rq_flags with rq_lock i=rread i=fanyong i=johann The AT code can access some bits of rq_flags while the service thread is updating some other bits. Everyone should use the rq_lock for consistency. --- lustre/ChangeLog | 6 ++++++ lustre/ldlm/ldlm_lib.c | 2 ++ lustre/ptlrpc/client.c | 15 +++++++++++++++ lustre/ptlrpc/niobuf.c | 6 ++++++ lustre/ptlrpc/pack_generic.c | 10 ++++++++-- lustre/ptlrpc/service.c | 7 +++++++ 6 files changed, 44 insertions(+), 2 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index d9d18fa..590acf2 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -175,6 +175,12 @@ Severity : normal Bugzilla : 22911 Description: Don't enable extents by default for MDT. +Severity : normal +Bugzilla : 21877 +Description: Protect bitfield access to ptlrpc_request's rq_flags, since + the AT code can access it concurrently while sending early + replies. + ------------------------------------------------------------------------------- 2010-04-30 Oracle, Inc. 
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 039de76..42d4db3 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1169,7 +1169,9 @@ static void target_request_copy_get(struct ptlrpc_request *req) /* mark that request is in recovery queue, so request handler will not * drop rpc count in export, bug 19870*/ LASSERT(!req->rq_copy_queued); + spin_lock(&req->rq_lock); req->rq_copy_queued = 1; + spin_unlock(&req->rq_lock); /* increase refcount to keep request in queue */ atomic_inc(&req->rq_refcount); /* release service thread while request is queued diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 7b02fb4..b1222d8 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -1031,7 +1031,10 @@ static int after_reply(struct ptlrpc_request *req) * space for early reply) */ req->rq_replen = size_round(req->rq_nob_received); req->rq_nob_received = 0; + + spin_lock(&req->rq_lock); req->rq_resend = 1; + spin_unlock(&req->rq_lock); RETURN(0); } @@ -1173,7 +1176,10 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) rc = ptl_send_rpc(req, 0); if (rc) { DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc); + + spin_lock(&req->rq_lock); req->rq_net_err = 1; + spin_unlock(&req->rq_lock); RETURN(rc); } RETURN(0); @@ -1311,12 +1317,16 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) spin_unlock(&imp->imp_lock); + spin_lock(&req->rq_lock); req->rq_waiting = 0; + spin_unlock(&req->rq_lock); if (req->rq_timedout||req->rq_resend) { /* This is re-sending anyways, * let's mark req as resend. 
*/ + spin_lock(&req->rq_lock); req->rq_resend = 1; + spin_unlock(&req->rq_lock); if (req->rq_bulk) { __u64 old_xid; @@ -1338,7 +1348,10 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) DEBUG_REQ(D_HA, req, "send failed (%d)", rc); force_timer_recalc = 1; + + spin_lock(&req->rq_lock); req->rq_net_err = 1; + spin_unlock(&req->rq_lock); } /* need to reset the timeout */ force_timer_recalc = 1; @@ -2209,7 +2222,9 @@ restart: /* we can have rq_timeout on dlm fake import which not support * recovery - but me need resend request on this import instead * of return error */ + spin_lock(&req->rq_lock); req->rq_resend = 1; + spin_unlock(&req->rq_lock); goto restart; } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index a908044..fddc9ef 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -507,7 +507,10 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) CDEBUG(D_HA, "muting rpc for failed imp obd %s\n", request->rq_import->imp_obd->obd_name); /* this prevents us from waiting in ptlrpc_queue_wait */ + spin_lock(&request->rq_lock); request->rq_err = 1; + spin_unlock(&request->rq_lock); + request->rq_status = -ENODEV; RETURN(-ENODEV); } @@ -537,7 +540,10 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) OBD_ALLOC(request->rq_repbuf, request->rq_replen); if (request->rq_repbuf == NULL) { /* this prevents us from looping in ptlrpc_queue_wait */ + spin_lock(&request->rq_lock); request->rq_err = 1; + spin_unlock(&request->rq_lock); + request->rq_status = -ENOMEM; GOTO(cleanup_bulk, rc = -ENOMEM); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 08f71bc..10052fa 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -416,8 +416,11 @@ static int lustre_pack_reply_v1(struct ptlrpc_request *req, int count, LASSERT(req->rq_reply_state == NULL); - if ((flags & LPRFL_EARLY_REPLY) == 0) + if ((flags & LPRFL_EARLY_REPLY) == 0) { + spin_lock(&req->rq_lock); 
req->rq_packed_final = 1; + spin_unlock(&req->rq_lock); + } msg_len = lustre_msg_size_v1(count, lens); size = sizeof(struct ptlrpc_reply_state) + msg_len; @@ -457,8 +460,11 @@ static int lustre_pack_reply_v2(struct ptlrpc_request *req, int count, LASSERT(req->rq_reply_state == NULL); - if ((flags & LPRFL_EARLY_REPLY) == 0) + if ((flags & LPRFL_EARLY_REPLY) == 0) { + spin_lock(&req->rq_lock); req->rq_packed_final = 1; + spin_unlock(&req->rq_lock); + } /* use the same size of ptlrpc_body as client requested for * interoperability cases */ diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 5445bb6..343795b 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -471,7 +471,9 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req) struct ptlrpc_at_array *array = &svc->srv_at_array; __u32 index = req->rq_at_index; + spin_lock(&req->rq_lock); req->rq_at_linked = 0; + spin_unlock(&req->rq_lock); array->paa_reqs_count[index]--; array->paa_count--; } @@ -722,7 +724,10 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) if (list_empty(&req->rq_timed_list)) list_add(&req->rq_timed_list, &array->paa_reqs_array[index]); + spin_lock(&req->rq_lock); req->rq_at_linked = 1; + spin_unlock(&req->rq_lock); + req->rq_at_index = index; array->paa_reqs_count[index]++; array->paa_count++; @@ -927,7 +932,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) counter++; array->paa_reqs_count[index]--; array->paa_count--; + spin_lock(&rq->rq_lock); rq->rq_at_linked = 0; + spin_unlock(&rq->rq_lock); continue; } -- 1.8.3.1