Whamcloud - gitweb
LU-243 async lov_sync() operation
authorLai Siyao <laisiyao@whamcloud.com>
Thu, 5 May 2011 02:59:34 +0000 (19:59 -0700)
committerOleg Drokin <green@whamcloud.com>
Fri, 6 May 2011 01:30:25 +0000 (18:30 -0700)
Port bz 17239 to 2.1: use RPC set for obd_sync().

Signed-off-by: Lai Siyao <laisiyao@whamcloud.com>
Change-Id: Ie139dd6a4c141f3309814655627112eed6e71b40
Reviewed-on: http://review.whamcloud.com/496
Tested-by: Hudson
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/obd.h
lustre/include/obd_class.h
lustre/llite/file.c
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c

index 61d0874..e671e69 100644 (file)
@@ -1344,9 +1344,9 @@ struct obd_ops {
         int (*o_punch)(struct obd_export *exp, struct obd_info *oinfo,
                        struct obd_trans_info *oti,
                        struct ptlrpc_request_set *rqset);
         int (*o_punch)(struct obd_export *exp, struct obd_info *oinfo,
                        struct obd_trans_info *oti,
                        struct ptlrpc_request_set *rqset);
-        int (*o_sync)(struct obd_export *exp, struct obdo *oa,
-                      struct lov_stripe_md *ea, obd_size start, obd_size end,
-                      void *capa);
+        int (*o_sync)(struct obd_export *exp, struct obd_info *oinfo,
+                      obd_size start, obd_size end,
+                      struct ptlrpc_request_set *set);
         int (*o_migrate)(struct lustre_handle *conn, struct lov_stripe_md *dst,
                          struct lov_stripe_md *src, obd_size start,
                          obd_size end, struct obd_trans_info *oti);
         int (*o_migrate)(struct lustre_handle *conn, struct lov_stripe_md *dst,
                          struct lov_stripe_md *src, obd_size start,
                          obd_size end, struct obd_trans_info *oti);
index 690f5c7..6427f4f 100644 (file)
@@ -1275,9 +1275,30 @@ static inline int obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static inline int obd_sync(struct obd_export *exp, struct obdo *oa,
-                           struct lov_stripe_md *ea, obd_size start,
-                           obd_size end, void *capa)
+static inline int obd_sync_rqset(struct obd_export *exp, struct obd_info *oinfo,
+                                 obd_size start, obd_size end)
+{
+        struct ptlrpc_request_set *set = NULL;
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_DT_OP(exp->exp_obd, sync, -EOPNOTSUPP);
+        EXP_COUNTER_INCREMENT(exp, sync);
+
+        set =  ptlrpc_prep_set();
+        if (set == NULL)
+                RETURN(-ENOMEM);
+
+        rc = OBP(exp->exp_obd, sync)(exp, oinfo, start, end, set);
+        if (rc == 0)
+                rc = ptlrpc_set_wait(set);
+        ptlrpc_set_destroy(set);
+        RETURN(rc);
+}
+
+static inline int obd_sync(struct obd_export *exp, struct obd_info *oinfo,
+                           obd_size start, obd_size end,
+                           struct ptlrpc_request_set *set)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
@@ -1285,7 +1306,7 @@ static inline int obd_sync(struct obd_export *exp, struct obdo *oa,
         OBD_CHECK_DT_OP(exp->exp_obd, sync, -EOPNOTSUPP);
         EXP_COUNTER_INCREMENT(exp, sync);
 
         OBD_CHECK_DT_OP(exp->exp_obd, sync, -EOPNOTSUPP);
         EXP_COUNTER_INCREMENT(exp, sync);
 
-        rc = OBP(exp->exp_obd, sync)(exp, oa, ea, start, end, capa);
+        rc = OBP(exp->exp_obd, sync)(exp, oinfo, start, end, set);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 8e5f240..5af347b 100644 (file)
@@ -1952,27 +1952,32 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                 ptlrpc_req_finished(req);
 
         if (data && lsm) {
                 ptlrpc_req_finished(req);
 
         if (data && lsm) {
-                struct obdo *oa;
+                struct obd_info *oinfo;
 
 
-                OBDO_ALLOC(oa);
-                if (!oa)
+                OBD_ALLOC_PTR(oinfo);
+                if (!oinfo)
                         RETURN(rc ? rc : -ENOMEM);
                         RETURN(rc ? rc : -ENOMEM);
-
-                oa->o_id = lsm->lsm_object_id;
-                oa->o_seq = lsm->lsm_object_seq;
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
-                obdo_from_inode(oa, inode, &ll_i2info(inode)->lli_fid,
+                OBDO_ALLOC(oinfo->oi_oa);
+                if (!oinfo->oi_oa) {
+                        OBD_FREE_PTR(oinfo);
+                        RETURN(rc ? rc : -ENOMEM);
+                }
+                oinfo->oi_oa->o_id = lsm->lsm_object_id;
+                oinfo->oi_oa->o_seq = lsm->lsm_object_seq;
+                oinfo->oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+                obdo_from_inode(oinfo->oi_oa, inode, &ll_i2info(inode)->lli_fid,
                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                 OBD_MD_FLGROUP);
                                 OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                 OBD_MD_FLGROUP);
-
-                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
-                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
-                               0, OBD_OBJECT_EOF, oc);
-                capa_put(oc);
+                oinfo->oi_md = lsm;
+                oinfo->oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
+                err = obd_sync_rqset(ll_i2sbi(inode)->ll_dt_exp, oinfo, 0,
+                                     OBD_OBJECT_EOF);
+                capa_put(oinfo->oi_capa);
                 if (!rc)
                         rc = err;
                 if (!rc)
                         rc = err;
-                OBDO_FREE(oa);
+                OBDO_FREE(oinfo->oi_oa);
+                OBD_FREE_PTR(oinfo);
                 lli->lli_write_rc = err < 0 ? : 0;
         }
 
                 lli->lli_write_rc = err < 0 ? : 0;
         }
 
index 9d899fc..cefcc3f 100644 (file)
@@ -227,9 +227,8 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                        struct lov_request_set **reqset);
 int lov_fini_punch_set(struct lov_request_set *set);
 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *obd_info,
                        struct lov_request_set **reqset);
 int lov_fini_punch_set(struct lov_request_set *set);
 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *obd_info,
