Whamcloud - gitweb
LU-170 oscc_grow_count will never grow
author Liang Zhen <liang@whamcloud.com>
Mon, 23 May 2011 04:57:03 +0000 (12:57 +0800)
committer Johann Lombardi <johann@whamcloud.com>
Mon, 23 May 2011 06:06:05 +0000 (23:06 -0700)
We are using req::rq_async_args.space[0] to store the original value of
oscc_grow_count, and req::rq_async_args.pointer_arg[0] to store the
oscc. However, ptlrpc_async_args is a union, which means
req::rq_async_args.space[0] will always be overwritten by the oscc pointer.
As a result, osc_interpret_create will always evaluate the condition
"if (diff < (int) req->rq_async_args.space[0])" as true, reset
oscc_grow_count to OST_MIN_PRECREATE, and set OSCC_FLAG_LOW.

Because using the raw scratchpad directly is very unsafe, I also cleaned
up all uses of the raw scratchpad in this patch.

Change-Id: I431a37521a41c5dfbe10ebca9efac157985add51
Signed-off-by: Liang Zhen <liang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/588
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/mdc/mdc_locks.c
lustre/osc/osc_create.c
lustre/ptlrpc/recov_thread.c

index 8195fbb..02ec64c 100644 (file)
@@ -182,7 +182,7 @@ struct ptlrpc_client {
 
 union ptlrpc_async_args {
         /* Scratchpad for passing args to completion interpreter. Users
-         * cast to the struct of their choosing, and LASSERT that this is
+         * cast to the struct of their choosing, and CLASSERT that this is
          * big enough.  For _tons_ of context, OBD_ALLOC a struct and store
          * a pointer to it here.  The pointer_arg ensures this struct is at
          * least big enough for that. */
index 69121ff..c460bed 100644 (file)
@@ -68,6 +68,11 @@ extern struct lustre_lock ldlm_handle_lock;
 static struct semaphore ldlm_ref_sem;
 static int ldlm_refcount;
 
+struct ldlm_cb_async_args {
+        struct ldlm_cb_set_arg *ca_set_arg;
+        struct ldlm_lock       *ca_lock;
+};
+
 static struct ldlm_state *ldlm_state;
 
 inline cfs_time_t round_timeout(cfs_time_t timeout)
@@ -617,14 +622,11 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 
 static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
-        struct ldlm_cb_set_arg *arg;
-        struct ldlm_lock *lock;
+        struct ldlm_cb_async_args *ca = data;
+        struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
+        struct ldlm_lock *lock = ca->ca_lock;
         ENTRY;
 
-        LASSERT(data != NULL);
-
-        arg = req->rq_async_args.pointer_arg[0];
-        lock = req->rq_async_args.pointer_arg[1];
         LASSERT(lock != NULL);
         if (rc != 0) {
                 rc = ldlm_handle_ast_error(lock, req, rc,
@@ -698,6 +700,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
 {
+        struct ldlm_cb_async_args *ca;
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
@@ -722,8 +725,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
 
@@ -796,6 +802,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
+        struct ldlm_cb_async_args *ca;
         long total_enqueue_wait;
         __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
@@ -821,8 +828,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
 
index 730fc7b..68382e8 100644 (file)
 #include <lprocfs_status.h>
 #include "mdc_internal.h"
 
+struct mdc_getattr_args {
+        struct obd_export           *ga_exp;
+        struct md_enqueue_info      *ga_minfo;
+        struct ldlm_enqueue_info    *ga_einfo;
+};
+
 int it_open_error(int phase, struct lookup_intent *it)
 {
         if (it_disposition(it, DISP_OPEN_OPEN)) {
@@ -882,11 +888,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
 EXPORT_SYMBOL(mdc_intent_lock);
 
 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
-                                              void *unused, int rc)
+                                              void *args, int rc)
 {
-        struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
-        struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
-        struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
+        struct mdc_getattr_args  *ga = args;
+        struct obd_export        *exp = ga->ga_exp;
+        struct md_enqueue_info   *minfo = ga->ga_minfo;
+        struct ldlm_enqueue_info *einfo = ga->ga_einfo;
         struct lookup_intent     *it;
         struct lustre_handle     *lockh;
         struct obd_device        *obddev;
@@ -930,6 +937,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
         struct mdc_op_data      *op_data = &minfo->mi_data;
         struct lookup_intent    *it = &minfo->mi_it;
         struct ptlrpc_request   *req;
+        struct mdc_getattr_args *ga;
         struct obd_device       *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id;
         ldlm_policy_data_t       policy = {
@@ -961,9 +969,12 @@ int mdc_intent_getattr_async(struct obd_export *exp,
                 RETURN(rc);
         }
 
-        req->rq_async_args.pointer_arg[0] = exp;
-        req->rq_async_args.pointer_arg[1] = minfo;
-        req->rq_async_args.pointer_arg[2] = einfo;
+        CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
+        ga = ptlrpc_req_async_args(req);
+        ga->ga_exp = exp;
+        ga->ga_minfo = minfo;
+        ga->ga_einfo = einfo;
+
         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
         ptlrpcd_add_req(req);
 
index 535e5bf..b5198d2 100644 (file)
@@ -69,6 +69,7 @@ struct osc_create_async_args {
         struct osc_creator      *rq_oscc;
         struct lov_stripe_md    *rq_lsm;
         struct obd_info         *rq_oinfo;
+        int                      rq_grow_count;
 };
 
 static int oscc_internal_create(struct osc_creator *oscc);
@@ -76,7 +77,8 @@ static int handle_async_create(struct ptlrpc_request *req, int rc);
 
 static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
 {
-        struct osc_creator *oscc;
+        struct osc_create_async_args *args = ptlrpc_req_async_args(req);
+        struct osc_creator *oscc = args->rq_oscc;
         struct ost_body *body = NULL;
         struct ptlrpc_request *fake_req, *pos;
         ENTRY;
@@ -88,7 +90,6 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
                         rc = -EPROTO;
         }
 
-        oscc = req->rq_async_args.pointer_arg[0];
         LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
 
         spin_lock(&oscc->oscc_lock);
@@ -99,13 +100,13 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
                         int diff = body->oa.o_id - oscc->oscc_last_id;
 
                         /* oscc_internal_create() stores the original value of
-                         * grow_count in rq_async_args.space[0].
+                         * grow_count in osc_create_async_args::rq_grow_count.
                          * We can't compare against oscc_grow_count directly,
                          * because it may have been increased while the RPC
                          * is in flight, so we would always find ourselves
                          * having created fewer objects and decreasing the
                          * precreate request size.  b=18577 */
-                        if (diff < (int) req->rq_async_args.space[0]) {
+                        if (diff < args->rq_grow_count) {
                                 /* the OST has not managed to create all the
                                  * objects we asked for */
                                 oscc->oscc_grow_count = max(diff,
@@ -190,6 +191,7 @@ exit_wakeup:
 
 static int oscc_internal_create(struct osc_creator *oscc)
 {
+        struct osc_create_async_args *args;
         struct ptlrpc_request *request;
         struct ost_body *body;
         __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
@@ -237,13 +239,16 @@ static int oscc_internal_create(struct osc_creator *oscc)
         request->rq_request_portal = OST_CREATE_PORTAL;
         ptlrpc_at_set_req_timeout(request);
         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        args = ptlrpc_req_async_args(request);
+        args->rq_oscc = oscc;
 
         spin_lock(&oscc->oscc_lock);
+        args->rq_grow_count = oscc->oscc_grow_count;
         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
+        spin_unlock(&oscc->oscc_lock);
+
         body->oa.o_gr = 0;
         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
-        request->rq_async_args.space[0] = oscc->oscc_grow_count;
-        spin_unlock(&oscc->oscc_lock);
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
                body->oa.o_id, oscc->oscc_last_id);
 
@@ -252,7 +257,6 @@ static int oscc_internal_create(struct osc_creator *oscc)
         request->rq_no_delay = request->rq_no_resend = 1;
         ptlrpc_req_set_repsize(request, 2, size);
 
-        request->rq_async_args.pointer_arg[0] = oscc;
         request->rq_interpret_reply = osc_interpret_create;
         ptlrpcd_add_req(request);
 
index 96ee8e9..e19acdb 100644 (file)
@@ -76,6 +76,10 @@ enum {
         LLOG_LCM_FL_EXIT        = 1 << 1
 };
 
+struct llcd_async_args {
+        struct llog_canceld_ctxt *la_ctxt;
+};
+
 static void llcd_print(struct llog_canceld_ctxt *llcd,
                        const char *func, int line)
 {
@@ -185,9 +189,11 @@ llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
  * in cleanup time when all inflight rpcs aborted.
  */
 static int
-llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
+llcd_interpret(struct ptlrpc_request *req, void *args, int rc)
 {
-        struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
+        struct llcd_async_args *la = args;
+        struct llog_canceld_ctxt *llcd = la->la_ctxt;
+
         CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
         llcd_free(llcd);
         return 0;
@@ -205,6 +211,7 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
         char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
         struct obd_import *import = NULL;
         struct llog_commit_master *lcm;
+        struct llcd_async_args *la;
         struct ptlrpc_request *req;
         struct llog_ctxt *ctxt;
         int rc;
@@ -270,7 +277,10 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
         ptlrpc_req_set_repsize(req, 1, NULL);
         ptlrpc_at_set_req_timeout(req);
         req->rq_interpret_reply = llcd_interpret;
-        req->rq_async_args.pointer_arg[0] = llcd;
+
+        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
+        la = ptlrpc_req_async_args(req);
+        la->la_ctxt = llcd;
 
         /* llog cancels will be replayed after reconnect so this will do twice
          * first from replay llog, second for resended rpc */