-                      struct obdo *src_oa,
-                      struct lov_stripe_md *lsm, obd_off start,
-                      obd_off end, struct lov_request_set **reqset);
+                      obd_off start, obd_off end,
+                      struct lov_request_set **reqset);
 int lov_fini_sync_set(struct lov_request_set *set);
 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                          struct ldlm_enqueue_info *einfo,
 int lov_fini_sync_set(struct lov_request_set *set);
 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                          struct ldlm_enqueue_info *einfo,
index 943e577..61fdb7b 100644 (file)
@@ -1564,49 +1564,71 @@ static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static int lov_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *lsm, obd_off start, obd_off end,
-                    void *capa)
+static int lov_sync_interpret(struct ptlrpc_request_set *rqset,
+                              void *data, int rc)
 {
 {
-        struct lov_request_set *set;
-        struct obd_info oinfo;
+        struct lov_request_set *lovset = data;
+        int err;
+        ENTRY;
+
+        if (rc)
+                lovset->set_completes = 0;
+        err = lov_fini_sync_set(lovset);
+        RETURN(rc ?: err);
+}
+
+static int lov_sync(struct obd_export *exp, struct obd_info *oinfo,
+                    obd_off start, obd_off end,
+                    struct ptlrpc_request_set *rqset)
+{
+        struct lov_request_set *set = NULL;
         struct lov_obd *lov;
         cfs_list_t *pos;
         struct lov_request *req;
         struct lov_obd *lov;
         cfs_list_t *pos;
         struct lov_request *req;
-        int err = 0, rc = 0;
+        int rc = 0;
         ENTRY;
 
         ENTRY;
 
-        ASSERT_LSM_MAGIC(lsm);
+        ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(rqset != NULL);
 
         if (!exp->exp_obd)
                 RETURN(-ENODEV);
 
         lov = &exp->exp_obd->u.lov;
 
         if (!exp->exp_obd)
                 RETURN(-ENODEV);
 
         lov = &exp->exp_obd->u.lov;
-        rc = lov_prep_sync_set(exp, &oinfo, oa, lsm, start, end, &set);
+        rc = lov_prep_sync_set(exp, oinfo, start, end, &set);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
+        CDEBUG(D_INFO, "fsync objid "LPX64" ["LPX64", "LPX64"]\n",
+               set->set_oi->oi_oa->o_id, start, end);
+
         cfs_list_for_each (pos, &set->set_list) {
                 req = cfs_list_entry(pos, struct lov_request, rq_link);
 
         cfs_list_for_each (pos, &set->set_list) {
                 req = cfs_list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                              req->rq_oi.oi_oa, NULL,
+                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi,
                               req->rq_oi.oi_policy.l_extent.start,
                               req->rq_oi.oi_policy.l_extent.start,
-                              req->rq_oi.oi_policy.l_extent.end, capa);
-                err = lov_update_common_set(set, req, rc);
-                if (err) {
+                              req->rq_oi.oi_policy.l_extent.end, rqset);
+                if (rc) {
                         CERROR("error: fsync objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
                         CERROR("error: fsync objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
-                        if (!rc)
-                                rc = err;
+                        break;
                 }
         }
                 }
         }
-        err = lov_fini_sync_set(set);
-        if (!rc)
-                rc = err;
-        RETURN(rc);
+
+        /* If we are not waiting for responses on async requests, return. */
+        if (rc || cfs_list_empty(&rqset->set_requests)) {
+                int err = lov_fini_sync_set(set);
+
+                RETURN(rc ?: err);
+        }
+
+        LASSERT(rqset->set_interpret == NULL);
+        rqset->set_interpret = lov_sync_interpret;
+        rqset->set_arg = (void *)set;
+
+        RETURN(0);
 }
 
 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
 }
 
 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
index d61101d..a8c350d 100644 (file)
@@ -1413,8 +1413,18 @@ int lov_fini_sync_set(struct lov_request_set *set)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/* The callback for osc_sync that finilizes a request info when a
+ * response is recieved. */
+static int cb_sync_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
+}
+
 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
-                      struct obdo *src_oa, struct lov_stripe_md *lsm,
                       obd_off start, obd_off end,
                       struct lov_request_set **reqset)
 {
                       obd_off start, obd_off end,
                       struct lov_request_set **reqset)
 {
@@ -1423,18 +1433,16 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
         int rc = 0, i;
         ENTRY;
 
         int rc = 0, i;
         ENTRY;
 
-        OBD_ALLOC(set, sizeof(*set));
+        OBD_ALLOC_PTR(set);
         if (set == NULL)
                 RETURN(-ENOMEM);
         lov_init_set(set);
 
         set->set_exp = exp;
         set->set_oi = oinfo;
         if (set == NULL)
                 RETURN(-ENOMEM);
         lov_init_set(set);
 
         set->set_exp = exp;
         set->set_oi = oinfo;
-        set->set_oi->oi_md = lsm;
-        set->set_oi->oi_oa = src_oa;
 
 
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
                 struct lov_request *req;
                 obd_off rs, re;
 
                 struct lov_request *req;
                 obd_off rs, re;
 
@@ -1444,10 +1452,11 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                         continue;
                 }
 
                         continue;
                 }
 
-                if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
+                if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
+                                           &re))
                         continue;
 
                         continue;
 
-                OBD_ALLOC(req, sizeof(*req));
+                OBD_ALLOC_PTR(req);
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
                 req->rq_stripe = i;
                 if (req == NULL)
                         GOTO(out_set, rc = -ENOMEM);
                 req->rq_stripe = i;
@@ -1458,7 +1467,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
                 }
                         OBD_FREE(req, sizeof(*req));
                         GOTO(out_set, rc = -ENOMEM);
                 }
-                memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
+                *req->rq_oi.oi_oa = *oinfo->oi_oa;
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
@@ -1466,6 +1475,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
                 req->rq_oi.oi_policy.l_extent.gid = -1;
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
                 req->rq_oi.oi_policy.l_extent.gid = -1;
+                req->rq_oi.oi_cb_up = cb_sync_update;
 
                 lov_set_add_req(req, set);
         }
 
                 lov_set_add_req(req, set);
         }
index f60190d..8e3779b 100644 (file)
@@ -4240,9 +4240,9 @@ static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int filter_sync(struct obd_export *exp, struct obdo *oa,
-                       struct lov_stripe_md *lsm, obd_off start, obd_off end,
-                       void *capa)
+static int filter_sync(struct obd_export *exp, struct obd_info *oinfo,
+                       obd_off start, obd_off end,
+                       struct ptlrpc_request_set *set)
 {
         struct lvfs_run_ctxt saved;
         struct obd_device_target *obt;
 {
         struct lvfs_run_ctxt saved;
         struct obd_device_target *obt;
@@ -4250,22 +4250,23 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         int rc, rc2;
         ENTRY;
 
         int rc, rc2;
         ENTRY;
 
-        rc = filter_auth_capa(exp, NULL, oa->o_seq,
-                              (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE);
+        rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_seq,
+                              (struct lustre_capa *)oinfo->oi_capa,
+                              CAPA_OPC_OSS_WRITE);
         if (rc)
                 RETURN(rc);
 
         obt = &exp->exp_obd->u.obt;
 
         /* An objid of zero is taken to mean "sync whole filesystem" */
         if (rc)
                 RETURN(rc);
 
         obt = &exp->exp_obd->u.obt;
 
         /* An objid of zero is taken to mean "sync whole filesystem" */
-        if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
+        if (!oinfo->oi_oa || !(oinfo->oi_oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, obt->obt_sb);
                 /* Flush any remaining cancel messages out to the target */
                 filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
                 rc = fsfilt_sync(exp->exp_obd, obt->obt_sb);
                 /* Flush any remaining cancel messages out to the target */
                 filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
-        dentry = filter_oa2dentry(exp->exp_obd, &oa->o_oi);
+        dentry = filter_oa2dentry(exp->exp_obd, &oinfo->oi_oa->o_oi);
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
@@ -4287,8 +4288,9 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         }
         UNLOCK_INODE_MUTEX(dentry->d_inode);
 
         }
         UNLOCK_INODE_MUTEX(dentry->d_inode);
 
-        oa->o_valid = OBD_MD_FLID;
-        obdo_from_inode(oa, dentry->d_inode, NULL, FILTER_VALID_FLAGS);
+        oinfo->oi_oa->o_valid = OBD_MD_FLID;
+        obdo_from_inode(oinfo->oi_oa, dentry->d_inode, NULL,
+                        FILTER_VALID_FLAGS);
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
index 9c483c6..02c3ace 100644 (file)
@@ -564,16 +564,40 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                               oinfo->oi_cb_up, oinfo, rqset);
 }
 
                               oinfo->oi_cb_up, oinfo, rqset);
 }
 
-static int osc_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *md, obd_size start, obd_size end,
-                    void *capa)
+static int osc_sync_interpret(const struct lu_env *env,
+                              struct ptlrpc_request *req,
+                              void *arg, int rc)
+{
+        struct osc_async_args *aa = arg;
+        struct ost_body *body;
+        ENTRY;
+
+        if (rc)
+                GOTO(out, rc);
+
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL) {
+                CERROR ("can't unpack ost_body\n");
+                GOTO(out, rc = -EPROTO);
+        }
+
+        *aa->aa_oi->oi_oa = body->oa;
+out:
+        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+        RETURN(rc);
+}
+
+static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
+                    obd_size start, obd_size end,
+                    struct ptlrpc_request_set *set)
 {
         struct ptlrpc_request *req;
         struct ost_body       *body;
 {
         struct ptlrpc_request *req;
         struct ost_body       *body;
+        struct osc_async_args *aa;
         int                    rc;
         ENTRY;
 
         int                    rc;
         ENTRY;
 
-        if (!oa) {
+        if (!oinfo->oi_oa) {
                 CDEBUG(D_INFO, "oa NULL\n");
                 RETURN(-EINVAL);
         }
                 CDEBUG(D_INFO, "oa NULL\n");
                 RETURN(-EINVAL);
         }
@@ -582,7 +606,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        osc_set_capa_size(req, &RMF_CAPA1, capa);
+        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
         if (rc) {
                 ptlrpc_request_free(req);
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -592,28 +616,21 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        lustre_set_wire_obdo(&body->oa, oa);
+        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
         body->oa.o_size = start;
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
         body->oa.o_size = start;
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
-        osc_pack_capa(req, body, capa);
+        osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
 
         ptlrpc_request_set_replen(req);
+        req->rq_interpret_reply = osc_sync_interpret;
 
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
-
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EPROTO);
-
-        lustre_get_wire_obdo(oa, &body->oa);
+        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+        aa = ptlrpc_req_async_args(req);
+        aa->aa_oi = oinfo;
 
 
-        EXIT;
- out:
-        ptlrpc_req_finished(req);
-        return rc;
+        ptlrpc_set_add_req(set, req);
+        RETURN (0);
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by
index bab562a..e039671 100644 (file)
@@ -72,7 +72,7 @@ CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
 /**
  * Do not return server-side uid/gid to remote client
  */
 /**
  * Do not return server-side uid/gid to remote client
  */
-static void ost_drop_id(struct obd_export *exp, struct  obdo *oa)
+static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
 {
         if (exp_connect_rmtclient(exp)) {
                 oa->o_uid = -1;
 {
         if (exp_connect_rmtclient(exp)) {
                 oa->o_uid = -1;
@@ -249,8 +249,9 @@ static void ost_lock_put(struct obd_export *exp,
 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
-        struct obd_info oinfo = { { { 0 } } };
+        struct obd_info *oinfo;
         struct lustre_handle lh = { 0 };
         struct lustre_handle lh = { 0 };
+        struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
@@ -266,24 +267,31 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
         rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
         if (rc)
                 RETURN(rc);
 
         rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
         if (rc)
                 RETURN(rc);
 
-        oinfo.oi_oa = &repbody->oa;
-        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
-                oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
-                                                       &RMF_CAPA1);
-                if (oinfo.oi_capa == NULL) {
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
                         CERROR("Missing capability for OST GETATTR");
                         RETURN (-EFAULT);
                 }
         }
 
                         CERROR("Missing capability for OST GETATTR");
                         RETURN (-EFAULT);
                 }
         }
 
-        req->rq_status = obd_getattr(exp, &oinfo);
+        OBD_ALLOC_PTR(oinfo);
+        if (!oinfo)
+                RETURN(-ENOMEM);
+        oinfo->oi_oa = &body->oa;
+        oinfo->oi_capa = capa;
+
+        req->rq_status = obd_getattr(exp, oinfo);
+
+        OBD_FREE_PTR(oinfo);
+
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
+
         ost_lock_put(exp, &lh, LCK_PR);
 
         ost_drop_id(exp, &repbody->oa);
         ost_lock_put(exp, &lh, LCK_PR);
 
         ost_drop_id(exp, &repbody->oa);
@@ -331,8 +339,8 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
                 RETURN(rc);
 
         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                 RETURN(rc);
 
         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        oti->oti_logcookies = &repbody->oa.o_lcookie;
+        repbody->oa = body->oa;
+        oti->oti_logcookies = &body->oa.o_lcookie;
 
         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
 
         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
@@ -342,7 +350,6 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
                      struct obd_trans_info *oti)
 {
 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
                      struct obd_trans_info *oti)
 {
-        struct obd_info oinfo = { { { 0 } } };
         struct ost_body *body, *repbody;
         int rc, flags = 0;
         struct lustre_handle lh = {0,};
         struct ost_body *body, *repbody;
         int rc, flags = 0;
         struct lustre_handle lh = {0,};
@@ -359,11 +366,7 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        oinfo.oi_oa = &body->oa;
-        oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
-        oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
-
-        if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+        if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
                 RETURN(-EPROTO);
 
             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
                 RETURN(-EPROTO);
 
@@ -373,34 +376,48 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
 
         /* standard truncate optimization: if file body is completely
          * destroyed, don't send data back to the server. */
 
         /* standard truncate optimization: if file body is completely
          * destroyed, don't send data back to the server. */
-        if (oinfo.oi_oa->o_size == 0)
+        if (body->oa.o_size == 0)
                 flags |= LDLM_AST_DISCARD_DATA;
 
                 flags |= LDLM_AST_DISCARD_DATA;
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        rc = ost_lock_get(exp, oinfo.oi_oa, oinfo.oi_oa->o_size,
-                          oinfo.oi_oa->o_blocks, &lh, LCK_PW, flags);
+        rc = ost_lock_get(exp, &body->oa, body->oa.o_size, body->oa.o_blocks,
+                          &lh, LCK_PW, flags);
         if (rc == 0) {
         if (rc == 0) {
-                if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
-                    oinfo.oi_oa->o_flags == OBD_FL_SRVLOCK)
+                struct obd_info *oinfo;
+                struct lustre_capa *capa = NULL;
+
+                if (body->oa.o_valid & OBD_MD_FLFLAGS &&
+                    body->oa.o_flags == OBD_FL_SRVLOCK)
                         /*
                          * If OBD_FL_SRVLOCK is the only bit set in
                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
                          * through filter_setattr() to filter_iocontrol().
                          */
                         /*
                          * If OBD_FL_SRVLOCK is the only bit set in
                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
                          * through filter_setattr() to filter_iocontrol().
                          */
-                        oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
+                        body->oa.o_valid &= ~OBD_MD_FLFLAGS;
 
 
-                if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
-                        oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
-                                                               &RMF_CAPA1);
-                        if (oinfo.oi_capa == NULL) {
+                if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                        capa = req_capsule_client_get(&req->rq_pill,
+                                                      &RMF_CAPA1);
+                        if (capa == NULL) {
                                 CERROR("Missing capability for OST PUNCH");
                                 RETURN (-EFAULT);
                         }
                 }
                                 CERROR("Missing capability for OST PUNCH");
                                 RETURN (-EFAULT);
                         }
                 }
-                req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
+
+                OBD_ALLOC_PTR(oinfo);
+                if (!oinfo)
+                        RETURN(-ENOMEM);
+                oinfo->oi_oa = &body->oa;
+                oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
+                oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
+                oinfo->oi_capa = capa;
+
+                req->rq_status = obd_punch(exp, oinfo, oti, NULL);
+                OBD_FREE_PTR(oinfo);
                 ost_lock_put(exp, &lh, LCK_PW);
         }
                 ost_lock_put(exp, &lh, LCK_PW);
         }
-        repbody->oa = *oinfo.oi_oa;
+
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(rc);
 }
         ost_drop_id(exp, &repbody->oa);
         RETURN(rc);
 }
@@ -408,6 +425,7 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
+        struct obd_info *oinfo;
         struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
         struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
@@ -432,10 +450,18 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
+        OBD_ALLOC_PTR(oinfo);
+        if (!oinfo)
+                RETURN(-ENOMEM);
+
+        oinfo->oi_oa = &body->oa;
+        oinfo->oi_capa = capa;
+        req->rq_status = obd_sync(exp, oinfo, body->oa.o_size,
+                                  body->oa.o_blocks, NULL);
+        OBD_FREE_PTR(oinfo);
+
         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
-                                  repbody->oa.o_blocks, capa);
+        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
@@ -444,8 +470,9 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
+        struct obd_info *oinfo;
+        struct lustre_capa *capa = NULL;
         int rc;
         int rc;
-        struct obd_info oinfo = { { { 0 } } };
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         ENTRY;
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
@@ -460,19 +487,26 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
-        oinfo.oi_oa = &repbody->oa;
-        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
-                oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
-                                                       &RMF_CAPA1);
-                if (oinfo.oi_capa == NULL) {
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
                         CERROR("Missing capability for OST SETATTR");
                         RETURN (-EFAULT);
                 }
         }
                         CERROR("Missing capability for OST SETATTR");
                         RETURN (-EFAULT);
                 }
         }
-        req->rq_status = obd_setattr(exp, &oinfo, oti);
+
+        OBD_ALLOC_PTR(oinfo);
+        if (!oinfo)
+                RETURN(-ENOMEM);
+        oinfo->oi_oa = &body->oa;
+        oinfo->oi_capa = capa;
+
+        req->rq_status = obd_setattr(exp, oinfo, oti);
+
+        OBD_FREE_PTR(oinfo);
+
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        repbody->oa = body->oa;
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
 }
@@ -1744,24 +1778,35 @@ int ost_blocking_ast(struct ldlm_lock *lock,
             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
               lock->l_flags & LDLM_FL_CBPENDING))) {
             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
               lock->l_flags & LDLM_FL_CBPENDING))) {
+                struct obd_info *oinfo;
                 struct obdo *oa;
                 int rc;
 
                 struct obdo *oa;
                 int rc;
 
+                OBD_ALLOC_PTR(oinfo);
+                if (!oinfo)
+                        RETURN(-ENOMEM);
                 OBDO_ALLOC(oa);
                 OBDO_ALLOC(oa);
+                if (!oa) {
+                        OBD_FREE_PTR(oa);
+                        RETURN(-ENOMEM);
+                }
                 oa->o_id = lock->l_resource->lr_name.name[0];
                 oa->o_seq = lock->l_resource->lr_name.name[1];
                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
                 oa->o_id = lock->l_resource->lr_name.name[0];
                 oa->o_seq = lock->l_resource->lr_name.name[1];
                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
+                oinfo->oi_oa = oa;
 
 
-                rc = obd_sync(lock->l_export, oa, NULL,
+                rc = obd_sync(lock->l_export, oinfo,
                               lock->l_policy_data.l_extent.start,
                               lock->l_policy_data.l_extent.end, NULL);
                 if (rc)
                         CERROR("Error %d syncing data on lock cancel\n", rc);
 
                 OBDO_FREE(oa);
                               lock->l_policy_data.l_extent.start,
                               lock->l_policy_data.l_extent.end, NULL);
                 if (rc)
                         CERROR("Error %d syncing data on lock cancel\n", rc);
 
                 OBDO_FREE(oa);
+                OBD_FREE_PTR(oinfo);
         }
 
         }
 
-        return ldlm_server_blocking_ast(lock, desc, data, flag);
+        rc = ldlm_server_blocking_ast(lock, desc, data, flag);
+        RETURN(rc);
 }
 
 static int ost_filter_recovery_request(struct ptlrpc_request *req,
 }
 
 static int ost_filter_recovery_request(struct ptlrpc_request *req,