Whamcloud - gitweb
b=14149
authorhuanghua <huanghua>
Thu, 7 Feb 2008 08:07:16 +0000 (08:07 +0000)
committerhuanghua <huanghua>
Thu, 7 Feb 2008 08:07:16 +0000 (08:07 +0000)
i=yong.fan
i=rahul.deshmukh
i=nikita.danilov

- use req_capsule interface for client.
- add some interoperability support on server side.

81 files changed:
lustre/cmm/mdc_device.c
lustre/cmm/mdc_object.c
lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_lib.c
lustre/fid/fid_request.c
lustre/fld/fld_handler.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_cfg.h
lustre/include/lustre_dlm.h
lustre/include/lustre_mdt.h
lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/liblustre/dir.c
lustre/liblustre/file.c
lustre/liblustre/llite_lib.c
lustre/liblustre/namei.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/namei.c
lustre/llite/remote_perm.c
lustre/llite/symlink.c
lustre/llite/xattr.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lmv/lmv_object.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_object.c
lustre/mds/mds_lov.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_idmap.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/obdclass/obd_mount.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_lvb.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/gss/gss_cli_upcall.c
lustre/ptlrpc/import.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/wiretest.c
lustre/quota/quota_check.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/utils/req-layout.c
lustre/utils/wiretest.c

index 7352cd6..9ecc428 100644 (file)
@@ -136,7 +136,8 @@ static int mdc_obd_add(const struct lu_env *env,
                                          OBD_CONNECT_MDS_CAPA |
                                          OBD_CONNECT_OSS_CAPA | 
                                          OBD_CONNECT_IBITS |
-                                         OBD_CONNECT_MDS_MDS;
+                                         OBD_CONNECT_MDS_MDS |
+                                         OBD_CONNECT_FID;
                 rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd);
                 OBD_FREE_PTR(ocd);
                 if (rc) {
index fde11da..808f540 100644 (file)
@@ -154,7 +154,7 @@ static int mdc_req2attr_update(const struct lu_env *env,
         mci = mdc_info_get(env);
         req = mci->mci_req;
         LASSERT(req);
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body);
         mdc_body2attr(body, ma);
 
@@ -162,8 +162,7 @@ static int mdc_req2attr_update(const struct lu_env *env,
                 struct lustre_capa *capa;
 
                 /* create for cross-ref will fetch mds capa from remote obj */
-                capa = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
-                                      sizeof(*capa));
+                capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
                 LASSERT(capa != NULL);
                 LASSERT(ma->ma_capa != NULL);
                 *ma->ma_capa = *capa;
@@ -177,12 +176,10 @@ static int mdc_req2attr_update(const struct lu_env *env,
                 RETURN(-EPROTO);
         }
 
-        lov = lustre_swab_repbuf(req, REPLY_REC_OFF + 1,
-                                 body->eadatasize, NULL);
-        if (lov == NULL) {
-                CERROR("Can't unpack MDS EA data\n");
+        lov = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
+                                           body->eadatasize);
+        if (lov == NULL)
                 RETURN(-EPROTO);
-        }
 
         LASSERT(ma->ma_lmm != NULL);
         LASSERT(ma->ma_lmm_size >= body->eadatasize); 
@@ -198,12 +195,10 @@ static int mdc_req2attr_update(const struct lu_env *env,
                 RETURN(-EPROTO);
         }
 
-        cookie = lustre_msg_buf(req->rq_repmsg,
-                                REPLY_REC_OFF + 2, body->aclsize);
-        if (cookie == NULL) {
-                CERROR("Can't unpack unlink cookie data\n");
+        cookie = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL,
+                                              body->aclsize);
+        if (cookie == NULL)
                 RETURN(-EPROTO);
-        }
 
         LASSERT(ma->ma_cookie != NULL);
         LASSERT(ma->ma_cookie_size == body->aclsize);
@@ -561,8 +556,8 @@ static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
                           fid, &mci->mci_req);
         if (rc == 0 || rc == -EREMOTE) {
-                body = lustre_msg_buf(mci->mci_req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&mci->mci_req->rq_pill,
+                                              &RMF_MDT_BODY);
                 LASSERT(body->valid & OBD_MD_FLID);
         
                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
index 5a576d1..73a20a7 100644 (file)
@@ -330,20 +330,18 @@ static int seq_req_handle(struct ptlrpc_request *req,
         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
         LASSERT(site != NULL);
                        
-        rc = req_capsule_pack(&info->sti_pill);
+        rc = req_capsule_server_pack(info->sti_pill);
         if (rc)
                 RETURN(err_serious(rc));
 
-        opc = req_capsule_client_get(&info->sti_pill,
-                                     &RMF_SEQ_OPC);
+        opc = req_capsule_client_get(info->sti_pill, &RMF_SEQ_OPC);
         if (opc != NULL) {
-                out = req_capsule_server_get(&info->sti_pill,
-                                             &RMF_SEQ_RANGE);
+                out = req_capsule_server_get(info->sti_pill, &RMF_SEQ_RANGE);
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
 
                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                        in = req_capsule_client_get(&info->sti_pill,
+                        in = req_capsule_client_get(info->sti_pill,
                                                     &RMF_SEQ_RANGE);
 
                         LASSERT(!range_is_zero(in) && range_is_sane(in));
@@ -365,22 +363,15 @@ LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
 static void seq_thread_info_init(struct ptlrpc_request *req,
                                  struct seq_thread_info *info)
 {
-        int i;
-
-        /* Mark rep buffer as req-layout stuff expects */
-        for (i = 0; i < ARRAY_SIZE(info->sti_rep_buf_size); i++)
-                info->sti_rep_buf_size[i] = -1;
-
+        info->sti_pill = &req->rq_pill;
         /* Init request capsule */
-        req_capsule_init(&info->sti_pill, req, RCL_SERVER,
-                         info->sti_rep_buf_size);
-
-        req_capsule_set(&info->sti_pill, &RQF_SEQ_QUERY);
+        req_capsule_init(info->sti_pill, req, RCL_SERVER);
+        req_capsule_set(info->sti_pill, &RQF_SEQ_QUERY);
 }
 
 static void seq_thread_info_fini(struct seq_thread_info *info)
 {
-        req_capsule_fini(&info->sti_pill);
+        req_capsule_fini(info->sti_pill);
 }
 
 static int seq_handle(struct ptlrpc_request *req)
@@ -407,7 +398,7 @@ static int seq_handle(struct ptlrpc_request *req)
  */
 int seq_query(struct com_thread_info *info)
 {
-        return seq_handle(info->cti_pill.rc_req);
+        return seq_handle(info->cti_pill->rc_req);
 }
 EXPORT_SYMBOL(seq_query);
 
index 73d5959..39f795e 100644 (file)
 
 #ifdef __KERNEL__
 struct seq_thread_info {
+        struct req_capsule     *sti_pill;
         struct txn_param        sti_txn;
-        struct req_capsule      sti_pill;
         struct lu_range         sti_space;
-        int                     sti_rep_buf_size[REQ_MAX_FIELD_NR];
         struct lu_buf           sti_buf;
 };
 
index 71cd00e..cf33390 100644 (file)
 #include <lustre_fid.h>
 
 /*
- * Sequence space, starts from 0x400 to have first 0x400 sequences used for
- * special purposes. This means that if we have seq-with 10000 fids, we have
- * ~10M fids reserved for special purposes (igifs, etc.).
+ * Sequence space, starts from 0x100000000ULL to have first 0x100000000ULL
+ * sequences used for special purposes. 
+ * Those fids are reserved for special purposes (igifs, etc.).
  */
 const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
-        (0x400),
+        (0x100000000ULL),
         ((__u64)~0ULL)
 };
 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
index 5c40363..3910422 100644 (file)
@@ -53,37 +53,30 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
                           struct lu_range *output, __u32 opc,
                           const char *opcname)
 {
-        int rc, size[3] = { sizeof(struct ptlrpc_body),
-                            sizeof(__u32),
-                            sizeof(struct lu_range) };
-        struct obd_export *exp = seq->lcs_exp;
+        struct obd_export     *exp = seq->lcs_exp;
         struct ptlrpc_request *req;
-        struct lu_range *out, *in;
-        struct req_capsule pill;
-        __u32 *op;
+        struct lu_range       *out, *in;
+        __u32                 *op;
+        int                    rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              SEQ_QUERY, 3, size, NULL);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
+                                        LUSTRE_MDS_VERSION, SEQ_QUERY);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-        req_capsule_set(&pill, &RQF_SEQ_QUERY);
-
         /* Init operation code */
-        op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
+        op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
         *op = opc;
 
         /* Zero out input range, this is not recovery yet. */
-        in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
+        in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
         if (input != NULL)
                 *in = *input;
         else
                 range_zero(in);
 
-        size[1] = sizeof(struct lu_range);
-        ptlrpc_req_set_repsize(req, 2, size);
+        ptlrpc_request_set_replen(req);
 
         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
@@ -100,7 +93,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
         if (rc)
                 GOTO(out_req, rc);
 
-        out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
+        out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
         *output = *out;
 
         if (!range_is_sane(output)) {
@@ -121,7 +114,6 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
 
         EXIT;
 out_req:
-        req_capsule_fini(&pill);
         ptlrpc_req_finished(req);
         return rc;
 }
index 00c9d12..b60e11a 100644 (file)
@@ -200,16 +200,16 @@ static int fld_req_handle(struct ptlrpc_request *req,
 
         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
 
-        rc = req_capsule_pack(&info->fti_pill);
+        rc = req_capsule_server_pack(info->fti_pill);
         if (rc)
                 RETURN(err_serious(rc));
 
-        opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
+        opc = req_capsule_client_get(info->fti_pill, &RMF_FLD_OPC);
         if (opc != NULL) {
-                in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                in = req_capsule_client_get(info->fti_pill, &RMF_FLD_MDFLD);
                 if (in == NULL)
                         RETURN(err_serious(-EPROTO));
-                out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                out = req_capsule_server_get(info->fti_pill, &RMF_FLD_MDFLD);
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
@@ -226,24 +226,17 @@ static int fld_req_handle(struct ptlrpc_request *req,
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
-        int i;
-
         info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
 
-        /* Mark rep buffer as req-layout stuff expects. */
-        for (i = 0; i < ARRAY_SIZE(info->fti_rep_buf_size); i++)
-                info->fti_rep_buf_size[i] = -1;
-
+        info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
-        req_capsule_init(&info->fti_pill, req, RCL_SERVER,
-                         info->fti_rep_buf_size);
-
-        req_capsule_set(&info->fti_pill, &RQF_FLD_QUERY);
+        req_capsule_init(info->fti_pill, req, RCL_SERVER);
+        req_capsule_set(info->fti_pill, &RQF_FLD_QUERY);
 }
 
 static void fld_thread_info_fini(struct fld_thread_info *info)
 {
-        req_capsule_fini(&info->fti_pill);
+        req_capsule_fini(info->fti_pill);
 }
 
 static int fld_handle(struct ptlrpc_request *req)
@@ -270,7 +263,7 @@ static int fld_handle(struct ptlrpc_request *req)
  */
 int fld_query(struct com_thread_info *info)
 {
-        return fld_handle(info->cti_pill.rc_req);
+        return fld_handle(info->cti_pill->rc_req);
 }
 EXPORT_SYMBOL(fld_query);
 
index 421b5c9..4fbd6c2 100644 (file)
@@ -78,11 +78,10 @@ extern struct lu_fld_hash fld_hash[];
 
 #ifdef __KERNEL__
 struct fld_thread_info {
-        struct req_capsule fti_pill;
-        int                fti_rep_buf_size[REQ_MAX_FIELD_NR];
-        __u64              fti_key;
-        __u64              fti_rec;
-        __u32              fti_flags;
+        struct req_capsule *fti_pill;
+        __u64               fti_key;
+        __u64               fti_rec;
+        __u32               fti_flags;
 };
 
 int fld_index_init(struct lu_server_fld *fld,
index ba57d51..8a6a186 100644 (file)
@@ -435,36 +435,26 @@ EXPORT_SYMBOL(fld_client_fini);
 static int fld_client_rpc(struct obd_export *exp,
                           struct md_fld *mf, __u32 fld_op)
 {
-        int size[3] = { sizeof(struct ptlrpc_body),
-                        sizeof(__u32),
-                        sizeof(struct md_fld) };
         struct ptlrpc_request *req;
-        struct req_capsule pill;
-        struct md_fld *pmf;
-        __u32 *op;
-        int rc;
+        struct md_fld         *pmf;
+        __u32                 *op;
+        int                    rc;
         ENTRY;
 
         LASSERT(exp != NULL);
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION,
-                              FLD_QUERY, 3, size,
-                              NULL);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_FLD_QUERY,
+                                        LUSTRE_MDS_VERSION, FLD_QUERY);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-        req_capsule_set(&pill, &RQF_FLD_QUERY);
-
-        op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
+        op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
         *op = fld_op;
 
-        pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
+        pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
         *pmf = *mf;
 
-        size[1] = sizeof(struct md_fld);
-        ptlrpc_req_set_repsize(req, 2, size);
+        ptlrpc_request_set_replen(req);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
 
         if (fld_op != FLD_LOOKUP)
@@ -477,13 +467,12 @@ static int fld_client_rpc(struct obd_export *exp,
         if (rc)
                 GOTO(out_req, rc);
 
-        pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
+        pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
         if (pmf == NULL)
                 GOTO(out_req, rc = -EFAULT);
         *mf = *pmf;
         EXIT;
 out_req:
-        req_capsule_fini(&pill);
         ptlrpc_req_finished(req);
         return rc;
 }
index 50d7de4..e472e0d 100644 (file)
@@ -376,7 +376,6 @@ struct lu_attr {
         __u64          la_valid;  /* valid bits */
 };
 
-
 /*
  * Layer in the layered object.
  */
index c877670..6b2bf7c 100644 (file)
@@ -549,6 +549,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define OBD_CONNECT_LRU_RESIZE 0x02000000ULL /* Lru resize feature. */
 #define OBD_CONNECT_MDS_MDS    0x04000000ULL /* MDS-MDS connection*/
 #define OBD_CONNECT_REAL       0x08000000ULL /* real connection */
+#define OBD_CONNECT_FID        0x10000000ULL /* FID is supported by server */
 #define OBD_CONNECT_CKSUM      0x20000000ULL /* support several cksum algos */
 
 /* also update obd_connect_names[] for lprocfs_rd_connect_flags()
@@ -568,15 +569,17 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_RMT_CLIENT | \
                                 OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
                                 OBD_CONNECT_MDS_MDS | OBD_CONNECT_CANCELSET | \
+                                OBD_CONNECT_FID | \
                                 LRU_RESIZE_CONNECT_FLAG)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
                                 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \
                                 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET | \
+                                OBD_CONNECT_FID | \
                                 LRU_RESIZE_CONNECT_FLAG)
 #define ECHO_CONNECT_SUPPORTED (0)
-#define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION)
+#define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_FID)
 
 #define MAX_QUOTA_COUNT32 (0xffffffffULL)
 
@@ -939,7 +942,7 @@ typedef enum {
         MDS_QUOTACHECK   = 47,
         MDS_QUOTACTL     = 48,
         MDS_GETXATTR     = 49,
-        MDS_SETXATTR     = 50,
+        MDS_SETXATTR     = 50, /* obsolete, now it's MDS_REINT op */
         MDS_WRITEPAGE    = 51,
         MDS_IS_SUBDIR    = 52,
         MDS_LAST_OPC
@@ -958,6 +961,9 @@ typedef enum {
         REINT_UNLINK   = 4,
         REINT_RENAME   = 5,
         REINT_OPEN     = 6,
+        REINT_SETXATTR = 7,
+//      REINT_CLOSE    = 8,
+//      REINT_WRITE    = 9,
         REINT_MAX
 } mds_reint_t, mdt_reint_t;
 
@@ -1076,6 +1082,7 @@ struct mdt_body {
         __u64          ctime;
         __u64          blocks; /* XID, in the case of MDS_READPAGE */
         __u64          ioepoch;
+        __u64          ino;    /* for 1.6 compatibility */
         __u32          fsuid;
         __u32          fsgid;
         __u32          capability;
@@ -1085,11 +1092,13 @@ struct mdt_body {
         __u32          flags; /* from vfs for pin/unpin, MDS_BFLAG for close */
         __u32          rdev;
         __u32          nlink; /* #bytes to read in the case of MDS_READPAGE */
+        __u32          generation; /* for 1.6 compatibility */
         __u32          suppgid;
         __u32          eadatasize;
         __u32          aclsize;
         __u32          max_mdsize;
-        __u32          max_cookiesize; /* also fix lustre_swab_mdt_body */
+        __u32          max_cookiesize;
+        __u32          padding_4; /* also fix lustre_swab_mdt_body */
 };
 
 struct mds_body {
@@ -1213,18 +1222,22 @@ struct mdt_rec_setattr {
         __u32           sa_fsgid;
         __u32           sa_cap;
         __u32           sa_suppgid;
-        __u32           sa_mode;
+        __u32           sa_padding_1;
         struct lu_fid   sa_fid;
         __u64           sa_valid;
+        __u32           sa_uid;
+        __u32           sa_gid;
         __u64           sa_size;
         __u64           sa_blocks;
         __u64           sa_mtime;
         __u64           sa_atime;
         __u64           sa_ctime;
-        __u32           sa_uid;
-        __u32           sa_gid;
         __u32           sa_attr_flags;
-        __u32           sa_padding; /* also fix lustre_swab_mds_rec_setattr */
+        __u32           sa_mode;
+        __u32           sa_padding_2;
+        __u32           sa_padding_3;
+        __u32           sa_padding_4;
+        __u32           sa_padding_5;
 };
 
 extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
@@ -1349,18 +1362,21 @@ struct mdt_rec_create {
         __u32           cr_fsuid;
         __u32           cr_fsgid;
         __u32           cr_cap;
-        __u32           cr_flags; /* for use with open */
-        __u32           cr_mode;
-        struct lustre_handle cr_old_handle; /* u64 handle in case of open replay */
+        __u32           cr_suppgid1;
+        __u32           cr_suppgid2;
         struct lu_fid   cr_fid1;
         struct lu_fid   cr_fid2;
+        struct lustre_handle cr_old_handle; /* u64 handle in case of open replay */
         __u64           cr_time;
         __u64           cr_rdev;
         __u64           cr_ioepoch;
-        __u32           cr_suppgid1;
-        __u32           cr_suppgid2;
+        __u64           cr_padding_1; /* pad for 64 bits*/
+        __u32           cr_mode;
         __u32           cr_bias;
-        __u32           cr_padding_1; /* pad for 64 bits*/
+        __u32           cr_flags;     /* for use with open */
+        __u32           cr_padding_2;
+        __u32           cr_padding_3;
+        __u32           cr_padding_4;
 };
 
 extern void lustre_swab_mdt_rec_create (struct mdt_rec_create *cr);
@@ -1393,14 +1409,18 @@ struct mdt_rec_link {
         struct lu_fid   lk_fid1;
         struct lu_fid   lk_fid2;
         __u64           lk_time;
+        __u64           lk_padding_1;
+        __u64           lk_padding_2;
+        __u64           lk_padding_3;
+        __u64           lk_padding_4;
         __u32           lk_bias;
-        __u32           lk_padding_2;  /* also fix lustre_swab_mds_rec_link */
-        __u32           lk_padding_3;  /* also fix lustre_swab_mds_rec_link */
-        __u32           lk_padding_4;  /* also fix lustre_swab_mds_rec_link */
+        __u32           lk_padding_5;
+        __u32           lk_padding_6;
+        __u32           lk_padding_7;
+        __u32           lk_padding_8;
+        __u32           lk_padding_9;
 };
 
-extern void lustre_swab_mdt_rec_link (struct mdt_rec_link *lk);
-
 struct mds_rec_unlink {
         __u32           ul_opcode;
         __u32           ul_fsuid;
@@ -1424,19 +1444,23 @@ struct mdt_rec_unlink {
         __u32           ul_fsuid;
         __u32           ul_fsgid;
         __u32           ul_cap;
-        __u32           ul_suppgid;
-        __u32           ul_mode;
+        __u32           ul_suppgid1;
+        __u32           ul_suppgid2;
         struct lu_fid   ul_fid1;
         struct lu_fid   ul_fid2;
         __u64           ul_time;
+        __u64           ul_padding_2;
+        __u64           ul_padding_3;
+        __u64           ul_padding_4;
+        __u64           ul_padding_5;
         __u32           ul_bias;
-        __u32           ul_padding_2; /* also fix lustre_swab_mds_rec_unlink */
-        __u32           ul_padding_3; /* also fix lustre_swab_mds_rec_unlink */
-        __u32           ul_padding_4; /* also fix lustre_swab_mds_rec_unlink */
+        __u32           ul_mode;
+        __u32           ul_padding_6;
+        __u32           ul_padding_7;
+        __u32           ul_padding_8;
+        __u32           ul_padding_9;
 };
 
-extern void lustre_swab_mdt_rec_unlink (struct mdt_rec_unlink *ul);
-
 struct mds_rec_rename {
         __u32           rn_opcode;
         __u32           rn_fsuid;
@@ -1465,15 +1489,66 @@ struct mdt_rec_rename {
         struct lu_fid   rn_fid1;
         struct lu_fid   rn_fid2;
         __u64           rn_time;
-        __u32           rn_mode;      /* cross-ref rename has mode */
+        __u64           rn_padding_1;
+        __u64           rn_padding_2;
+        __u64           rn_padding_3;
+        __u64           rn_padding_4;
         __u32           rn_bias;      /* some operation flags */
-        __u32           rn_padding_3; /* also fix lustre_swab_mdt_rec_rename */
-        __u32           rn_padding_4; /* also fix lustre_swab_mdt_rec_rename */
-};
-
-extern void lustre_swab_mdt_rec_rename (struct mdt_rec_rename *rn);
+        __u32           rn_mode;      /* cross-ref rename has mode */
+        __u32           rn_padding_5;
+        __u32           rn_padding_6;
+        __u32           rn_padding_7;
+        __u32           rn_padding_8;
+};
+
+struct mdt_rec_setxattr {
+        __u32           sx_opcode;
+        __u32           sx_fsuid;
+        __u32           sx_fsgid;
+        __u32           sx_cap;
+        __u32           sx_suppgid1;
+        __u32           sx_suppgid2;
+        struct lu_fid   sx_fid;
+        __u64           sx_padding_1; /* These three members are lu_fid size */
+        __u32           sx_padding_2;
+        __u32           sx_padding_3;
+        __u64           sx_valid;
+        __u64           sx_padding_4;
+        __u64           sx_padding_5;
+        __u64           sx_padding_6;
+        __u64           sx_padding_7;
+        __u32           sx_size;
+        __u32           sx_flags;
+        __u32           sx_padding_8;
+        __u32           sx_padding_9;
+        __u32           sx_padding_10;
+        __u32           sx_padding_11;
+};
+
+struct mdt_rec_reint {
+        __u32           rr_opcode;
+        __u32           rr_fsuid;
+        __u32           rr_fsgid;
+        __u32           rr_cap;
+        __u32           rr_suppgid1;
+        __u32           rr_suppgid2;
+        struct lu_fid   rr_fid1;
+        struct lu_fid   rr_fid2;
+        __u64           rr_mtime;
+        __u64           rr_atime;
+        __u64           rr_ctime;
+        __u64           rr_size;
+        __u64           rr_blocks;
+        __u32           rr_bias;
+        __u32           rr_mode;
+        __u32           rr_padding_1; /* also fix lustre_swab_mdt_rec_reint */
+        __u32           rr_padding_2; /* also fix lustre_swab_mdt_rec_reint */
+        __u32           rr_padding_3; /* also fix lustre_swab_mdt_rec_reint */
+        __u32           rr_padding_4; /* also fix lustre_swab_mdt_rec_reint */
+};
+
+extern void lustre_swab_mdt_rec_reint(struct mdt_rec_reint *rr);
 
-/* begin adding MDT by huanghua@clusterfs.com */
 struct lmv_desc {
         __u32 ld_tgt_count;                /* how many MDS's */
         __u32 ld_active_tgt_count;         /* how many active */
@@ -1481,7 +1556,6 @@ struct lmv_desc {
 };
 
 extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
-/* end adding MDT by huanghua@clusterfs.com */
 
 struct md_fld {
         seqno_t mf_seq;
index 5432519..386fe3e 100644 (file)
@@ -58,7 +58,7 @@ enum lcfg_command_type {
         LCFG_LOV_ADD_INA    = 0x00ce013,
         LCFG_ADD_MDC        = 0x00cf014,
         LCFG_DEL_MDC        = 0x00cf015,
-        LCFG_SPTLRPC_CONF   = 0x00cf016,
+        LCFG_SPTLRPC_CONF   = 0x00ce016,
 };
 
 struct lustre_cfg_bufs {
index 5c59d5b..b79c0d6 100644 (file)
@@ -726,13 +726,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      ldlm_policy_data_t *policy, int *flags,
                      void *lvb, __u32 lvb_len, void *lvb_swabber,
                      struct lustre_handle *lockh, int async);
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *head, int count);
-struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
-                                         int opc, int bufcount, int *size,
-                                         int bufoff, int canceloff,
-                                         struct list_head *cancels, int count);
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+                          struct ptlrpc_request *req,
+                          struct list_head *cancels,
+                          int count);
+int ldlm_prep_elc_req(struct obd_export *exp,
+                      struct ptlrpc_request *req,
+                      int version, int opc, int canceloff,
+                      struct list_head *cancels, int count);
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
                          const struct ldlm_request *dlm_req,
                          const struct ldlm_callback_suite *cbs);
@@ -773,7 +774,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                                ldlm_mode_t mode, int lock_flags,
                                int cancel_flags, void *opaque);
 int ldlm_cli_cancel_list(struct list_head *head, int count,
-                         struct ptlrpc_request *req, int off, int flags);
+                         struct ptlrpc_request *req, int flags);
  
 /* mds/handler.c */
 /* This has to be here because recursive inclusion sucks. */
index 06ed6b1..28d6beb 100644 (file)
@@ -37,7 +37,7 @@ struct com_thread_info {
         /*
          * for req-layout interface.
          */
-        struct req_capsule cti_pill;
+        struct req_capsule *cti_pill;
 };
 
 enum {
index ca69a55..1a9768d 100644 (file)
@@ -42,6 +42,7 @@
 #include <lustre_import.h>
 #include <lprocfs_status.h>
 #include <lu_object.h>
+#include <lustre_req_layout.h>
 
 /* MD flags we _always_ use */
 #define PTLRPC_MD_OPTIONS  0
@@ -418,6 +419,9 @@ struct ptlrpc_request {
         struct ptlrpc_request_pool *rq_pool;    /* Pool if request from
                                                    preallocated list */
         struct lu_context           rq_session;
+
+        /* request format */
+        struct req_capsule          rq_pill;
 };
 
 static inline void ptlrpc_close_replay_seq(struct ptlrpc_request *req)
@@ -771,14 +775,27 @@ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int));
+struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
+                                            const struct req_format *format);
+struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
+                                            struct ptlrpc_request_pool *,
+                                            const struct req_format *format);
+void ptlrpc_request_free(struct ptlrpc_request *request);
+int ptlrpc_request_pack(struct ptlrpc_request *request,
+                        __u32 version, int opcode);
+struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
+                                                const struct req_format *format,
+                                                __u32 version, int opcode);
+int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
+                             __u32 version, int opcode, char **bufs,
+                             struct ptlrpc_cli_ctx *ctx);
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                        int opcode, int count, int *lengths,
                                        char **bufs);
 struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp,
                                              __u32 version, int opcode,
                                             int count, int *lengths, char **bufs,
-                                            struct ptlrpc_request_pool *pool,
-                                            struct ptlrpc_cli_ctx *ctx);
+                                            struct ptlrpc_request_pool *pool);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
@@ -912,6 +929,8 @@ void lustre_msg_set_last_committed(struct lustre_msg *msg,__u64 last_committed);
 void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno);
 void lustre_msg_set_status(struct lustre_msg *msg, __u32 status);
 void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
+void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, int *sizes);
+void ptlrpc_request_set_replen(struct ptlrpc_request *req);
 
 static inline void
 lustre_shrink_reply(struct ptlrpc_request *req, int segment,
@@ -958,14 +977,6 @@ static inline int ptlrpc_req_get_repsize(struct ptlrpc_request *req)
         }
 }
 
-static inline void
-ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, int *lens)
-{
-        req->rq_replen = lustre_msg_size(req->rq_reqmsg->lm_magic, count, lens);
-        if (req->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2)
-                req->rq_reqmsg->lm_repsize = req->rq_replen;
-}
-
 /* ldlm/ldlm_lib.c */
 int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg);
 int client_obd_cleanup(struct obd_device *obddev);
index a087e27..eb5cde5 100644 (file)
@@ -41,19 +41,15 @@ enum req_location {
         RCL_NR
 };
 
+/* Maximal number of fields (buffers) in a request message. */
+#define REQ_MAX_FIELD_NR  9
+
 struct req_capsule {
         struct ptlrpc_request   *rc_req;
         const struct req_format *rc_fmt;
         __u32                    rc_swabbed;
         enum req_location        rc_loc;
-        int                     *rc_area;
-};
-
-enum {
-        /*
-         * Maximal number of fields (buffers) in a request message.
-         */
-        REQ_MAX_FIELD_NR = 8
+        int                      rc_area[RCL_NR][REQ_MAX_FIELD_NR];
 };
 
 #if !defined(__REQ_LAYOUT_USER__)
@@ -62,20 +58,34 @@ enum {
 #include <lustre_net.h>
 
 void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req,
-                      enum req_location location, int *area);
+                      enum req_location location);
 void req_capsule_fini(struct req_capsule *pill);
 
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
-int  req_capsule_pack(struct req_capsule *pill);
+void req_capsule_init_area(struct req_capsule *pill);
+int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc);
+int  req_capsule_server_pack(struct req_capsule *pill);
 
 void *req_capsule_client_get(struct req_capsule *pill,
                              const struct req_msg_field *field);
+void *req_capsule_client_swab_get(struct req_capsule *pill,
+                                  const struct req_msg_field *field,
+                                  void (*swabber)(void*));
+void *req_capsule_client_sized_get(struct req_capsule *pill,
+                                   const struct req_msg_field *field,
+                                   int len);
 void *req_capsule_server_get(struct req_capsule *pill,
                              const struct req_msg_field *field);
+void *req_capsule_server_sized_get(struct req_capsule *pill,
+                                   const struct req_msg_field *field,
+                                   int len);
+void *req_capsule_server_swab_get(struct req_capsule *pill,
+                                  const struct req_msg_field *field,
+                                  void *swabber);
 const void *req_capsule_other_get(struct req_capsule *pill,
                                   const struct req_msg_field *field);
 
-void req_capsule_set_size(const struct req_capsule *pill,
+void req_capsule_set_size(struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc, int size);
 int req_capsule_get_size(const struct req_capsule *pill,
@@ -89,12 +99,10 @@ int req_capsule_has_field(const struct req_capsule *pill,
 int req_capsule_field_present(const struct req_capsule *pill,
                               const struct req_msg_field *field,
                               enum req_location loc);
-
-int req_capsule_shrink(const struct req_capsule *pill,
-                       const struct req_msg_field *field,
-                       const unsigned int newlen,
-                       const int adjust,
-                       const int move_data);
+void req_capsule_shrink(struct req_capsule *pill,
+                        const struct req_msg_field *field,
+                        unsigned int newlen,
+                        enum req_location loc);
 
 int  req_layout_init(void);
 void req_layout_fini(void);
@@ -102,16 +110,29 @@ void req_layout_fini(void);
 /* __REQ_LAYOUT_USER__ */
 #endif
 
+extern const struct req_format RQF_OBD_PING;
+extern const struct req_format RQF_SEC_CTX;
+/* MGS req_format */
+extern const struct req_format RQF_MGS_TARGET_REG;
+extern const struct req_format RQF_MGS_SET_INFO;
+/* fid/fld req_format */
 extern const struct req_format RQF_SEQ_QUERY;
 extern const struct req_format RQF_FLD_QUERY;
-extern const struct req_format RQF_MDS_GETSTATUS;
+/* MDS req_format */
+extern const struct req_format RQF_MDS_CONNECT;
+extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_STATFS;
+extern const struct req_format RQF_MDS_GETSTATUS;
 extern const struct req_format RQF_MDS_SYNC;
 extern const struct req_format RQF_MDS_GETXATTR;
-extern const struct req_format RQF_MDS_SETXATTR;
 extern const struct req_format RQF_MDS_GETATTR;
+/*
+ * This is format of direct (non-intent) MDS_GETATTR_NAME request.
+ */
+extern const struct req_format RQF_MDS_GETATTR_NAME;
 extern const struct req_format RQF_MDS_CLOSE;
 extern const struct req_format RQF_MDS_PIN;
+extern const struct req_format RQF_MDS_UNPIN;
 extern const struct req_format RQF_MDS_CONNECT;
 extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_SET_INFO;
@@ -119,11 +140,6 @@ extern const struct req_format RQF_MDS_READPAGE;
 extern const struct req_format RQF_MDS_WRITEPAGE;
 extern const struct req_format RQF_MDS_IS_SUBDIR;
 extern const struct req_format RQF_MDS_DONE_WRITING;
-
-/*
- * This is format of direct (non-intent) MDS_GETATTR_NAME request.
- */
-extern const struct req_format RQF_MDS_GETATTR_NAME;
 extern const struct req_format RQF_MDS_REINT;
 extern const struct req_format RQF_MDS_REINT_CREATE;
 extern const struct req_format RQF_MDS_REINT_CREATE_RMT_ACL;
@@ -134,12 +150,50 @@ extern const struct req_format RQF_MDS_REINT_UNLINK;
 extern const struct req_format RQF_MDS_REINT_LINK;
 extern const struct req_format RQF_MDS_REINT_RENAME;
 extern const struct req_format RQF_MDS_REINT_SETATTR;
+extern const struct req_format RQF_MDS_REINT_SETXATTR;
+extern const struct req_format RQF_MDS_QUOTACHECK;
+extern const struct req_format RQF_MDS_QUOTACTL;
+extern const struct req_format RQF_MDS_QUOTA_DQACQ;
+extern const struct req_format RQF_QC_CALLBACK;
+/* OST req_format */
+extern const struct req_format RQF_OST_CONNECT;
+extern const struct req_format RQF_OST_DISCONNECT;
+extern const struct req_format RQF_OST_QUOTACHECK;
+extern const struct req_format RQF_OST_QUOTACTL;
+extern const struct req_format RQF_OST_GETATTR;
+extern const struct req_format RQF_OST_SETATTR;
+extern const struct req_format RQF_OST_CREATE;
+extern const struct req_format RQF_OST_PUNCH;
+extern const struct req_format RQF_OST_SYNC;
+extern const struct req_format RQF_OST_DESTROY;
+extern const struct req_format RQF_OST_BRW;
+extern const struct req_format RQF_OST_STATFS;
+extern const struct req_format RQF_OST_SET_INFO;
+extern const struct req_format RQF_OST_GET_INFO;
+
+/* LDLM req_format */
 extern const struct req_format RQF_LDLM_ENQUEUE;
+extern const struct req_format RQF_LDLM_ENQUEUE_LVB;
+extern const struct req_format RQF_LDLM_CONVERT;
 extern const struct req_format RQF_LDLM_INTENT;
 extern const struct req_format RQF_LDLM_INTENT_GETATTR;
 extern const struct req_format RQF_LDLM_INTENT_OPEN;
 extern const struct req_format RQF_LDLM_INTENT_CREATE;
 extern const struct req_format RQF_LDLM_INTENT_UNLINK;
+extern const struct req_format RQF_LDLM_CANCEL;
+extern const struct req_format RQF_LDLM_CALLBACK;
+extern const struct req_format RQF_LDLM_CP_CALLBACK;
+extern const struct req_format RQF_LDLM_BL_CALLBACK;
+extern const struct req_format RQF_LDLM_GL_CALLBACK;
+/* LOG req_format */
+extern const struct req_format RQF_LOG_CANCEL;
+extern const struct req_format RQF_LLOG_CATINFO;
+extern const struct req_format RQF_LLOG_ORIGIN_HANDLE_CREATE;
+extern const struct req_format RQF_LLOG_ORIGIN_HANDLE_DESTROY;
+extern const struct req_format RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK;
+extern const struct req_format RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK;
+extern const struct req_format RQF_LLOG_ORIGIN_HANDLE_READ_HEADER;
+extern const struct req_format RQF_LLOG_ORIGIN_CONNECT;
 
 extern const struct req_msg_field RMF_PTLRPC_BODY;
 extern const struct req_msg_field RMF_MDT_BODY;
@@ -158,19 +212,20 @@ extern const struct req_msg_field RMF_CONN;
 extern const struct req_msg_field RMF_CONNECT_DATA;
 extern const struct req_msg_field RMF_DLM_REQ;
 extern const struct req_msg_field RMF_DLM_REP;
+extern const struct req_msg_field RMF_DLM_LVB;
 extern const struct req_msg_field RMF_LDLM_INTENT;
 extern const struct req_msg_field RMF_MDT_MD;
-extern const struct req_msg_field RMF_REC_CREATE;
-extern const struct req_msg_field RMF_REC_LINK;
-extern const struct req_msg_field RMF_REC_UNLINK;
-extern const struct req_msg_field RMF_REC_RENAME;
-extern const struct req_msg_field RMF_REC_SETATTR;
+extern const struct req_msg_field RMF_REC_REINT;
+extern const struct req_msg_field RMF_REC_JOINFILE;
 extern const struct req_msg_field RMF_EADATA;
 extern const struct req_msg_field RMF_ACL;
 extern const struct req_msg_field RMF_LOGCOOKIES;
-extern const struct req_msg_field RMF_REINT_OPC;
 extern const struct req_msg_field RMF_CAPA1;
 extern const struct req_msg_field RMF_CAPA2;
+extern const struct req_msg_field RMF_OBD_QUOTACHECK;
+extern const struct req_msg_field RMF_OBD_QUOTACTL;
+extern const struct req_msg_field RMF_QUNIT_DATA;
+extern const struct req_msg_field RMF_STRING;
 
 /* seq-mgr fields */
 extern const struct req_msg_field RMF_SEQ_OPC;
@@ -180,4 +235,16 @@ extern const struct req_msg_field RMF_SEQ_RANGE;
 extern const struct req_msg_field RMF_FLD_OPC;
 extern const struct req_msg_field RMF_FLD_MDFLD;
 
+extern const struct req_msg_field RMF_LLOGD_BODY;
+extern const struct req_msg_field RMF_LLOG_LOG_HDR;
+extern const struct req_msg_field RMF_LLOGD_CONN_BODY;
+
+extern const struct req_msg_field RMF_MGS_TARGET_INFO;
+extern const struct req_msg_field RMF_MGS_SEND_PARAM;
+
+extern const struct req_msg_field RMF_OST_BODY;
+extern const struct req_msg_field RMF_OBD_IOOBJ;
+extern const struct req_msg_field RMF_OBD_ID;
+extern const struct req_msg_field RMF_NIOBUF_REMOTE;
+
 #endif /* _LUSTRE_REQ_LAYOUT_H__ */
index 7ef0ee7..b325680 100644 (file)
@@ -1324,7 +1324,7 @@ struct md_ops {
         int (*m_init_ea_size)(struct obd_export *, int, int, int);
 
         int (*m_get_lustre_md)(struct obd_export *, struct ptlrpc_request *,
-                               int, struct obd_export *, struct obd_export *,
+                               struct obd_export *, struct obd_export *,
                                struct lustre_md *);
 
         int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
index 8915c2a..f3f9c4d 100644 (file)
@@ -1869,15 +1869,14 @@ static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data,
 
 static inline int md_get_lustre_md(struct obd_export *exp,
                                    struct ptlrpc_request *req,
-                                   int offset, struct obd_export *dt_exp,
+                                   struct obd_export *dt_exp,
                                    struct obd_export *md_exp,
                                    struct lustre_md *md)
 {
         ENTRY;
         EXP_CHECK_MD_OP(exp, get_lustre_md);
         EXP_MD_COUNTER_INCREMENT(exp, get_lustre_md);
-        RETURN(MDP(exp->exp_obd, get_lustre_md)(exp, req, offset,
-                                                dt_exp, md_exp, md));
+        RETURN(MDP(exp->exp_obd, get_lustre_md)(exp, req, dt_exp, md_exp, md));
 }
 
 static inline int md_free_lustre_md(struct obd_export *exp,
index 197791e..923401a 100644 (file)
@@ -575,26 +575,25 @@ int target_handle_connect(struct ptlrpc_request *req)
         struct obd_export *export = NULL;
         struct obd_import *revimp;
         struct lustre_handle conn;
+        struct lustre_handle *tmp;
         struct obd_uuid tgtuuid;
         struct obd_uuid cluuid;
         struct obd_uuid remote_uuid;
-        char *str, *tmp;
+        char *str;
         int rc = 0;
         int initial_conn = 0;
-        struct obd_connect_data *data;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*data) };
+        struct obd_connect_data *data, *tmpdata;
         ENTRY;
 
         OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
 
-        lustre_set_req_swabbed(req, REQ_REC_OFF);
-        str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF, sizeof(tgtuuid)-1);
+        str = req_capsule_client_get(&req->rq_pill, &RMF_TGTUUID);
         if (str == NULL) {
                 DEBUG_REQ(D_ERROR, req, "bad target UUID for connect");
                 GOTO(out, rc = -EINVAL);
         }
 
-        obd_str2uuid (&tgtuuid, str);
+        obd_str2uuid(&tgtuuid, str);
         target = class_uuid2obd(&tgtuuid);
         if (!target)
                 target = class_name2obd(str);
@@ -620,15 +619,14 @@ int target_handle_connect(struct ptlrpc_request *req)
            Really, class_uuid2obd should take the ref. */
         targref = class_incref(target);
 
-        lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
-        str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF + 1,
-                                sizeof(cluuid) - 1);
+
+        str = req_capsule_client_get(&req->rq_pill, &RMF_CLUUID);
         if (str == NULL) {
                 DEBUG_REQ(D_ERROR, req, "bad client UUID for connect");
                 GOTO(out, rc = -EINVAL);
         }
 
-        obd_str2uuid (&cluuid, str);
+        obd_str2uuid(&cluuid, str);
 
         /* XXX extract a nettype and format accordingly */
         switch (sizeof(lnet_nid_t)) {
@@ -645,19 +643,17 @@ int target_handle_connect(struct ptlrpc_request *req)
                 LBUG();
         }
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
         if (tmp == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        memcpy(&conn, tmp, sizeof conn);
-
-        data = lustre_swab_reqbuf(req, REQ_REC_OFF + 3, sizeof(*data),
-                                  lustre_swab_connect);
+        conn = *tmp;
 
+        data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
         if (!data)
                 GOTO(out, rc = -EPROTO);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 GOTO(out, rc);
 
@@ -678,10 +674,10 @@ int target_handle_connect(struct ptlrpc_request *req)
                                   OBD_OCD_VERSION_MINOR(data->ocd_version),
                                   OBD_OCD_VERSION_PATCH(data->ocd_version),
                                   OBD_OCD_VERSION_FIX(data->ocd_version));
-                        data = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                              offsetof(typeof(*data),
-                                                       ocd_version) +
-                                              sizeof(data->ocd_version));
+                        data = req_capsule_server_sized_get(&req->rq_pill,
+                                                        &RMF_CONNECT_DATA,
+                                    offsetof(typeof(*data), ocd_version) +
+                                             sizeof(data->ocd_version));
                         if (data) {
                                 data->ocd_connect_flags = OBD_CONNECT_VERSION;
                                 data->ocd_version = LUSTRE_VERSION_CODE;
@@ -789,13 +785,16 @@ int target_handle_connect(struct ptlrpc_request *req)
 
         if (export == NULL) {
                 if (target->obd_recovering) {
+                        cfs_time_t t;
+
+                        t = cfs_timer_deadline(&target->obd_recovery_timer);
+                        t = cfs_time_sub(t, cfs_time_current());
                         CERROR("%s: denying connection for new client %s (%s): "
                                "%d clients in recovery for %lds\n",
                                target->obd_name,
                                libcfs_nid2str(req->rq_peer.nid), cluuid.uuid,
                                target->obd_recoverable_clients,
-                               cfs_duration_sec(cfs_time_sub(cfs_timer_deadline(&target->obd_recovery_timer),
-                                                             cfs_time_current())));
+                               cfs_duration_sec(t));
                         rc = -EBUSY;
                 } else {
 dont_check_exports:
@@ -810,11 +809,13 @@ dont_check_exports:
                 GOTO(out, rc);
         /* Return only the parts of obd_connect_data that we understand, so the
          * client knows that we don't understand the rest. */
-        if (data)
-                memcpy(lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*data)),
-                       data, sizeof(*data));
-
+        if (data) {
+                 tmpdata = req_capsule_server_get(&req->rq_pill,
+                                                  &RMF_CONNECT_DATA);
+                  //data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
+                 *tmpdata = *data;
+        }
+                  
         /* If all else goes well, this is our RPC return code. */
         req->rq_status = 0;
 
@@ -901,9 +902,8 @@ dont_check_exports:
                         wake_up(&target->obd_next_transno_waitq);
         }
         spin_unlock_bh(&target->obd_processing_task_lock);
-        memcpy(&conn,
-               lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
-               sizeof conn);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
+        conn = *tmp;
 
         if (export->exp_imp_reverse != NULL) {
                 /* destroyed import can be still referenced in ctxt */
@@ -973,7 +973,7 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        rc = lustre_pack_reply(req, 1, NULL, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
@@ -1951,7 +1951,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 int target_handle_ping(struct ptlrpc_request *req)
 {
         obd_ping(req->rq_export);
-        return lustre_pack_reply(req, 1, NULL, NULL);
+        return req_capsule_server_pack(&req->rq_pill);
 }
 
 void target_committed_to_req(struct ptlrpc_request *req)
@@ -1983,12 +1983,9 @@ int target_handle_qc_callback(struct ptlrpc_request *req)
         struct obd_quotactl *oqctl;
         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
 
-        oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
-                                   lustre_swab_obd_quotactl);
-        if (oqctl == NULL) {
-                CERROR("Can't unpack obd_quotactl\n");
+        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+        if (oqctl == NULL)
                 RETURN(-EPROTO);
-        }
 
         cli->cl_qchk_stat = oqctl->qc_stat;
 
@@ -2005,13 +2002,13 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         void* rep;
         struct qunit_data_old *qdata_old;
         int rc = 0;
-        int repsize[2] = { sizeof(struct ptlrpc_body),
-                           sizeof(struct qunit_data) };
         ENTRY;
 
-        rc = lustre_pack_reply(req, 2, repsize, NULL);
-        if (rc)
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc) {
+                CERROR("packing reply failed!: rc = %d\n", rc);
                 RETURN(rc);
+        }
 
         LASSERT(req->rq_export);
 
@@ -2019,25 +2016,24 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
-                rep = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                     sizeof(struct qunit_data));
+                rep = req_capsule_server_get(&req->rq_pill,
+                                             &RMF_QUNIT_DATA);
                 LASSERT(rep);
-                qdata = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*qdata),
-                                           lustre_swab_qdata);
+                qdata = req_capsule_client_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                          (void*)lustre_swab_qdata);
         } else {
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
-                rep = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                     sizeof(struct qunit_data_old));
+                rep = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
                 LASSERT(rep);
-                qdata_old = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*qdata_old),
-                                               lustre_swab_qdata_old);
+                qdata_old = req_capsule_client_swab_get(&req->rq_pill,
+                                                        &RMF_QUNIT_DATA,
+                                           (void*)lustre_swab_qdata_old);
                 qdata = lustre_quota_old_to_new(qdata_old);
         }
 
-        if (qdata == NULL) {
-                CERROR("Can't unpack qunit_data\n");
+        if (qdata == NULL)
                 RETURN(-EPROTO);
-        }
 
         /* we use the observer */
         LASSERT(obd->obd_observer && obd->obd_observer->obd_observer);
@@ -2054,10 +2050,10 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         /* the qd_count might be changed in lqc_handler */
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
-                memcpy(rep,qdata,sizeof(*qdata));
+                memcpy(rep, qdata, sizeof(*qdata));
         } else {
                 qdata_old = lustre_quota_new_to_old(qdata);
-                memcpy(rep,qdata_old,sizeof(*qdata_old));
+                memcpy(rep, qdata_old, sizeof(*qdata_old));
         }
         req->rq_status = rc;
         rc = ptlrpc_reply(req);
index 23cb151..8ef4562 100644 (file)
@@ -583,17 +583,15 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              void *data, int flag)
 {
         struct ldlm_cb_set_arg *arg = data;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int instant_cancel = 0, rc;
+        struct ldlm_request    *body;
+        struct ptlrpc_request  *req;
+        int                     instant_cancel = 0;
+        int                     rc = 0;
         ENTRY;
 
-        if (flag == LDLM_CB_CANCELING) {
+        if (flag == LDLM_CB_CANCELING)
                 /* Don't need to do anything here. */
                 RETURN(0);
-        }
 
         LASSERT(lock);
         LASSERT(data != NULL);
@@ -602,9 +600,9 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 ldlm_lock_dump(D_ERROR, lock, 0);
         }
 
-        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
-                              NULL);
+        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
+                                        &RQF_LDLM_BL_CALLBACK,
+                                        LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -633,14 +631,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                 instant_cancel = 1;
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
         body->lock_desc = *desc;
         body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
 
-        ptlrpc_req_set_repsize(req, 1, NULL);
+        ptlrpc_request_set_replen(req);
         if (instant_cancel) {
                 unlock_res(lock->l_resource);
                 ldlm_lock_cancel(lock);
@@ -665,13 +663,12 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_cb_set_arg *arg = data;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        struct timeval granted_time;
-        long total_enqueue_wait;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc, buffers = 2, instant_cancel = 0;
+        struct ldlm_request    *body;
+        struct ptlrpc_request  *req;
+        struct timeval          granted_time;
+        long                    total_enqueue_wait;
+        int                     instant_cancel = 0;
+        int                     rc = 0;
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -685,34 +682,35 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 LDLM_ERROR(lock, "enqueue wait took %luus from %lu",
                            total_enqueue_wait, lock->l_enqueued_time.tv_sec);
 
+         req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
+                                    &RQF_LDLM_CP_CALLBACK);
+         if (req == NULL)
+                 RETURN(-ENOMEM);
+
         lock_res_and_lock(lock);
-        if (lock->l_resource->lr_lvb_len) {
-                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;
-                buffers = 3;
-        }
+        if (lock->l_resource->lr_lvb_len)
+                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
+                                      lock->l_resource->lr_lvb_len);
         unlock_res_and_lock(lock);
 
-        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,
-                              size, NULL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         req->rq_async_args.pointer_arg[0] = arg;
         req->rq_async_args.pointer_arg[1] = lock;
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
         body->lock_handle[0] = lock->l_remote_handle;
         body->lock_flags = flags;
         ldlm_lock2desc(lock, &body->lock_desc);
+        if (lock->l_resource->lr_lvb_len) {
+                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
-        if (buffers == 3) {
-                void *lvb;
-
-                lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF,
-                                     lock->l_resource->lr_lvb_len);
                 lock_res_and_lock(lock);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
@@ -722,8 +720,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                    total_enqueue_wait);
 
-        ptlrpc_req_set_repsize(req, 1, NULL);
-
+        ptlrpc_request_set_replen(req);
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);
 
@@ -761,31 +758,32 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        struct ldlm_request *body;
+        struct ldlm_resource  *res = lock->l_resource;
+        struct ldlm_request   *body;
         struct ptlrpc_request *req;
-        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc = 0;
+        int                    rc;
         ENTRY;
 
         LASSERT(lock != NULL);
 
-        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK, 2, size,
-                              NULL);
+        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
+                                        &RQF_LDLM_GL_CALLBACK,
+                                        LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK);
+
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
         lock_res_and_lock(lock);
-        size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             lock->l_resource->lr_lvb_len);
         unlock_res_and_lock(lock);
         res = lock->l_resource;
-        ptlrpc_req_set_repsize(req, 2, size);
+        ptlrpc_request_set_replen(req);
+
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);
@@ -832,7 +830,7 @@ extern unsigned long long lu_time_stamp_get(void);
 #define lu_time_stamp_get() time(NULL)
 #endif
 
-static void ldlm_svc_get_eopc(struct ldlm_request *dlm_req,
+static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
                        struct lprocfs_stats *srv_stats)
 {
         int lock_type = 0, op = 0;
@@ -876,13 +874,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                          const struct ldlm_callback_suite *cbs)
 {
         struct ldlm_reply *dlm_rep;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
-        int rc = 0;
         __u32 flags;
         ldlm_error_t err = ELDLM_OK;
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
+        int rc = 0;
         ENTRY;
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
@@ -991,19 +987,18 @@ existing_lock:
                  * local_lock_enqueue by the policy function. */
                 cookie = req;
         } else {
-                int buffers = 2;
-
                 lock_res_and_lock(lock);
                 if (lock->l_resource->lr_lvb_len) {
-                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
-                        buffers = 3;
+                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                             RCL_SERVER,
+                                             lock->l_resource->lr_lvb_len);
                 }
                 unlock_res_and_lock(lock);
 
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
 
-                rc = lustre_pack_reply(req, buffers, size, NULL);
+                rc = req_capsule_server_pack(&req->rq_pill);
                 if (rc)
                         GOTO(out, rc);
         }
@@ -1017,8 +1012,7 @@ existing_lock:
         if (err)
                 GOTO(out, err);
 
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
-                                 sizeof(*dlm_rep));
+        dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
         dlm_rep->lock_flags = flags;
 
         ldlm_lock2desc(lock, &dlm_rep->lock_desc);
@@ -1071,9 +1065,9 @@ existing_lock:
                         LDLM_ERROR(lock, "sync lock");
                         if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                 struct ldlm_intent *it;
-                                it = lustre_msg_buf(req->rq_reqmsg,
-                                                    DLM_INTENT_IT_OFF,
-                                                    sizeof(*it));
+
+                                it = req_capsule_client_get(&req->rq_pill,
+                                                            &RMF_LDLM_INTENT);
                                 if (it != NULL) {
                                         CERROR("This is intent %s ("LPU64")\n",
                                                ldlm_it2str(it->opc), it->opc);
@@ -1102,16 +1096,16 @@ existing_lock:
 
                 lock_res_and_lock(lock);
                 if (rc == 0) {
-                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
-                        if (size[DLM_REPLY_REC_OFF] > 0) {
-                                void *lvb = lustre_msg_buf(req->rq_repmsg,
-                                                       DLM_REPLY_REC_OFF,
-                                                       size[DLM_REPLY_REC_OFF]);
+                        if (lock->l_resource->lr_lvb_len > 0) {
+                                void *lvb;
+
+                                lvb = req_capsule_server_get(&req->rq_pill,
+                                                             &RMF_DLM_LVB);
                                 LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                          req, lock);
 
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
-                                       size[DLM_REPLY_REC_OFF]);
+                                       lock->l_resource->lr_lvb_len);
                         }
                 } else {
                         ldlm_resource_unlink_lock(lock);
@@ -1136,21 +1130,19 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                         ldlm_blocking_callback blocking_callback,
                         ldlm_glimpse_callback glimpse_callback)
 {
-        int rc;
         struct ldlm_request *dlm_req;
         struct ldlm_callback_suite cbs = {
                 .lcs_completion = completion_callback,
                 .lcs_blocking   = blocking_callback,
                 .lcs_glimpse    = glimpse_callback
         };
+        int rc;
 
-        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
-                                     sizeof *dlm_req, lustre_swab_ldlm_request);
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         if (dlm_req != NULL) {
                 rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace,
                                           req, dlm_req, &cbs);
         } else {
-                CERROR ("Can't unpack dlm_req\n");
                 rc = -EFAULT;
         }
         return rc;
@@ -1162,20 +1154,17 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
         struct ldlm_reply *dlm_rep;
         struct ldlm_lock *lock;
         int rc;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
         ENTRY;
 
         if (req->rq_export && req->rq_export->exp_ldlm_stats)
                 lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                      LDLM_CONVERT - LDLM_FIRST_OPC);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
-                                 sizeof(*dlm_rep));
+        dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
@@ -1214,8 +1203,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         int rc;
         struct ldlm_request *dlm_req;
 
-        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof *dlm_req,
-                                     lustre_swab_ldlm_request);
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         if (dlm_req != NULL) {
                 rc = ldlm_handle_convert0(req, dlm_req);
         } else {
@@ -1286,10 +1274,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
-                                     lustre_swab_ldlm_request);
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         if (dlm_req == NULL) {
-                CERROR("bad request buffer for cancel\n");
+                CDEBUG(D_INFO, "bad request buffer for cancel\n");
                 RETURN(-EFAULT);
         }
 
@@ -1297,7 +1284,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                      LDLM_CANCEL - LDLM_FIRST_OPC);
 
-        rc = lustre_pack_reply(req, 1, NULL, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
@@ -1385,13 +1372,14 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         if (lock->l_lvb_len) {
-                void *lvb;
-                lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
-                                         lock->l_lvb_swabber);
-                if (lvb == NULL) {
+                if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
+                                         RCL_CLIENT) < lock->l_lvb_len) {
                         LDLM_ERROR(lock, "completion AST did not contain "
                                    "expected LVB!");
                 } else {
+                        void *lvb = req_capsule_client_swab_get(&req->rq_pill,
+                                                                &RMF_DLM_LVB,
+                                                  (void *)lock->l_lvb_swabber);
                         memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                 }
         }
@@ -1522,6 +1510,7 @@ int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
 #endif
 }
 
+/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         struct ldlm_namespace *ns;
@@ -1535,6 +1524,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * incoming request message body, but I am responsible for the
          * message buffers. */
 
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
@@ -1545,9 +1536,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                        libcfs_id2str(req->rq_peer),
                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
 
-                dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
-                                             sizeof(*dlm_req),
-                                             lustre_swab_ldlm_request);
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+                dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
                 if (dlm_req != NULL)
                         CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                                dlm_req->lock_handle[0].cookie);
@@ -1573,12 +1563,14 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                         RETURN(0);
                 break;
         case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
+                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
                         RETURN(0);
                 rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case OBD_QC_CALLBACK:
+                req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
                         RETURN(0);
                 rc = target_handle_qc_callback(req);
@@ -1587,21 +1579,27 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
                 /* reply in handler */
+                req_capsule_set(&req->rq_pill, &RQF_MDS_QUOTA_DQACQ);
                 rc = target_handle_dqacq_callback(req);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_CREATE:
+                req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
                         RETURN(0);
                 rc = llog_origin_handle_create(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+                req_capsule_set(&req->rq_pill,
+                                &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
                         RETURN(0);
                 rc = llog_origin_handle_next_block(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_READ_HEADER:
+                req_capsule_set(&req->rq_pill,
+                                &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
                         RETURN(0);
                 rc = llog_origin_handle_read_header(req);
@@ -1626,12 +1624,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
-        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
-                                     lustre_swab_ldlm_request);
+        req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         if (dlm_req == NULL) {
-                CERROR ("can't unpack dlm_req\n");
                 ldlm_callback_reply(req, -EPROTO);
-                RETURN (0);
+                RETURN(0);
         }
 
         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
@@ -1675,6 +1673,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
                         ldlm_callback_reply(req, 0);
                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
@@ -1682,11 +1681,13 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
                 ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
         case LDLM_GL_CALLBACK:
                 CDEBUG(D_INODE, "glimpse ast\n");
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                 break;
         default:
@@ -1706,6 +1707,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
          * incoming request message body, but I am responsible for the
          * message buffers. */
 
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
@@ -1714,9 +1717,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                        libcfs_id2str(req->rq_peer),
                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
 
-                dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
-                                             sizeof(*dlm_req),
-                                             lustre_swab_ldlm_request);
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+                dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
                 if (dlm_req != NULL)
                         ldlm_lock_dump_handle(D_ERROR,
                                               &dlm_req->lock_handle[0]);
@@ -1728,6 +1730,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 
         /* XXX FIXME move this back to mds/handler.c, bug 249 */
         case LDLM_CANCEL:
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
                 CDEBUG(D_INODE, "cancel\n");
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
                         RETURN(0);
@@ -1736,6 +1739,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                         break;
                 RETURN(0);
         case OBD_LOG_CANCEL:
+                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
                         RETURN(0);
                 rc = llog_origin_handle_cancel(req);
@@ -1744,6 +1748,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         default:
                 CERROR("invalid opcode %d\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
                 ldlm_callback_reply(req, -EINVAL);
         }
 
@@ -1920,7 +1925,7 @@ static int ldlm_bl_thread_main(void *arg)
                          * Thus lock is marked LDLM_FL_CANCELING, and already
                          * canceled locally. */
                         ldlm_cli_cancel_list(&blwi->blwi_head,
-                                             blwi->blwi_count, NULL, 0, 0);
+                                             blwi->blwi_count, NULL, 0);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
index 852cea8..7110846 100644 (file)
@@ -346,19 +346,19 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 if (rc == ELDLM_LOCK_ABORTED) {
                         /* Before we return, swab the reply */
-                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
-                                                   sizeof(*reply),
-                                                   lustre_swab_ldlm_reply);
-                        if (reply == NULL) {
-                                CERROR("Can't unpack ldlm_reply\n");
+                        reply = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_DLM_REP);
+                        if (reply == NULL)
                                 rc = -EPROTO;
-                        }
                         if (lvb_len) {
-                                void *tmplvb;
-                                tmplvb = lustre_swab_repbuf(req,
-                                                            DLM_REPLY_REC_OFF,
-                                                            lvb_len,
-                                                            lvb_swabber);
+                                struct ost_lvb *tmplvb;
+
+                                req_capsule_set_size(&req->rq_pill,
+                                                     &RMF_DLM_LVB, RCL_SERVER,
+                                                     lvb_len);
+                            tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                                 &RMF_DLM_LVB,
+                                                                 lvb_swabber);
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
@@ -368,12 +368,9 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 GOTO(cleanup, rc);
         }
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
                 GOTO(cleanup, rc = -EPROTO);
-        }
 
         /* lock enqueued on the server */
         cleanup_phase = 0;
@@ -447,8 +444,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
          * clobber the LVB with an older one. */
         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                 void *tmplvb;
-                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
-                                            lvb_swabber);
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lvb_len);
+                tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB,
+                                                     lvb_swabber);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
@@ -511,24 +512,25 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
-                                         int opc, int bufcount, int *size,
-                                         int bufoff, int canceloff,
-                                         struct list_head *cancels, int count)
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                      int version, int opc, int canceloff,
+                      struct list_head *cancels, int count)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        int flags, avail, to_free, pack = 0;
-        struct ldlm_request *dlm = NULL;
-        struct ptlrpc_request *req;
-        CFS_LIST_HEAD(head);
+        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
+        struct req_capsule      *pill = &req->rq_pill;
+        struct ldlm_request     *dlm = NULL;
+        int flags, avail, to_free, bufcount, pack = 0;
+        int rc;
         ENTRY;
 
-        if (cancels == NULL)
-                cancels = &head;
+
+        LASSERT(cancels != NULL);
+
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
-                avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                               bufoff, canceloff);
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+                avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                               bufcount, bufcount - 1, canceloff);
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                 to_free = !ns_connect_lru_resize(ns) &&
@@ -544,14 +546,20 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         pack = count;
                 else
                         pack = avail;
-                size[bufoff] = ldlm_request_bufsize(pack, opc);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, opc));
         }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
-                              opc, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req) {
+
+        rc = ptlrpc_request_pack(req, version, opc);
+        if (rc) {
+                ldlm_lock_list_put(cancels, l_bl_ast, count);
+                RETURN(rc);
+        }
+
+        if (exp_connect_cancelset(exp)) {
                 if (canceloff) {
-                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
-                                             sizeof(*dlm));
+                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+                        LASSERT(dlm);
                         /* Skip first lock handler in ldlm_request_pack(),
                          * this method will incrment @lock_count according
                          * to the lock handle amount actually written to
@@ -559,22 +567,21 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         dlm->lock_count = canceloff;
                 }
                 /* Pack into the request @pack lock handles. */
-                ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+                ldlm_cli_cancel_list(cancels, pack, req, 0);
                 /* Prepare and send separate cancel rpc for others. */
-                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
-        RETURN(req);
+        RETURN(0);
 }
 
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+                          struct ptlrpc_request *req,
+                          struct list_head *cancels,
+                          int count)
 {
-        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
-                                 bufcount, size, DLM_LOCKREQ_OFF,
+        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 }
 
@@ -592,14 +599,11 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct lustre_handle *lockh, int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body),
-                        [DLM_REPLY_REC_OFF]   = lvb_len };
-        int is_replay = *flags & LDLM_FL_REPLAY;
-        int req_passed_in = 1, rc, err;
+        struct ldlm_lock      *lock;
+        struct ldlm_request   *body;
+        int                    is_replay = *flags & LDLM_FL_REPLAY;
+        int                    req_passed_in = 1;
+        int                    rc, err;
         struct ptlrpc_request *req;
         ENTRY;
 
@@ -646,7 +650,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         /* lock not sent to server yet */
 
         if (reqp == NULL || *reqp == NULL) {
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                                &RQF_LDLM_ENQUEUE,
+                                                LUSTRE_DLM_VERSION,
+                                                LDLM_ENQUEUE);
                 if (req == NULL) {
                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(lock);
@@ -656,12 +663,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 if (reqp)
                         *reqp = req;
         } else {
+                int len;
+
                 req = *reqp;
-                LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
-                         sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
-                         DLM_LOCKREQ_OFF,
-                         lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
-                         sizeof(*body));
+                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+                                           RCL_CLIENT);
+                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+                         DLM_LOCKREQ_OFF, len, sizeof(*body));
         }
 
         lock->l_conn_export = exp;
@@ -669,15 +677,20 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         lock->l_blocking_ast = einfo->ei_cb_bl;
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
         body->lock_handle[0] = *lockh;
 
         /* Continue as normal. */
         if (!req_passed_in) {
-                size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-                ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+                if (lvb_len > 0) {
+                        req_capsule_extend(&req->rq_pill,
+                                           &RQF_LDLM_ENQUEUE_LVB);
+                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                             RCL_SERVER, lvb_len);
+                }
+                ptlrpc_request_set_replen(req);
         }
 
         /*
@@ -745,14 +758,12 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
    accounting */
 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 {
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        struct ldlm_lock *lock;
-        struct ldlm_resource *res;
+        struct ldlm_request   *body;
+        struct ldlm_reply     *reply;
+        struct ldlm_lock      *lock;
+        struct ldlm_resource  *res;
         struct ptlrpc_request *req;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc;
+        int                    rc;
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
@@ -767,30 +778,29 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
-                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+                                        LDLM_CONVERT);
+        if (req == NULL) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(-ENOMEM);
+        }
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
 
         body->lock_desc.l_req_mode = new_mode;
         body->lock_flags = *flags;
 
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-        ptlrpc_req_set_repsize(req, 2, size);
 
+        ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR ("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         if (req->rq_status)
                 GOTO(out, rc = req->rq_status);
@@ -862,7 +872,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
 
 /* Pack @count locks in @head into ldlm_request buffer at the offset @off,
    of the request @req. */
-static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
                              struct list_head *head, int count)
 {
         struct ldlm_request *dlm;
@@ -870,11 +880,11 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
         int max, packed = 0;
         ENTRY;
 
-        dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         LASSERT(dlm != NULL);
 
         /* Check the room in the request buffer. */
-        max = lustre_msg_buflen(req->rq_reqmsg, off) - 
+        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 
                 sizeof(struct ldlm_request);
         max /= sizeof(struct lustre_handle);
         max += LDLM_LOCKREQ_HANDLES;
@@ -902,9 +912,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         int count, int flags)
 {
         struct ptlrpc_request *req = NULL;
-        struct ldlm_request *body;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                [DLM_LOCKREQ_OFF]     = sizeof(*body) };
         struct obd_import *imp;
         int free, sent = 0;
         int rc = 0;
@@ -916,12 +923,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
-        if (count > free)
-                count = free;
-
-        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
         while (1) {
+                int bufcount;
+                struct req_capsule *pill; 
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
                         CDEBUG(D_DLMTRACE,
@@ -929,11 +933,26 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         RETURN(count);
                 }
 
-                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
-                                      size, NULL);
-                if (!req)
+                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+                if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
 
+                pill = &req->rq_pill;
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+
+                free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                              bufcount, bufcount, 0);
+                if (count > free)
+                        count = free;
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, LDLM_CANCEL));
+
+                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+                if (rc) {
+                        ptlrpc_request_free(req);
+                        GOTO(out, rc);
+                }
                 req->rq_no_resend = 1;
                 req->rq_no_delay = 1;
 
@@ -941,11 +960,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 
-                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
-                                      sizeof(*body));
-                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
+                ldlm_cancel_pack(req, cancels, count);
 
-                ptlrpc_req_set_repsize(req, 1, NULL);
+                ptlrpc_request_set_replen(req);
                 if (flags & LDLM_FL_ASYNC) {
                         ptlrpcd_add_req(req);
                         sent = count;
@@ -1076,7 +1093,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                                LDLM_FL_BL_AST, flags);
         }
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(0);
 }
 
@@ -1116,7 +1133,7 @@ static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
         }
         if (bl_ast > 0) {
                 count -= bl_ast;
-                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
+                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
         }
 
         RETURN(count);
@@ -1384,7 +1401,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
 
         /* If an error occured in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(count);
 }
 
@@ -1455,7 +1472,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
  * buffer at the offset @off.
  * Destroy @cancels at the end. */
 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
-                         struct ptlrpc_request *req, int off, int flags)
+                         struct ptlrpc_request *req, int flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
@@ -1477,7 +1494,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 if (exp_connect_cancelset(lock->l_conn_export)) {
                         res = count;
                         if (req)
-                                ldlm_cancel_pack(req, off, cancels, count);
+                                ldlm_cancel_pack(req, cancels, count);
                         else
                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
                                                           cancels, count,
@@ -1519,7 +1536,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
@@ -1773,7 +1790,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
-        struct ldlm_lock *lock;
+        struct ldlm_lock  *lock;
         struct ldlm_reply *reply;
 
         ENTRY;
@@ -1782,12 +1799,9 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc);
 
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         lock = ldlm_handle2lock(&aa->lock_handle);
         if (!lock) {
@@ -1814,11 +1828,8 @@ out:
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
         struct ldlm_async_args *aa;
-        int buffers = 2;
-        int size[3] = { sizeof(struct ptlrpc_body) };
+        struct ldlm_request   *body;
         int flags;
         ENTRY;
 
@@ -1860,26 +1871,25 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         else
                 flags = LDLM_FL_REPLAY;
 
-        size[DLM_LOCKREQ_OFF] = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
-                              NULL);
-        if (!req)
+        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = flags;
 
         ldlm_lock2handle(lock, &body->lock_handle[0]);
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
         if (lock->l_lvb_len != 0) {
-                buffers = 3;
-                size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lock->l_lvb_len);
         }
-        ptlrpc_req_set_repsize(req, buffers, size);
+        ptlrpc_request_set_replen(req);
         /* notify the server we've replayed all requests.
          * also, we mark the request to be put on a dedicated
          * queue to be processed after all request replayes.
index fdeabf3..a297c9b 100644 (file)
@@ -105,11 +105,8 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         rc = md_readpage(sbi->ll_md_exp, &lli->lli_fid, NULL,
                          offset, page, &request);
         if (!rc) {
-                body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);         /* checked by md_readpage() */
-                /* swabbed by md_readpage() */
-                LASSERT(lustre_rep_swabbed(request, REPLY_REC_OFF));
 
                 if (body->valid & OBD_MD_FLSIZE)
                         st->st_size = body->size;
index cf5267b..25b425b 100644 (file)
@@ -146,10 +146,8 @@ int llu_local_open(struct llu_inode_info *lli, struct lookup_intent *it)
         struct mdt_body *body;
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);                 /* reply already checked out */
-        /* and swabbed down */
-        LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        LASSERT(body != NULL);
 
         /* already opened? */
         if (lli->lli_open_count++)
@@ -246,7 +244,7 @@ int llu_iop_open(struct pnode *pnode, int flags, mode_t mode)
         RETURN(rc);
 }
 
-int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
+int llu_objects_destroy(struct ptlrpc_request *req, struct inode *dir)
 {
         struct mdt_body *body;
         struct lov_mds_md *eadata;
@@ -256,8 +254,7 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
         int rc;
         ENTRY;
 
-        /* req is swabbed so this is safe */
-        body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 
         if (!(body->valid & OBD_MD_FLEASIZE))
                 RETURN(0);
@@ -271,13 +268,10 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
          * to this file. Use this EA to unlink the objects on the OST.
          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
          * check it is complete and sensible. */
-        eadata = lustre_swab_repbuf(request, REPLY_REC_OFF+1, body->eadatasize,
-                                    NULL);
+        eadata = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
+                                              body->eadatasize);
+
         LASSERT(eadata != NULL);
-        if (eadata == NULL) {
-                CERROR("Can't unpack MDS EA data\n");
-                GOTO(out, rc = -EPROTO);
-        }
 
         rc = obd_unpackmd(llu_i2obdexp(dir), &lsm, eadata,body->eadatasize);
         if (rc < 0) {
@@ -298,9 +292,10 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
         if (body->valid & OBD_MD_FLCOOKIE) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
                 oti.oti_logcookies =
-                        lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF + 2,
-                                       sizeof(struct llog_cookie) *
-                                       lsm->lsm_stripe_count);
+                        req_capsule_server_sized_get(&req->rq_pill,
+                                                   &RMF_LOGCOOKIES,
+                                                   sizeof(struct llog_cookie) *
+                                                   lsm->lsm_stripe_count);
                 if (oti.oti_logcookies == NULL) {
                         oa->o_valid &= ~OBD_MD_FLCOOKIE;
                         body->valid &= ~OBD_MD_FLCOOKIE;
index 486f874..9e5de90 100644 (file)
@@ -157,7 +157,7 @@ int liblustre_process_log(struct config_llog_instance *cfg,
         if (ocd == NULL)
                 GOTO(out_cleanup, rc = -ENOMEM);
 
-        ocd->ocd_connect_flags = OBD_CONNECT_VERSION;
+        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd->ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
index ad90650..a09bb97 100644 (file)
@@ -173,7 +173,6 @@ int llu_md_blocking_ast(struct ldlm_lock *lock,
 }
 
 static int pnode_revalidate_finish(struct ptlrpc_request *req,
-                                   int offset,
                                    struct lookup_intent *it,
                                    struct pnode *pnode)
 {
@@ -191,7 +190,7 @@ static int pnode_revalidate_finish(struct ptlrpc_request *req,
                 RETURN(-ENOENT);
 
         rc = md_get_lustre_md(llu_i2sbi(inode)->ll_md_exp, req,
-                              offset, llu_i2sbi(inode)->ll_dt_exp, 
+                              llu_i2sbi(inode)->ll_dt_exp, 
                               llu_i2sbi(inode)->ll_md_exp, &md);
         if (rc)
                 RETURN(rc);
@@ -265,7 +264,7 @@ static int llu_pb_revalidate(struct pnode *pnode, int flags,
         if (rc < 0)
                 GOTO(out, rc = 0);
 
-        rc = pnode_revalidate_finish(req, DLM_REPLY_REC_OFF, it, pnode);
+        rc = pnode_revalidate_finish(req, it, pnode);
         if (rc != 0) {
                 ll_intent_release(it);
                 GOTO(out, rc = 0);
@@ -335,7 +334,7 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
                 if (it_disposition(it, DISP_OPEN_CREATE))
                         ptlrpc_req_finished(request);
 
-                rc = md_get_lustre_md(sbi->ll_md_exp, request, offset,
+                rc = md_get_lustre_md(sbi->ll_md_exp, request,
                                       sbi->ll_dt_exp, sbi->ll_md_exp, &md);
                 if (rc)
                         RETURN(rc);
index 7d91ef8..021a592 100644 (file)
@@ -179,7 +179,6 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         struct inode *inode = llu_inode_from_lock(lock);
         struct llu_inode_info *lli;
         struct ost_lvb *lvb;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
         int rc, stripe = 0;
         ENTRY;
 
@@ -195,11 +194,16 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         if (lli->lli_smd->lsm_stripe_count > 1)
                 stripe = llu_lock_to_stripe_offset(inode, lock);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc)
+        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             sizeof(*lvb));
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc) {
+                CERROR("failed pack reply: %d\n", rc);
                 GOTO(iput, rc);
+        }
 
-        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
+        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
 
         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
index e24258c..d4b1fd5 100644 (file)
@@ -465,7 +465,7 @@ static int llu_inode_revalidate(struct inode *inode)
                                (long long)llu_i2stat(inode)->st_ino);
                         RETURN(-abs(rc));
                 }
-                rc = md_get_lustre_md(sbi->ll_md_exp, req, REPLY_REC_OFF,
+                rc = md_get_lustre_md(sbi->ll_md_exp, req,
                                       sbi->ll_dt_exp, sbi->ll_md_exp, &md);
 
                 /* XXX Too paranoid? */
@@ -642,7 +642,7 @@ int llu_md_setattr(struct inode *inode, struct md_op_data *op_data,
                 RETURN(rc);
         }
 
-        rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
+        rc = md_get_lustre_md(sbi->ll_md_exp, request,
                               sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (rc) {
                 ptlrpc_req_finished(request);
@@ -993,10 +993,8 @@ static int llu_readlink_internal(struct inode *inode,
                 RETURN(rc);
         }
 
-        body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
-                              sizeof(*body));
+        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*request, REPLY_REC_OFF));
 
         if ((body->valid & OBD_MD_LINKNAME) == 0) {
                 CERROR ("OBD_MD_LINKNAME not set on reply\n");
@@ -1010,8 +1008,7 @@ static int llu_readlink_internal(struct inode *inode,
                 GOTO(failed, rc = -EPROTO);
         }
 
-        *symname = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF + 1,
-                                   symlen);
+        *symname = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_MD);
         if (*symname == NULL ||
             strnlen(*symname, symlen) != symlen - 1) {
                 /* not full/NULL terminated */
@@ -1791,7 +1788,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         }
 
         rc = md_get_lustre_md(sbi->ll_md_exp, req,
-                              DLM_REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, &md);
+                              sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (rc)
                 GOTO(out, rc);
 
@@ -2074,7 +2071,8 @@ llu_fsswop_mount(const char *source,
         obd_set_info_async(obd->obd_self_export, strlen("async"), "async",
                            sizeof(async), &async, NULL);
 
-        ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION;
+        ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION |
+                                OBD_CONNECT_FID;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd.ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
@@ -2138,7 +2136,7 @@ llu_fsswop_mount(const char *source,
                 GOTO(out_dt, err);
         }
 
-        err = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
+        err = md_get_lustre_md(sbi->ll_md_exp, request,
                                sbi->ll_dt_exp, sbi->ll_md_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
index 1a16669..42bce6f 100644 (file)
@@ -269,7 +269,7 @@ restart:
 }
 
 int ll_revalidate_it_finish(struct ptlrpc_request *request,
-                            int offset, struct lookup_intent *it,
+                            struct lookup_intent *it,
                             struct dentry *de)
 {
         int rc = 0;
@@ -281,8 +281,7 @@ int ll_revalidate_it_finish(struct ptlrpc_request *request,
         if (it_disposition(it, DISP_LOOKUP_NEG)) 
                 RETURN(-ENOENT);
 
-        rc = ll_prep_inode(&de->d_inode,
-                           request, offset, NULL);
+        rc = ll_prep_inode(&de->d_inode, request, NULL);
 
         RETURN(rc);
 }
@@ -468,7 +467,7 @@ do_lock:
         }
 
 revalidate_finish:
-        rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
+        rc = ll_revalidate_it_finish(req, it, de);
         if (rc != 0) {
                 if (rc != -ESTALE && rc != -ENOENT)
                         ll_intent_release(it);
@@ -556,11 +555,10 @@ do_lookup:
         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
                             ll_md_blocking_ast, 0);
         if (rc >= 0) {
-                struct mdt_body *mdt_body = lustre_msg_buf(req->rq_repmsg,
-                                                           DLM_REPLY_REC_OFF,
-                                                           sizeof(*mdt_body));
+                struct mdt_body *mdt_body;
                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
-
+                mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+                
                 if (de->d_inode)
                         fid = *ll_inode2fid(de->d_inode);
 
index 9f5f22a..c230e02 100644 (file)
@@ -153,14 +153,10 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                          oc, hash, page, &request);
         capa_put(oc);
         if (!rc) {
-                body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
                 /* Checked by mdc_readpage() */
                 LASSERT(body != NULL);
 
-                /* Swabbed by mdc_readpage() */
-                LASSERT(lustre_rep_swabbed(request, REPLY_REC_OFF));
-
                 if (body->valid & OBD_MD_FLSIZE) {
                         ll_inode_size_lock(inode, 0);
                         i_size_write(inode, body->size);
@@ -640,10 +636,8 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp,
                 GOTO(out, rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL); /* checked by md_getattr_name */
-        /* swabbed by mdc_getattr_name */
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        LASSERT(body != NULL);
 
         lmmsize = body->eadatasize;
 
@@ -652,9 +646,9 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp,
                 GOTO(out, rc = -ENODATA);
         }
 
-        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
+        lmm = req_capsule_server_sized_get(&req->rq_pill,
+                                           &RMF_MDT_MD, lmmsize);
         LASSERT(lmm != NULL);
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
 
         /*
          * This is coming from the MDS, so is probably in
@@ -782,11 +776,9 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                 }
 
                 if (request) {
-                        body = lustre_msg_buf(request->rq_repmsg,
-                                              REPLY_REC_OFF, sizeof(*body));
-                        LASSERT(body != NULL); /* checked by md_getattr_name */
-                        /* swabbed by md_getattr_name */
-                        LASSERT(lustre_rep_swabbed(request, REPLY_REC_OFF));
+                        body = req_capsule_server_get(&request->rq_pill,
+                                                      &RMF_MDT_BODY);
+                        LASSERT(body != NULL);
                 } else {
                         GOTO(out_req, rc);
                 }
@@ -894,11 +886,10 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
         }
         case OBD_IOC_LLOG_CATINFO: {
                 struct ptlrpc_request *req = NULL;
-                char *buf = NULL;
-                int rc, len = 0;
-                char *bufs[3] = { NULL }, *str;
-                int lens[3] = { sizeof(struct ptlrpc_body) };
-                int size[2] = { sizeof(struct ptlrpc_body) };
+                char                  *buf = NULL;
+                char                  *str;
+                int                    len = 0;
+                int                    rc;
 
                 rc = obd_ioctl_getdata(&buf, &len, (void *)arg);
                 if (rc)
@@ -910,29 +901,38 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                         RETURN(-EINVAL);
                 }
 
-                lens[REQ_REC_OFF] = data->ioc_inllen1;
-                bufs[REQ_REC_OFF] = data->ioc_inlbuf1;
-                if (data->ioc_inllen2) {
-                        lens[REQ_REC_OFF + 1] = data->ioc_inllen2;
-                        bufs[REQ_REC_OFF + 1] = data->ioc_inlbuf2;
-                } else {
-                        lens[REQ_REC_OFF + 1] = 0;
-                        bufs[REQ_REC_OFF + 1] = NULL;
+                req = ptlrpc_request_alloc(sbi2mdc(sbi)->cl_import,
+                                           &RQF_LLOG_CATINFO);
+                if (req == NULL)
+                        GOTO(out_catinfo, rc = -ENOMEM);
+
+                req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                                     data->ioc_inllen1);
+                req_capsule_set_size(&req->rq_pill, &RMF_STRING, RCL_CLIENT,
+                                     data->ioc_inllen2);
+
+                rc = ptlrpc_request_pack(req, LUSTRE_LOG_VERSION, LLOG_CATINFO);
+                if (rc) {
+                        ptlrpc_request_free(req);
+                        GOTO(out_catinfo, rc);
                 }
 
-                req = ptlrpc_prep_req(sbi2mdc(sbi)->cl_import,
-                                      LUSTRE_LOG_VERSION, LLOG_CATINFO, 3, lens,
-                                      bufs);
-                if (!req)
-                        GOTO(out_catinfo, rc = -ENOMEM);
+                str = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
+                memcpy(str, data->ioc_inlbuf1, data->ioc_inllen1);
+                if (data->ioc_inllen2) {
+                        str = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_STRING);
+                        memcpy(str, data->ioc_inlbuf2, data->ioc_inllen2);
+                }
 
-                size[REPLY_REC_OFF] = data->ioc_plen1;
-                ptlrpc_req_set_repsize(req, 2, size);
+                req_capsule_set_size(&req->rq_pill, &RMF_STRING, RCL_SERVER,
+                                     data->ioc_plen1);
+                ptlrpc_request_set_replen(req);
 
                 rc = ptlrpc_queue_wait(req);
                 if (!rc) {
-                        str = lustre_msg_string(req->rq_repmsg, REPLY_REC_OFF,
-                                                data->ioc_plen1);
+                        str = req_capsule_server_get(&req->rq_pill,
+                                                     &RMF_STRING);
                         rc = copy_to_user(data->ioc_pbuf1, str, data->ioc_plen1);
                 }
                 ptlrpc_req_finished(req);
index f21e44f..598c994 100644 (file)
@@ -376,8 +376,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                                  &itp->d.lustre.it_lock_handle, 
                                  file->f_dentry->d_inode);
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
-                           NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
 out:
         ptlrpc_req_finished(itp->d.lustre.it_data);
 
@@ -396,11 +395,8 @@ static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
 
         LASSERT(och);
 
-        body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
-        /* reply already checked out */
-        LASSERT(body != NULL);
-        /* and swabbed in md_enqueue */
-        LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        LASSERT(body != NULL);                      /* reply already checked out */
 
         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
@@ -431,15 +427,11 @@ int ll_local_open(struct file *file, struct lookup_intent *it,
                 if (rc)
                         RETURN(rc);
 
-                body = lustre_msg_buf(req->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
-
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 if ((it->it_flags & FMODE_WRITE) &&
                     (body->valid & OBD_MD_FLSIZE))
-                {
                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                                lli->lli_ioepoch, PFID(&lli->lli_fid));
-                }
         }
 
         LUSTRE_FPRIVATE(file) = fd;
@@ -1034,7 +1026,6 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         struct lov_stripe_md *lsm;
         struct ost_lvb *lvb;
         int rc, stripe;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
         ENTRY;
 
         if (inode == NULL)
@@ -1051,11 +1042,16 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         if (stripe < 0)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc)
+        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             sizeof(*lvb));
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc) {
+                CERROR("lustre_pack_reply: %d\n", rc);
                 GOTO(iput, rc);
+        }
 
-        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
+        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
         lvb->lvb_atime = LTIME_S(inode->i_atime);
@@ -1814,10 +1810,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL); /* checked by mdc_getattr_name */
-        /* swabbed by mdc_getattr_name */
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
 
         lmmsize = body->eadatasize;
 
@@ -1826,9 +1820,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc = -ENODATA);
         }
 
-        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
+        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
         LASSERT(lmm != NULL);
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
 
         /*
          * This is coming from the MDS, so is probably in
@@ -2645,7 +2638,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         GOTO (out, rc);
                 }
 
-                rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
+                rc = ll_revalidate_it_finish(req, &oit, dentry);
                 if (rc != 0) {
                         ll_intent_release(&oit);
                         GOTO(out, rc);
@@ -2686,8 +2679,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
-                                   NULL);
+                rc = ll_prep_inode(&inode, req, NULL);
                 if (rc)
                         GOTO(out, rc);
         }
index 64022e5..e81d275 100644 (file)
@@ -497,7 +497,8 @@ extern struct file_operations ll_dir_operations;
 extern struct inode_operations ll_dir_inode_operations;
 
 /* llite/namei.c */
-int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
+int ll_objects_destroy(struct ptlrpc_request *request,
+                       struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
                       struct lustre_md *lic);
 struct dentry *ll_find_alias(struct inode *, struct dentry *);
@@ -590,7 +591,7 @@ void ll_unhash_aliases(struct inode *);
 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft);
 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name);
-int ll_revalidate_it_finish(struct ptlrpc_request *request, int offset,
+int ll_revalidate_it_finish(struct ptlrpc_request *request,
                             struct lookup_intent *it, struct dentry *de);
 
 /* llite/llite_lib.c */
@@ -625,7 +626,7 @@ void ll_umount_begin(struct super_block *sb);
 #endif
 int ll_remount_fs(struct super_block *sb, int *flags, char *data);
 int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
-                  int offset, struct super_block *);
+                  struct super_block *);
 void lustre_dump_dentry(struct dentry *, int recur);
 void lustre_dump_inode(struct inode *);
 struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
index 0f91ebc..eb45e5b 100644 (file)
@@ -196,11 +196,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
         }
 
         /* indicate the features supported by this client */
-        data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
-                                  OBD_CONNECT_JOIN |
-                                  OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION |
-                                  OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
-                                  OBD_CONNECT_CANCELSET;
+        data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
+                                  OBD_CONNECT_JOIN     | OBD_CONNECT_ATTRFID  |
+                                  OBD_CONNECT_VERSION  | OBD_CONNECT_MDS_CAPA |
+                                  OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
+                                  OBD_CONNECT_FID;
+
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
                 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
@@ -361,9 +362,9 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                 GOTO(out_md_fid, err = -ENODEV);
         }
 
-        data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
+        data->ocd_connect_flags = OBD_CONNECT_GRANT     | OBD_CONNECT_VERSION  |
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
-                                  OBD_CONNECT_CANCELSET;
+                                  OBD_CONNECT_CANCELSET | OBD_CONNECT_FID;
         if (sbi->ll_flags & LL_SBI_OSS_CAPA)
                 data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
 
@@ -451,9 +452,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                 GOTO(out_dt_fid, err);
         }
         memset(&lmd, 0, sizeof(lmd));
-        err = md_get_lustre_md(sbi->ll_md_exp, request, 
-                               REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, 
-                               &lmd);
+        err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
+                               sbi->ll_md_exp, &lmd);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n", err);
                 ptlrpc_req_finished (request);
@@ -1175,8 +1175,8 @@ int ll_md_setattr(struct inode *inode, struct md_op_data *op_data,
                 RETURN(rc);
         }
 
-        rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF,
-                              sbi->ll_dt_exp, sbi->ll_md_exp, &md);
+        rc = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
+                              sbi->ll_md_exp, &md);
         if (rc) {
                 ptlrpc_req_finished(request);
                 RETURN(rc);
@@ -1830,11 +1830,11 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                         RETURN(-abs(rc));
                 }
 
-                body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+
                 flags = body->flags;
 
-                ptlrpc_req_finished (req);
+                ptlrpc_req_finished(req);
 
                 RETURN(put_user(flags, (int *)arg));
         }
@@ -1997,8 +1997,9 @@ int ll_remount_fs(struct super_block *sb, int *flags, char *data)
         return 0;
 }
 
-int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
-                  int offset, struct super_block *sb)
+int ll_prep_inode(struct inode **inode,
+                  struct ptlrpc_request *req,
+                  struct super_block *sb)
 {
         struct ll_sb_info *sbi = NULL;
         struct lustre_md md;
@@ -2010,8 +2011,8 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
         prune_deathrow(sbi, 1);
         memset(&md, 0, sizeof(struct lustre_md));
 
-        rc = md_get_lustre_md(sbi->ll_md_exp, req, offset,
-                              sbi->ll_dt_exp, sbi->ll_md_exp, &md);
+        rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp,
+                              sbi->ll_md_exp, &md);
         if (rc)
                 RETURN(rc);
 
index afd0c9b..8892862 100644 (file)
@@ -70,7 +70,7 @@ static struct inode *search_inode_for_lustre(struct super_block *sb,
                 RETURN(ERR_PTR(rc));
         }
 
-        rc = ll_prep_inode(&inode, req, REPLY_REC_OFF, sb);
+        rc = ll_prep_inode(&inode, req, sb);
         ptlrpc_req_finished(req);
         if (rc)
                 RETURN(ERR_PTR(rc));
@@ -219,8 +219,7 @@ static struct dentry *ll_get_parent(struct dentry *dchild)
                 CERROR("failure %d inode %lu get parent\n", rc, dir->i_ino);
                 RETURN(ERR_PTR(rc));
         }
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); 
-       
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body->valid & OBD_MD_FLID);
         
         CDEBUG(D_INFO, "parent for "DFID" is "DFID"\n", 
index c4cc5ed..17388df 100644 (file)
@@ -360,8 +360,9 @@ struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         return de;
 }
 
-static int lookup_it_finish(struct ptlrpc_request *request, int offset,
-                            struct lookup_intent *it, void *data)
+static int lookup_it_finish(struct ptlrpc_request *request,
+                            struct lookup_intent *it,
+                            void *data)
 {
         struct it_cb_data *icbd = data;
         struct dentry **de = icbd->icbd_childp;
@@ -375,8 +376,7 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
                 ENTRY;
 
-                rc = ll_prep_inode(&inode, request, offset,
-                                   (*de)->d_sb);
+                rc = ll_prep_inode(&inode, request, (*de)->d_sb);
                 if (rc)
                         RETURN(rc);
 
@@ -478,7 +478,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
         if (rc < 0)
                 GOTO(out, retval = ERR_PTR(rc));
 
-        rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
+        rc = lookup_it_finish(req, it, &icbd);
         if (rc != 0) {
                 ll_intent_release(it);
                 GOTO(out, retval = ERR_PTR(rc));
@@ -647,7 +647,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
         LASSERT(it_disposition(it, DISP_ENQ_CREATE_REF));
         request = it->d.lustre.it_data;
         it_clear_disposition(it, DISP_ENQ_CREATE_REF);
-        rc = ll_prep_inode(&inode, request, DLM_REPLY_REC_OFF, dir->i_sb);
+        rc = ll_prep_inode(&inode, request, dir->i_sb);
         if (rc)
                 GOTO(out, inode = ERR_PTR(rc));
 
@@ -712,13 +712,13 @@ static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode,
         RETURN(0);
 }
 
-static void ll_update_times(struct ptlrpc_request *request, int offset,
+static void ll_update_times(struct ptlrpc_request *request,
                             struct inode *inode)
 {
-        struct mdt_body *body = lustre_msg_buf(request->rq_repmsg, offset,
-                                               sizeof(*body));
-        LASSERT(body);
+        struct mdt_body *body = req_capsule_server_get(&request->rq_pill,
+                                                       &RMF_MDT_BODY);
 
+        LASSERT(body);
         /* mtime is always updated with ctime, but can be set in past.
            As write and utime(2) may happen within 1 second, and utime's
            mtime has a priority over write's one, so take mtime from mds
@@ -763,11 +763,10 @@ static int ll_new_node(struct inode *dir, struct qstr *name,
         if (err)
                 GOTO(err_exit, err);
 
-        ll_update_times(request, REPLY_REC_OFF, dir);
+        ll_update_times(request, dir);
 
         if (dchild) {
-                err = ll_prep_inode(&inode, request, REPLY_REC_OFF,
-                                    dchild->d_sb);
+                err = ll_prep_inode(&inode, request, dchild->d_sb);
                 if (err)
                      GOTO(err_exit, err);
 
@@ -900,7 +899,7 @@ static int ll_link_generic(struct inode *src,  struct inode *dir,
         if (dchild)
                 d_drop(dchild);
 
-        ll_update_times(request, REPLY_REC_OFF, dir);
+        ll_update_times(request, dir);
         EXIT;
 out:
         ptlrpc_req_finished(request);
@@ -962,7 +961,7 @@ static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent,
         rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
         ll_finish_md_op_data(op_data);
         if (rc == 0)
-                ll_update_times(request, REPLY_REC_OFF, dir);
+                ll_update_times(request, dir);
         ptlrpc_req_finished(request);
         RETURN(rc);
 }
@@ -978,8 +977,7 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
         ENTRY;
 
         /* req is swabbed so this is safe */
-        body = lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
-
+        body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
         if (!(body->valid & OBD_MD_FLEASIZE))
                 RETURN(0);
 
@@ -992,13 +990,9 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
          * to this file. Use this EA to unlink the objects on the OST.
          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
          * check it is complete and sensible. */
-        eadata = lustre_swab_repbuf(request, REPLY_REC_OFF + 1,
-                                    body->eadatasize, NULL);
+        eadata = req_capsule_server_sized_get(&request->rq_pill, &RMF_MDT_MD,
+                                              body->eadatasize);
         LASSERT(eadata != NULL);
-        if (eadata == NULL) {
-                CERROR("Can't unpack MDS EA data\n");
-                GOTO(out, rc = -EPROTO);
-        }
 
         rc = obd_unpackmd(ll_i2dtexp(dir), &lsm, eadata, body->eadatasize);
         if (rc < 0) {
@@ -1022,10 +1016,11 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
 
         if (body->valid & OBD_MD_FLCOOKIE) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
-                oti.oti_logcookies =
-                        lustre_msg_buf(request->rq_repmsg, REPLY_REC_OFF + 2,
-                                       sizeof(struct llog_cookie) *
-                                       lsm->lsm_stripe_count);
+                oti.oti_logcookies = 
+                        req_capsule_server_sized_get(&request->rq_pill,
+                                                     &RMF_LOGCOOKIES,
+                                                   sizeof(struct llog_cookie) *
+                                                     lsm->lsm_stripe_count);
                 if (oti.oti_logcookies == NULL) {
                         oa->o_valid &= ~OBD_MD_FLCOOKIE;
                         body->valid &= ~OBD_MD_FLCOOKIE;
@@ -1071,7 +1066,7 @@ static int ll_unlink_generic(struct inode *dir, struct dentry *dparent,
         if (rc)
                 GOTO(out, rc);
 
-        ll_update_times(request, REPLY_REC_OFF, dir);
+        ll_update_times(request, dir);
 
         rc = ll_objects_destroy(request, dir);
  out:
@@ -1110,8 +1105,8 @@ static int ll_rename_generic(struct inode *src, struct dentry *src_dparent,
                         tgt_name->name, tgt_name->len, &request);
         ll_finish_md_op_data(op_data);
         if (!err) {
-                ll_update_times(request, REPLY_REC_OFF, src);
-                ll_update_times(request, REPLY_REC_OFF, tgt);
+                ll_update_times(request, src);
+                ll_update_times(request, tgt);
                 err = ll_objects_destroy(request, src);
         }
 
index e87239d..20cfa35 100644 (file)
@@ -100,7 +100,7 @@ static inline int remote_perm_hashfunc(uid_t uid)
 }
 
 /* NB: setxid permission is not checked here, instead it's done on
- * MDT when client get remote permission. (lookup/mdc_get_remote_perm). */
+ * MDT when client get remote permission. */
 static int do_check_remote_perm(struct ll_inode_info *lli, int mask)
 {
         struct hlist_head *head;
@@ -271,9 +271,8 @@ check:
                 RETURN(rc);
         }
 
-        perm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, sizeof(*perm));
+        perm = req_capsule_server_get(&req->rq_pill, &RMF_ACL);
         LASSERT(perm);
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
 
         rc = ll_update_remote_perm(inode, perm);
         up(&lli->lli_rmtperm_sem);
index 7c9c1d3..033e712 100644 (file)
@@ -57,11 +57,8 @@ static int ll_readlink_internal(struct inode *inode,
                 GOTO (failed, rc);
         }
 
-        body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
-                              sizeof(*body));
+        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*request, REPLY_REC_OFF));
-
         if ((body->valid & OBD_MD_LINKNAME) == 0) {
                 CERROR("OBD_MD_LINKNAME not set on reply\n");
                 GOTO(failed, rc = -EPROTO);
@@ -74,10 +71,9 @@ static int ll_readlink_internal(struct inode *inode,
                 GOTO(failed, rc = -EPROTO);
         }
 
-        *symname = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF + 1,
-                                  symlen);
+        *symname = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_MD);
         if (*symname == NULL ||
-            strnlen (*symname, symlen) != symlen - 1) {
+            strnlen(*symname, symlen) != symlen - 1) {
                 /* not full/NULL terminated */
                 CERROR("inode %lu: symlink not NULL terminated string"
                         "of length %d\n", inode->i_ino, symlen - 1);
index db1bd85..933834f 100644 (file)
@@ -326,9 +326,8 @@ do_getxattr:
                 RETURN(rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body);
-        LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
 
         /* only detect the xattr size */
         if (size == 0)
@@ -340,21 +339,11 @@ do_getxattr:
                 GOTO(out, rc = -ERANGE);
         }
 
-        if (lustre_msg_bufcount(req->rq_repmsg) < 3) {
-                CERROR("reply bufcount %u\n",
-                       lustre_msg_bufcount(req->rq_repmsg));
-                GOTO(out, rc = -EFAULT);
-        }
-
         /* do not need swab xattr data */
-        lustre_set_rep_swabbed(req, REPLY_REC_OFF + 1);
-        xdata = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
-                               body->eadatasize);
-        if (!xdata) {
-                CERROR("can't extract: %u : %u\n", body->eadatasize,
-                       lustre_msg_buflen(req->rq_repmsg, REPLY_REC_OFF + 1));
+        xdata = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA,
+                                             body->eadatasize);
+        if (!xdata)
                 GOTO(out, rc = -EFAULT);
-        }
 
 #ifdef CONFIG_FS_POSIX_ACL
         if (body->eadatasize >= 0 && rce && rce->rce_ops == RMT_LSETFACL) {
index 206cb11..6b0c365 100644 (file)
@@ -73,10 +73,8 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
         int pmode, rc = 0;
         ENTRY;
 
-        body = lustre_msg_buf((*reqp)->rq_repmsg,
-                              DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
 
         if (!(body->valid & OBD_MD_MDS))
                 RETURN(0);
@@ -315,17 +313,15 @@ repeat:
 
         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
          * to update them for split dir */
-        body = lustre_msg_buf((*reqp)->rq_repmsg,
-                              DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
 
         /* could not find object, FID is not present in response. */
         if (!(body->valid & OBD_MD_FLID))
                 GOTO(out_free_sop_data, rc = 0);
 
         obj = lmv_obj_grab(obd, &body->fid1);
-        if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
+        if (!obj && (mea = lmv_get_mea(*reqp))) {
                 /* FIXME: capability for remote! */
                 /* wow! this is split dir, we'd like to handle it */
                 obj = lmv_obj_create(exp, &body->fid1, mea);
@@ -484,10 +480,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
 
         LASSERT(*reqp);
         LASSERT((*reqp)->rq_repmsg);
-        body = lustre_msg_buf((*reqp)->rq_repmsg,
-                              DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
 
         /* could not find object, FID is not present in response. */
         if (!(body->valid & OBD_MD_FLID))
@@ -495,7 +489,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
 
         obj2 = lmv_obj_grab(obd, &body->fid1);
 
-        if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
+        if (!obj2 && (mea = lmv_get_mea(*reqp))) {
 
                 /* FIXME remote capability! */
                 /* wow! this is split dir, we'd like to handle it. */
@@ -553,10 +547,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
         if (op_data == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf((*reqp)->rq_repmsg,
-                              DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
 
         LASSERT((body->valid & OBD_MD_FLID) != 0);
         obj = lmv_obj_grab(obd, &body->fid1);
@@ -612,10 +604,8 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
 
                 lock->l_ast_data = lmv_obj_get(obj);
 
-                body2 = lustre_msg_buf(req->rq_repmsg,
-                                       DLM_REPLY_REC_OFF, sizeof(*body2));
+                body2 = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body2 != NULL);
-                LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
 
                 obj->lo_inodes[i].li_size = body2->size;
 
@@ -788,12 +778,10 @@ repeat:
         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
                                cb_blocking, extra_lock_flags);
 
-        if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
+        if (rc == 0 && (mea = lmv_get_mea(*reqp))) {
                 /* Wow! This is split dir, we'd like to handle it. */
-                body = lustre_msg_buf((*reqp)->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
+                body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);
-                LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
                 LASSERT((body->valid & OBD_MD_FLID) != 0);
 
                 obj = lmv_obj_grab(obd, &body->fid1);
@@ -914,12 +902,10 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                                          * It even got the reply refresh attrs
                                          * from that reply.
                                          */
-                                        body = lustre_msg_buf(mreq->rq_repmsg,
-                                                              DLM_REPLY_REC_OFF,
-                                                              sizeof(*body));
+                                        body = req_capsule_server_get(
+                                                                &mreq->rq_pill,
+                                                                &RMF_MDT_BODY);
                                         LASSERT(body != NULL);
-                                        LASSERT(lustre_rep_swabbed(
-                                                      mreq, DLM_REPLY_REC_OFF));
                                         goto update;
                                 }
                                 /* take already cached attrs into account */
@@ -979,10 +965,8 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         *reqp = req;
                 }
 
-                body = lustre_msg_buf(req->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);
-                LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
 
 update:
                 obj->lo_inodes[i].li_size = body->size;
@@ -1009,10 +993,8 @@ release_lock:
                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
                        (unsigned long)size);
 
-                body = lustre_msg_buf((*reqp)->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
+                body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body != NULL);
-                LASSERT(lustre_rep_swabbed(*reqp, DLM_REPLY_REC_OFF));
 
                 body->size = size;
 
index d4cce7f..76f4053 100644 (file)
@@ -147,21 +147,20 @@ int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid,
                          struct md_op_data *op, struct lu_fid *fid);
 
 static inline struct lmv_stripe_md * 
-lmv_get_mea(struct ptlrpc_request *req, int offset)
+lmv_get_mea(struct ptlrpc_request *req)
 {
        struct mdt_body *body;
        struct lmv_stripe_md *mea;
 
        LASSERT(req);
 
-        body = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*body));
-        LASSERT(lustre_rep_swabbed(req, offset));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 
        if (!body || !S_ISDIR(body->mode) || !body->eadatasize)
                return NULL;
 
-        mea = lustre_msg_buf(req->rq_repmsg, offset + 1,
-                            body->eadatasize);
+        mea = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
+                                           body->eadatasize);
        LASSERT(mea != NULL);
 
        if (mea->mea_count == 0)
index 783f4b6..0db253c 100644 (file)
@@ -1195,10 +1195,9 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
                         RETURN(rc);
                 }
 
-                body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&(*request)->rq_pill,
+                                              &RMF_MDT_BODY);
                 LASSERT(body != NULL);
-                LASSERT(lustre_rep_swabbed(*request, REPLY_REC_OFF));
 
                 lmv_obj_lock(obj);
 
@@ -1304,7 +1303,7 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
                 GOTO(cleanup, rc);
         }
 
-        rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
+        rc = md_get_lustre_md(tgt_exp, req, NULL, exp, &md);
         if (rc) {
                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
                 GOTO(cleanup, rc);
@@ -1514,10 +1513,8 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         int rc = 0, pmode;
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_repmsg,
-                              DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);
-        LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
 
         if (!(body->valid & OBD_MD_MDS))
                 RETURN(0);
@@ -1659,10 +1656,9 @@ repeat:
         rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
                              ea_size, suppgid, request);
         if (rc == 0) {
-                body = lustre_msg_buf((*request)->rq_repmsg,
-                                      REQ_REC_OFF, sizeof(*body));
+                body = req_capsule_server_get(&(*request)->rq_pill,
+                                              &RMF_MDT_BODY);
                 LASSERT(body != NULL);
-                LASSERT(lustre_rep_swabbed(*request, REQ_REC_OFF));
 
                 if (body->valid & OBD_MD_MDS) {
                         struct ptlrpc_request *req = NULL;
@@ -2741,16 +2737,15 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
 }
 
 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
-                      int offset, struct obd_export *dt_exp,
-                      struct obd_export *md_exp, struct lustre_md *md)
+                      struct obd_export *dt_exp, struct obd_export *md_exp,
+                      struct lustre_md *md)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         int rc;
 
         ENTRY;
-        rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
-                              md);
+        rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
         RETURN(rc);
 }
 
index 68c4bca..922eea5 100644 (file)
@@ -328,7 +328,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
                         GOTO(cleanup, obj = ERR_PTR(rc));
                 }
 
-                rc = md_get_lustre_md(exp, req, 0, NULL, exp, &md);
+                rc = md_get_lustre_md(exp, req, NULL, exp, &md);
                 if (rc) {
                         CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
                         GOTO(cleanup, obj = ERR_PTR(rc));
index dda6f11..2bf1326 100644 (file)
@@ -35,43 +35,36 @@ static inline void lprocfs_mdc_init_vars(struct lprocfs_static_vars *lvars)
         memset(lvars, 0, sizeof(*lvars));
 }
 #endif
-void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
-                       __u64 valid, const struct lu_fid *fid,
-                       struct obd_capa *oc, int ea_size,
-                       __u32 suppgid, int flags);
-void mdc_pack_capa(struct ptlrpc_request *req, int offset, struct obd_capa *oc);
-void mdc_pack_rep_body(struct ptlrpc_request *);
-void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset,
-                        const struct lu_fid *pfid, const struct lu_fid *cfid,
-                        int flags);
-void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
-                     __u32 size, const struct lu_fid *fid,
-                      struct obd_capa *oc);
-void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid,
-                      int flags, struct md_op_data *data);
-void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
-                      struct md_op_data *op_data,
-                      void *ea, int ealen, void *ea2, int ea2len);
-void mdc_create_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data, const void *data, int datalen,
-                    __u32 mode, __u32 uid, __u32 gid, __u32 cap_effective,
-                    __u64 rdev);
-void mdc_open_pack(struct ptlrpc_request *req, int offset,
-                   struct md_op_data *op_data, __u32 mode, __u64 rdev,
-                   __u32 flags, const void *data, int datalen);
-void mdc_join_pack(struct ptlrpc_request *req, int offset, 
-                   struct md_op_data *op_data, __u64 head_size);
-void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data);
-void mdc_link_pack(struct ptlrpc_request *req, int offset,
-                   struct md_op_data *op_data);
-void mdc_rename_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data,
+void mdc_pack_body(struct ptlrpc_request *req, const struct lu_fid *fid,
+                   struct obd_capa *oc, __u64 valid, int ea_size,
+                   __u32 suppgid, int flags);
+void mdc_pack_capa(struct ptlrpc_request *req, const struct req_msg_field *field,
+                   struct obd_capa *oc);
+int mdc_pack_req(struct ptlrpc_request *req, int version, int opc);
+void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
+                        const struct lu_fid *cfid, int flags);
+void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff, __u32 size,
+                      const struct lu_fid *fid, struct obd_capa *oc);
+void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
+                      struct md_op_data *data);
+void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                     void *ea, int ealen, void *ea2, int ea2len);
+void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                     const void *data, int datalen, __u32 mode, __u32 uid,
+                     __u32 gid, __u32 cap_effective, __u64 rdev);
+void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                   __u32 mode, __u64 rdev, __u32 flags, const void *data,
+                   int datalen);
+void mdc_join_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                   __u64 head_size);
+void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
+void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
+void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
                      const char *old, int oldlen, const char *new, int newlen);
-void mdc_close_pack(struct ptlrpc_request *req, int offset,
-                    struct md_op_data *op_data);
-void mdc_exit_request(struct client_obd *cli);
+void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
 void mdc_enter_request(struct client_obd *cli);
+void mdc_exit_request(struct client_obd *cli);
 
 static inline int client_is_remote(struct obd_export *exp)
 {
@@ -127,8 +120,7 @@ int mdc_open(struct obd_export *exp, obd_id ino, int type, int flags,
 struct obd_client_handle;
 
 int mdc_get_lustre_md(struct obd_export *md_exp, struct ptlrpc_request *req,
-                      int offset, struct obd_export *dt_exp, 
-                      struct obd_export *lmv_exp,
+                      struct obd_export *dt_exp, struct obd_export *lmv_exp,
                       struct lustre_md *md);
 
 int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md);
@@ -157,6 +149,18 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
                       ldlm_policy_data_t *policy, ldlm_mode_t mode,
                       int flags, void *opaque);
+
+static inline void mdc_set_capa_size(struct ptlrpc_request *req,
+                                     const struct req_msg_field *field,
+                                     struct obd_capa *oc)
+{
+        if (oc == NULL)
+                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
+        else
+                /* it is already calculated as sizeof struct obd_capa */
+                ;
+}
+
 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
                            const struct lu_fid *fid, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
index 66e5ad3..de09e61 100644 (file)
@@ -38,7 +38,7 @@
 #endif
 #endif
 
-static void mdc_pack_body(struct mdt_body *b, __u32 suppgid)
+static void __mdc_pack_body(struct mdt_body *b, __u32 suppgid)
 {
         LASSERT (b != NULL);
 
@@ -50,97 +50,102 @@ static void mdc_pack_body(struct mdt_body *b, __u32 suppgid)
         b->capability = current->cap_effective;
 }
 
-void mdc_pack_capa(struct ptlrpc_request *req, int offset, struct obd_capa *oc)
+void mdc_pack_capa(struct ptlrpc_request *req, const struct req_msg_field *field,
+                   struct obd_capa *oc)
 {
+        struct req_capsule *pill = &req->rq_pill;
         struct lustre_capa *c;
 
         if (oc == NULL) {
-                LASSERT(lustre_msg_buflen(req->rq_reqmsg, offset) == 0);
+                LASSERT(req_capsule_get_size(pill, field, RCL_CLIENT) == 0);
                 return;
         }
 
-        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
+        c = req_capsule_client_get(pill, field);
         LASSERT(c != NULL);
         capa_cpy(c, oc);
         DEBUG_CAPA(D_SEC, c, "pack");
 }
 
-void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset,
-                        const struct lu_fid *pfid,
+void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
                         const struct lu_fid *cfid, int flags)
 {
-        struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
+        struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
+                                                    &RMF_MDT_BODY);
 
-        if (pfid)
+        if (pfid) {
                 b->fid1 = *pfid;
+                b->valid = OBD_MD_FLID;
+        }
         if (cfid)
                 b->fid2 = *cfid;
-        b->valid = OBD_MD_FLID;
         b->flags = flags;
 }
 
-void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
-                       __u64 valid, const struct lu_fid *fid,
-                       struct obd_capa *oc, int ea_size, __u32 suppgid,
-                       int flags)
+void mdc_pack_body(struct ptlrpc_request *req,
+                   const struct lu_fid *fid, struct obd_capa *oc,
+                   __u64 valid, int ea_size, __u32 suppgid, int flags)
 {
-        struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
-
+        struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
+                                                    &RMF_MDT_BODY);
+        LASSERT(b != NULL);
         b->valid = valid;
         b->eadatasize = ea_size;
         b->flags = flags;
-        mdc_pack_body(b, suppgid);
+        __mdc_pack_body(b, suppgid);
         if (fid) {
                 b->fid1 = *fid;
-                mdc_pack_capa(req, offset + 1, oc);
+                b->valid |= OBD_MD_FLID;
+                mdc_pack_capa(req, &RMF_CAPA1, oc);
         }
 }
 
-void mdc_readdir_pack(struct ptlrpc_request *req, int offset, __u64 pgoff,
+void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff,
                       __u32 size, const struct lu_fid *fid, struct obd_capa *oc)
 {
-        struct mdt_body *b;
-
-        b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
+        struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
+                                                    &RMF_MDT_BODY);
         b->fid1 = *fid;
+        b->valid |= OBD_MD_FLID;
         b->size = pgoff;                       /* !! */
         b->nlink = size;                        /* !! */
-        mdc_pack_body(b, -1);
-        mdc_pack_capa(req, offset + 1, oc);
+        __mdc_pack_body(b, -1);
+        mdc_pack_capa(req, &RMF_CAPA1, oc);
 }
 
 /* packing of MDS records */
-void mdc_create_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data, const void *data, int datalen,
-                     __u32 mode, __u32 uid, __u32 gid, __u32 cap_effective,
-                     __u64 rdev)
+void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                     const void *data, int datalen, __u32 mode,
+                     __u32 uid, __u32 gid, __u32 cap_effective, __u64 rdev)
 {
         struct mdt_rec_create *rec;
-        char *tmp;
-        
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
-
-        rec->cr_opcode = REINT_CREATE;
-        rec->cr_fsuid = uid;
-        rec->cr_fsgid = gid;
-        rec->cr_cap = cap_effective;
-        rec->cr_fid1 = op_data->op_fid1;
-        rec->cr_fid2 = op_data->op_fid2;
-        rec->cr_mode = mode;
-        rec->cr_rdev = rdev;
-        rec->cr_time = op_data->op_mod_time;
+        char                  *tmp;
+
+        CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
+
+
+        rec->cr_opcode   = REINT_CREATE;
+        rec->cr_fsuid    = uid;
+        rec->cr_fsgid    = gid;
+        rec->cr_cap      = cap_effective;
+        rec->cr_fid1     = op_data->op_fid1;
+        rec->cr_fid2     = op_data->op_fid2;
+        rec->cr_mode     = mode;
+        rec->cr_rdev     = rdev;
+        rec->cr_time     = op_data->op_mod_time;
         rec->cr_suppgid1 = op_data->op_suppgids[0];
         rec->cr_suppgid2 = op_data->op_suppgids[1];
-        rec->cr_flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
-        rec->cr_bias = op_data->op_bias;
+        rec->cr_flags    = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
+        rec->cr_bias     = op_data->op_bias;
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, op_data->op_namelen + 1);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
 
         if (data) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, datalen);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
                 memcpy(tmp, data, datalen);
         }
 }
@@ -175,49 +180,51 @@ static __u32 mds_pack_open_flags(__u32 flags)
 }
 
 /* packing of MDS records */
-void mdc_join_pack(struct ptlrpc_request *req, int offset,
-                   struct md_op_data *op_data, __u64 head_size)
+void mdc_join_pack(struct ptlrpc_request *req,
+                   struct md_op_data *op_data,
+                   __u64 head_size)
 {
         struct mdt_rec_join *rec;
 
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*rec));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_JOINFILE);
         LASSERT(rec != NULL);
         rec->jr_fid = op_data->op_fid2;
         rec->jr_headsize = head_size;
 }
 
-void mdc_open_pack(struct ptlrpc_request *req, int offset,
-                   struct md_op_data *op_data, __u32 mode, __u64 rdev,
-                   __u32 flags, const void *lmm, int lmmlen)
+void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                   __u32 mode, __u64 rdev, __u32 flags, const void *lmm,
+                   int lmmlen)
 {
         struct mdt_rec_create *rec;
         char *tmp;
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
+
+        CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
         /* XXX do something about time, uid, gid */
-        rec->cr_opcode = REINT_OPEN;
-        rec->cr_fsuid = current->fsuid;
-        rec->cr_fsgid = current->fsgid;
-        rec->cr_cap = current->cap_effective;
+        rec->cr_opcode   = REINT_OPEN;
+        rec->cr_fsuid    = current->fsuid;
+        rec->cr_fsgid    = current->fsgid;
+        rec->cr_cap      = current->cap_effective;
         if (op_data != NULL) {
                 rec->cr_fid1 = op_data->op_fid1;
                 rec->cr_fid2 = op_data->op_fid2;
         }
-        rec->cr_mode = mode;
-        rec->cr_flags = mds_pack_open_flags(flags);
-        rec->cr_time = op_data->op_mod_time;
-        rec->cr_rdev = rdev;
+        rec->cr_mode     = mode;
+        rec->cr_flags    = mds_pack_open_flags(flags);
+        rec->cr_rdev     = rdev;
+        rec->cr_time     = op_data->op_mod_time;
         rec->cr_suppgid1 = op_data->op_suppgids[0];
         rec->cr_suppgid2 = op_data->op_suppgids[1];
-        rec->cr_bias = op_data->op_bias;
+        rec->cr_bias     = op_data->op_bias;
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
         /* the next buffer is child capa, which is used for replay,
          * will be packed from the data in reply message. */
 
         if (op_data->op_name) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3,
-                                     op_data->op_namelen + 1);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
         }
 
@@ -227,7 +234,7 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset,
                 /*XXX a hack for liblustre to set EA (LL_IOC_LOV_SETSTRIPE) */
                 rec->cr_fid2 = op_data->op_fid2;
 #endif
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, lmmlen);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
                 memcpy (tmp, lmm, lmmlen);
         }
 }
@@ -276,22 +283,22 @@ static inline __u64 attr_pack(unsigned int ia_valid) {
 static void mdc_setattr_pack_rec(struct mdt_rec_setattr *rec,
                                  struct md_op_data *op_data)
 {
-        rec->sa_opcode = REINT_SETATTR;
-        rec->sa_fsuid = current->fsuid;
-        rec->sa_fsgid = current->fsgid;
-        rec->sa_cap = current->cap_effective;
+        rec->sa_opcode  = REINT_SETATTR;
+        rec->sa_fsuid   = current->fsuid;
+        rec->sa_fsgid   = current->fsgid;
+        rec->sa_cap     = current->cap_effective;
         rec->sa_suppgid = -1;
 
-        rec->sa_fid = op_data->op_fid1;
-        rec->sa_valid = attr_pack(op_data->op_attr.ia_valid);
-        rec->sa_mode = op_data->op_attr.ia_mode;
-        rec->sa_uid = op_data->op_attr.ia_uid;
-        rec->sa_gid = op_data->op_attr.ia_gid;
-        rec->sa_size = op_data->op_attr.ia_size;
+        rec->sa_fid    = op_data->op_fid1;
+        rec->sa_valid  = attr_pack(op_data->op_attr.ia_valid);
+        rec->sa_mode   = op_data->op_attr.ia_mode;
+        rec->sa_uid    = op_data->op_attr.ia_uid;
+        rec->sa_gid    = op_data->op_attr.ia_gid;
+        rec->sa_size   = op_data->op_attr.ia_size;
         rec->sa_blocks = op_data->op_attr_blocks;
-        rec->sa_atime = LTIME_S(op_data->op_attr.ia_atime);
-        rec->sa_mtime = LTIME_S(op_data->op_attr.ia_mtime);
-        rec->sa_ctime = LTIME_S(op_data->op_attr.ia_ctime);
+        rec->sa_atime  = LTIME_S(op_data->op_attr.ia_atime);
+        rec->sa_mtime  = LTIME_S(op_data->op_attr.ia_mtime);
+        rec->sa_ctime  = LTIME_S(op_data->op_attr.ia_ctime);
         rec->sa_attr_flags = ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags;
         if ((op_data->op_attr.ia_valid & ATTR_GID) &&
             in_group_p(op_data->op_attr.ia_gid))
@@ -307,127 +314,129 @@ static void mdc_epoch_pack(struct mdt_epoch *epoch, struct md_op_data *op_data)
         epoch->flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
 }
 
-void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
-                      struct md_op_data *op_data, void *ea,
-                      int ealen, void *ea2, int ea2len)
+void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                      void *ea, int ealen, void *ea2, int ea2len)
 {
         struct mdt_rec_setattr *rec;
         struct mdt_epoch *epoch;
         
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));        
+        CLASSERT(sizeof(struct mdt_rec_reint) ==sizeof(struct mdt_rec_setattr));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
         mdc_setattr_pack_rec(rec, op_data);
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
 
         if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) {
-                epoch = lustre_msg_buf(req->rq_reqmsg, offset + 2,
-                                        sizeof(*epoch));
+                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
                 mdc_epoch_pack(epoch, op_data);
         }
 
         if (ealen == 0)
                 return;
 
-        memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 3, ealen), ea, ealen);
+        memcpy(req_capsule_client_get(&req->rq_pill, &RMF_EADATA), ea, ealen);
 
         if (ea2len == 0)
                 return;
 
-        memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 4, ea2len), ea2, ea2len);
+        memcpy(req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES), ea2,
+               ea2len);
 }
 
-void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data)
+void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
 {
         struct mdt_rec_unlink *rec;
         char *tmp;
-
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
+        CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_unlink));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
         LASSERT (rec != NULL);
 
-        rec->ul_opcode = REINT_UNLINK;
-        rec->ul_fsuid = op_data->op_fsuid;//current->fsuid;
-        rec->ul_fsgid = op_data->op_fsgid;//current->fsgid;
-        rec->ul_cap = op_data->op_cap;//current->cap_effective;
-        rec->ul_mode = op_data->op_mode;
-        rec->ul_suppgid = op_data->op_suppgids[0];
-        rec->ul_fid1 = op_data->op_fid1;
-        rec->ul_fid2 = op_data->op_fid2;
-        rec->ul_time = op_data->op_mod_time;
-        rec->ul_bias = op_data->op_bias;
-
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
-
-        tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, op_data->op_namelen + 1);
+        rec->ul_opcode  = REINT_UNLINK;
+        rec->ul_fsuid   = op_data->op_fsuid;
+        rec->ul_fsgid   = op_data->op_fsgid;
+        rec->ul_cap     = op_data->op_cap;
+        rec->ul_mode    = op_data->op_mode;
+        rec->ul_suppgid1= op_data->op_suppgids[0];
+        rec->ul_suppgid2= -1;
+        rec->ul_fid1    = op_data->op_fid1;
+        rec->ul_fid2    = op_data->op_fid2;
+        rec->ul_time    = op_data->op_mod_time;
+        rec->ul_bias    = op_data->op_bias;
+
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
         LASSERT(tmp != NULL);
         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
 }
 
-void mdc_link_pack(struct ptlrpc_request *req, int offset,
-                   struct md_op_data *op_data)
+void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
 {
         struct mdt_rec_link *rec;
         char *tmp;
 
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
+        CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_link));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
+        LASSERT (rec != NULL);
 
-        rec->lk_opcode = REINT_LINK;
-        rec->lk_fsuid = op_data->op_fsuid;//current->fsuid;
-        rec->lk_fsgid = op_data->op_fsgid;//current->fsgid;
-        rec->lk_cap = op_data->op_cap;//current->cap_effective;
+        rec->lk_opcode   = REINT_LINK;
+        rec->lk_fsuid    = op_data->op_fsuid;//current->fsuid;
+        rec->lk_fsgid    = op_data->op_fsgid;//current->fsgid;
+        rec->lk_cap      = op_data->op_cap;//current->cap_effective;
         rec->lk_suppgid1 = op_data->op_suppgids[0];
         rec->lk_suppgid2 = op_data->op_suppgids[1];
-        rec->lk_fid1 = op_data->op_fid1;
-        rec->lk_fid2 = op_data->op_fid2;
-        rec->lk_time = op_data->op_mod_time;
-        rec->lk_bias = op_data->op_bias;
+        rec->lk_fid1     = op_data->op_fid1;
+        rec->lk_fid2     = op_data->op_fid2;
+        rec->lk_time     = op_data->op_mod_time;
+        rec->lk_bias     = op_data->op_bias;
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
-        mdc_pack_capa(req, offset + 2, op_data->op_capa2);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, op_data->op_namelen + 1);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
         LOGL0(op_data->op_name, op_data->op_namelen, tmp);
 }
 
-void mdc_rename_pack(struct ptlrpc_request *req, int offset,
-                     struct md_op_data *op_data,
+void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
                      const char *old, int oldlen, const char *new, int newlen)
 {
         struct mdt_rec_rename *rec;
         char *tmp;
 
-        rec = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*rec));
+        CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename));
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
         /* XXX do something about time, uid, gid */
-        rec->rn_opcode = REINT_RENAME;
-        rec->rn_fsuid = op_data->op_fsuid;//current->fsuid;
-        rec->rn_fsgid = op_data->op_fsgid;//current->fsgid;
-        rec->rn_cap = op_data->op_cap;//current->cap_effective;
+        rec->rn_opcode   = REINT_RENAME;
+        rec->rn_fsuid    = op_data->op_fsuid;
+        rec->rn_fsgid    = op_data->op_fsgid;
+        rec->rn_cap      = op_data->op_cap;
         rec->rn_suppgid1 = op_data->op_suppgids[0];
         rec->rn_suppgid2 = op_data->op_suppgids[1];
-        rec->rn_fid1 = op_data->op_fid1;
-        rec->rn_fid2 = op_data->op_fid2;
-        rec->rn_time = op_data->op_mod_time;
-        rec->rn_mode = op_data->op_mode;
-        rec->rn_bias = op_data->op_bias;
+        rec->rn_fid1     = op_data->op_fid1;
+        rec->rn_fid2     = op_data->op_fid2;
+        rec->rn_time     = op_data->op_mod_time;
+        rec->rn_mode     = op_data->op_mode;
+        rec->rn_bias     = op_data->op_bias;
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
-        mdc_pack_capa(req, offset + 2, op_data->op_capa2);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, offset + 3, oldlen + 1);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
         LOGL0(old, oldlen, tmp);
 
         if (new) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset + 4, newlen + 1);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SYMTGT);
                 LOGL0(new, newlen, tmp);
         }
 }
 
-void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid,
-                      int flags, struct md_op_data *op_data)
+void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
+                      struct md_op_data *op_data)
 {
-        struct mdt_body *b;
-        b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*b));
+        struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
+                                                    &RMF_MDT_BODY);
 
         b->fsuid = current->fsuid;
         b->fsgid = current->fsgid;
@@ -442,28 +451,26 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid,
 
         b->fid1 = op_data->op_fid1;
         b->fid2 = op_data->op_fid2;
+        b->valid |= OBD_MD_FLID;
 
-        mdc_pack_capa(req, offset + 1, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
 
         if (op_data->op_name) {
-                char *tmp;
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2,
-                                     op_data->op_namelen + 1);
+                char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
         }
 }
 
-void mdc_close_pack(struct ptlrpc_request *req, int offset,
-                    struct md_op_data *op_data)
+void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
 {
         struct mdt_epoch *epoch;
         struct mdt_rec_setattr *rec;
 
-        epoch = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*epoch));
-        rec = lustre_msg_buf(req->rq_reqmsg, offset + 1, sizeof(*rec));
+        epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
+        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
         mdc_setattr_pack_rec(rec, op_data);
-        mdc_pack_capa(req, offset + 2, op_data->op_capa1);
+        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
         mdc_epoch_pack(epoch, op_data);
 }
 
index 73b3d73..70aca4a 100644 (file)
@@ -226,11 +226,11 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
-                                struct mdt_body *body, int size[9])
+                               struct mdt_body *body)
 {
         int     rc;
-        ENTRY;
 
+        /* FIXME: remove this explicit offset. */
         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
                                         body->eadatasize);
         if (rc) {
@@ -239,196 +239,270 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                 body->valid &= ~OBD_MD_FLEASIZE;
                 body->eadatasize = 0;
         }
-        EXIT;
 }
 
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-                struct lookup_intent *it, struct md_op_data *op_data,
-                struct lustre_handle *lockh, void *lmm, int lmmsize,
-                int extra_lock_flags)
+static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
+                                                   struct lookup_intent *it,
+                                                   struct md_op_data *op_data,
+                                                   void *lmm, int lmmsize,
+                                                   void *cb_data)
 {
         struct ptlrpc_request *req;
-        struct obd_device *obddev = class_exp2obd(exp);
-        struct ldlm_res_id res_id =
-                { .name = {fid_seq(&op_data->op_fid1),
-                           fid_oid(&op_data->op_fid1),
-                           fid_ver(&op_data->op_fid1)} };
-        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
-        struct ldlm_request *lockreq;
-        struct ldlm_intent *lit;
-        struct ldlm_reply *lockrep;
-        int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
-                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
-                        0, 0, 0, 0, 0, 0 };
-        int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                           [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
-                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
-                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize,
-                           0, 0, 0 };
-        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
-        int repbufcnt = 4, rc;
+        struct obd_device     *obddev = class_exp2obd(exp);
+        struct ldlm_intent    *lit;
+        int                    joinfile = !!((it->it_flags & O_JOIN_FILE) && 
+                                              op_data->op_data);
+        CFS_LIST_HEAD(cancels);
+        int                    count = 0;
+        int                    mode;
+        int                    rc;
         ENTRY;
 
-        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
 
-        if (it->it_op & IT_OPEN) {
-                int do_join = (!!(it->it_flags & O_JOIN_FILE) && 
-                              op_data->op_data);
-                CFS_LIST_HEAD(cancels);
-                int count = 0;
-                int mode;
-
-                it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
-
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
-                /* parent capability */
-                size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ?
-                                               sizeof(struct lustre_capa) : 0;
-                /* child capability, used for replay only */
-                size[DLM_INTENT_REC_OFF + 2] = sizeof(struct lustre_capa);
-                size[DLM_INTENT_REC_OFF + 3] = op_data->op_namelen + 1;
-                /* As an optimization, we allocate an RPC request buffer for
-                 * at least a default-sized LOV EA even if we aren't sending
-                 * one.
-                 */
-                size[DLM_INTENT_REC_OFF + 4] = max(lmmsize,
-                                           obddev->u.cli.cl_default_mds_easize);
-
-                /* XXX: openlock is not cancelled for cross-refs. */
-                /* If inode is known, cancel conflicting OPEN locks. */
-                if (fid_is_sane(&op_data->op_fid2)) {
-                        if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
-                                mode = LCK_CW;
+        /* XXX: openlock is not cancelled for cross-refs. */
+        /* If inode is known, cancel conflicting OPEN locks. */
+        if (fid_is_sane(&op_data->op_fid2)) {
+                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+                        mode = LCK_CW;
 #ifdef FMODE_EXEC
-                        else if (it->it_flags & FMODE_EXEC)
-                                mode = LCK_PR;
+                else if (it->it_flags & FMODE_EXEC)
+                        mode = LCK_PR;
 #endif
-                        else 
-                                mode = LCK_CR;
-                        count = mdc_resource_get_unused(exp, &op_data->op_fid2,
-                                                        &cancels, mode,
-                                                        MDS_INODELOCK_OPEN);
-                }
-
-                /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
-                if (it->it_op & IT_CREAT || do_join)
-                        mode = LCK_EX;
                 else
                         mode = LCK_CR;
-                count += mdc_resource_get_unused(exp, &op_data->op_fid1,
-                                                 &cancels, mode,
-                                                 MDS_INODELOCK_UPDATE);
+                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
+                                                &cancels, mode,
+                                                MDS_INODELOCK_OPEN);
+        }
 
-                if (do_join)
-                        size[DLM_INTENT_REC_OFF + 5] =
-                                                sizeof(struct mdt_rec_join);
-                else
-                        it->it_flags &= ~O_JOIN_FILE;
+        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+        if (it->it_op & IT_CREAT || joinfile)
+                mode = LCK_EX;
+        else
+                mode = LCK_CR;
+        count += mdc_resource_get_unused(exp, &op_data->op_fid1,
+                                         &cancels, mode,
+                                         MDS_INODELOCK_UPDATE);
+
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_LDLM_INTENT_OPEN);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                RETURN(ERR_PTR(-ENOMEM));
+        }
 
-                req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels,
-                                            count);
-                if (!req)
-                        RETURN(-ENOMEM);
+        /* parent capability */
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        /* child capability, reserve the size according to parent capa, it will
+         * be filled after we get the reply */
+        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
+
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));
+        if (!joinfile) {
+                req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
+                                     RCL_CLIENT, 0);
+        }
 
-                if (do_join) {
-                        /* join is like an unlink of the tail */
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                               mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data,
-                                      (*(__u64 *)op_data->op_data));
-                }
+        if (exp_connect_cancelset(exp) && count) {
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, LDLM_ENQUEUE));
+        }
 
-                spin_lock(&req->rq_lock);
-                req->rq_replay = 1;
-                spin_unlock(&req->rq_lock);
+        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (rc) {
+                ptlrpc_request_free(req);
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                return NULL;
+        }
+        if (exp_connect_cancelset(exp) && req)
+                ldlm_cli_cancel_list(&cancels, count, req, 0);
 
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
-                                     sizeof(*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
-                              it->it_create_mode, 0, it->it_flags,
-                              lmm, lmmsize);
-
-                /* for remote client, fetch remote perm for current user */
-                repsize[repbufcnt++] = client_is_remote(exp) ?
-                                       sizeof(struct mdt_remote_perm) :
-                                       LUSTRE_POSIX_ACL_MAX_SIZE;
-                repsize[repbufcnt++] = sizeof(struct lustre_capa);
-                repsize[repbufcnt++] = sizeof(struct lustre_capa);
-        } else if (it->it_op & IT_UNLINK) {
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_unlink);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ?
-                                               sizeof(struct lustre_capa) : 0;
-                size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1;
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
-                                     sizeof(*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
-
-                repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
-        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
-                obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
-                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
-                        OBD_MD_FLMDSCAPA | OBD_MD_MEA;
-                valid |= client_is_remote(exp) ? OBD_MD_FLRMTPERM :
-                                                 OBD_MD_FLACL;
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_body);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ?
-                                               sizeof(struct lustre_capa) : 0;
-                size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1;
-
-                if (it->it_op & IT_GETATTR)
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+        if (joinfile) {
+                __u64 head_size = *(__u64 *)op_data->op_data;
+                mdc_join_pack(req, op_data, head_size);
+        }
+
+        spin_lock(&req->rq_lock);
+        req->rq_replay = 1;
+        spin_unlock(&req->rq_lock);
+
+        /* pack the intent */
+        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+        lit->opc = (__u64)it->it_op;
+
+        /* pack the intended request */
+        mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
+                      lmmsize);
+
+        /* for remote client, fetch remote perm for current user */
+        if (client_is_remote(exp))
+                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+                                     sizeof(struct mdt_remote_perm));
+        ptlrpc_request_set_replen(req);
+        return req;
+}
+
+static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
+                                                     struct lookup_intent *it,
+                                                     struct md_op_data *op_data)
+{
+        struct ptlrpc_request *req;
+        struct obd_device     *obddev = class_exp2obd(exp);
+        struct ldlm_intent    *lit;
+        int                    rc;
+        ENTRY;
+
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_LDLM_INTENT_UNLINK);
+        if (req == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(ERR_PTR(rc));
+        }
+
+        /* pack the intent */
+        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+        lit->opc = (__u64)it->it_op;
+
+        /* pack the intended request */
+        mdc_unlink_pack(req, op_data);
+
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obddev->u.cli.cl_max_mds_easize);
+        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+                             obddev->u.cli.cl_max_mds_cookiesize);
+        ptlrpc_request_set_replen(req);
+        RETURN(req);
+}
 
-                req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
+static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
+                                                      struct lookup_intent *it,
+                                                     struct md_op_data *op_data)
+{
+        struct ptlrpc_request *req;
+        struct obd_device     *obddev = class_exp2obd(exp);
+        obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
+                                       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
+                                       OBD_MD_FLMDSCAPA | OBD_MD_MEA |
+                                       (client_is_remote(exp) ?
+                                               OBD_MD_FLRMTPERM : OBD_MD_FLACL);
+        struct ldlm_intent    *lit;
+        int                    rc;
+        ENTRY;
 
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
-                                     sizeof(*lit));
-                lit->opc = (__u64)it->it_op;
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_LDLM_INTENT_GETATTR);
+        if (req == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
-                /* pack the intended request */
-                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
-                                 it->it_flags, op_data);
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(ERR_PTR(rc));
+        }
 
-                repsize[repbufcnt++] = client_is_remote(exp) ?
-                                       sizeof(struct mdt_remote_perm) :
-                                       LUSTRE_POSIX_ACL_MAX_SIZE;
-                repsize[repbufcnt++] = sizeof(struct lustre_capa);
-        } else if (it->it_op == IT_READDIR) {
+        /* pack the intent */
+        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+        lit->opc = (__u64)it->it_op;
+
+        /* pack the intended request */
+        mdc_getattr_pack(req, valid, it->it_flags, op_data);
+
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obddev->u.cli.cl_max_mds_easize);
+        if (client_is_remote(exp))
+                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+                                     sizeof(struct mdt_remote_perm));
+        ptlrpc_request_set_replen(req);
+        RETURN(req);
+}
+
+static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
+{
+        struct ptlrpc_request *req;
+        ENTRY;
+
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                        &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
+                                        LDLM_ENQUEUE);
+        if (req == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        ptlrpc_request_set_replen(req);
+        RETURN(req);
+}
+
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type. */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
+{
+        struct obd_device     *obddev = class_exp2obd(exp);
+        struct ptlrpc_request *req;
+        struct req_capsule    *pill;
+        struct ldlm_request   *lockreq;
+        struct ldlm_reply     *lockrep;
+        int                    flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int                    rc;
+        struct ldlm_res_id res_id =
+                { .name = {fid_seq(&op_data->op_fid1),
+                           fid_oid(&op_data->op_fid1),
+                           fid_ver(&op_data->op_fid1)} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+        ENTRY;
+
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+
+        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
 
-                repbufcnt = 2;
-        } else {
+        if (it->it_op & IT_OPEN) {
+                int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
+                                              op_data->op_data);
+
+                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
+                                           einfo->ei_cbdata);
+                if (!joinfile) {
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                        einfo->ei_cbdata = NULL;
+                        lmm = NULL;
+                } else
+                        it->it_flags &= ~O_JOIN_FILE;
+        } else if (it->it_op & IT_UNLINK)
+                req = mdc_intent_unlink_pack(exp, it, op_data);
+        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
+                req = mdc_intent_getattr_pack(exp, it, op_data);
+        else if (it->it_op == IT_READDIR)
+                req = ldlm_enqueue_pack(exp);
+        else {
                 LBUG();
                 RETURN(-EINVAL);
         }
 
-        /* get ready for the reply */
-        ptlrpc_req_set_repsize(req, repbufcnt, repsize);
+        if (IS_ERR(req))
+                RETURN(PTR_ERR(req));
+        pill = &req->rq_pill;
 
-         /* It is important to obtain rpc_lock first (if applicable), so that
-          * threads that are serialised with rpc_lock are not polluting our
-          * rpcs in flight counter */
+        /* It is important to obtain rpc_lock first (if applicable), so that
+         * threads that are serialised with rpc_lock are not polluting our
+         * rpcs in flight counter */
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         mdc_enter_request(&obddev->u.cli);
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
@@ -439,8 +513,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         /* Similarly, if we're going to replay this request, we don't want to
          * actually get a lock, just perform the intent. */
         if (req->rq_transno || req->rq_replay) {
-                lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
-                                         sizeof(*lockreq));
+                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
         }
 
@@ -468,11 +541,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 LDLM_LOCK_PUT(lock);
         }
 
-        lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
-                                 sizeof(*lockrep));
+        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
-        /* swabbed by ldlm_cli_enqueue() */
-        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
 
         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
@@ -497,13 +567,10 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
 
         /* We know what to expect, so we do any byte flipping required here */
-        LASSERT(repbufcnt == 7 || repbufcnt == 6 || repbufcnt == 2);
-        if (repbufcnt >= 6) {
-                int reply_off = DLM_REPLY_REC_OFF;
+        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                 struct mdt_body *body;
 
-                body = lustre_swab_repbuf(req, reply_off++, sizeof(*body),
-                                         lustre_swab_mdt_body);
+                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                 if (body == NULL) {
                         CERROR ("Can't swab mdt_body\n");
                         RETURN (-EPROTO);
@@ -527,12 +594,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                          * The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents.
                          */
-                        eadata = lustre_swab_repbuf(req, reply_off++,
-                                                    body->eadatasize, NULL);
-                        if (eadata == NULL) {
-                                CERROR("Missing/short eadata\n");
+                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
+                                                              body->eadatasize);
+                        if (eadata == NULL)
                                 RETURN(-EPROTO);
-                        }
+
                         if (body->valid & OBD_MD_FLMODEASIZE) {
                                 if (obddev->u.cli.cl_max_mds_easize <
                                     body->max_mdsize) {
@@ -560,46 +626,40 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                          * (for example error one).
                          */
                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
-                                if (lustre_msg_buflen(req->rq_reqmsg,
-                                                      DLM_INTENT_REC_OFF + 4) <
-                                    body->eadatasize)
-                                        mdc_realloc_openmsg(req, body, size);
-
-                                lmm = lustre_msg_buf(req->rq_reqmsg,
-                                                     DLM_INTENT_REC_OFF + 4,
-                                                     body->eadatasize);
+                                if (req_capsule_get_size(pill, &RMF_EADATA,
+                                                         RCL_CLIENT) <
+                                    body->eadatasize) {
+                                        mdc_realloc_openmsg(req, body);
+                                        req_capsule_set_size(pill, &RMF_EADATA,
+                                                             RCL_CLIENT,
+                                                             body->eadatasize);
+                                }
+                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                 if (lmm)
                                         memcpy(lmm, eadata, body->eadatasize);
                         }
                 }
+
                 if (body->valid & OBD_MD_FLRMTPERM) {
                         struct mdt_remote_perm *perm;
 
                         LASSERT(client_is_remote(exp));
-                        perm = lustre_swab_repbuf(req, reply_off++,
-                                                  sizeof(*perm),
-                                                  lustre_swab_mdt_remote_perm);
-                        if (perm == NULL) {
-                                CERROR("missing remote permission!\n");
+                        perm = req_capsule_server_get(pill, &RMF_ACL);
+                        if (perm == NULL)
                                 RETURN(-EPROTO);
-                        }
-                } else if ((body->valid & OBD_MD_FLACL) && body->aclsize) {
-                        reply_off++;
+
+                        lustre_swab_mdt_remote_perm(perm);
                 }
                 if (body->valid & OBD_MD_FLMDSCAPA) {
                         struct lustre_capa *capa, *p;
 
-                        capa = lustre_unpack_capa(req->rq_repmsg, reply_off++);
-                        if (capa == NULL) {
-                                CERROR("Missing/short MDS capability\n");
+                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
+                        if (capa == NULL)
                                 RETURN(-EPROTO);
-                        }
 
                         if (it->it_op & IT_OPEN) {
                                 /* client fid capa will be checked in replay */
-                                p = lustre_msg_buf(req->rq_reqmsg,
-                                                   DLM_INTENT_REC_OFF + 2,
-                                                   sizeof(*p));
+                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                 LASSERT(p);
                                 *p = *capa;
                         }
@@ -607,11 +667,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 if (body->valid & OBD_MD_FLOSSCAPA) {
                         struct lustre_capa *capa;
 
-                        capa = lustre_unpack_capa(req->rq_repmsg, reply_off++);
-                        if (capa == NULL) {
-                                CERROR("Missing/short OSS capability\n");
+                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
+                        if (capa == NULL)
                                 RETURN(-EPROTO);
-                        }
                 }
         }
 
@@ -748,17 +806,14 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-        mdt_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
-                                  sizeof(*mdt_body));
-        /* mdc_enqueue checked */
-        LASSERT(mdt_body != NULL);
-        /* mdc_enqueue swabbed */
-        LASSERT(lustre_rep_swabbed(request, 1));
+        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
+        LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
 
         /* If we were revalidating a fid/name pair, mark the intent in
          * case we fail and get called again from lookup */
-        if (fid_is_sane(&op_data->op_fid2) && (it->it_flags & O_CHECK_STALE) &&
-            (it->it_op != IT_GETATTR)) {
+        if (fid_is_sane(&op_data->op_fid2) &&
+            (it->it_flags & O_CHECK_STALE) &&
+            it->it_op != IT_GETATTR) {
                 it_set_disposition(it, DISP_ENQ_COMPLETE);
 
                 /* Also: did we find the same inode? */
index ee27032..9a81e2c 100644 (file)
@@ -43,7 +43,8 @@
 
 /* mdc_setattr does its own semaphore handling */
 static int mdc_reint(struct ptlrpc_request *request,
-                     struct mdc_rpc_lock *rpc_lock, int level)
+                     struct mdc_rpc_lock *rpc_lock,
+                     int level)
 {
         int rc;
 
@@ -54,10 +55,7 @@ static int mdc_reint(struct ptlrpc_request *request,
         mdc_put_rpc_lock(rpc_lock, NULL);
         if (rc)
                 CDEBUG(D_INFO, "error in handling %d\n", rc);
-        else if (!lustre_swab_repbuf(request, REPLY_REC_OFF,
-                                     sizeof(struct mdt_body),
-                                     lustre_swab_mdt_body)) {
-                CERROR ("Can't unpack mdt_body\n");
+        else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
                 rc = -EPROTO;
         }
         return rc;
@@ -90,12 +88,11 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
         RETURN(count);
 }
 
-struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
-                                        int bufcount, int *size, int off,
-                                        struct list_head *cancels, int count)
+static int mdc_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                            struct list_head *cancels, int count)
 {
-        return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
-                                 bufcount, size, off, 0, cancels, count);
+        return ldlm_prep_elc_req(exp, req, LUSTRE_MDS_VERSION, MDS_REINT,
+                                 0, cancels, count);
 }
 
 /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
@@ -110,29 +107,14 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 {
         CFS_LIST_HEAD(cancels);
         struct ptlrpc_request *req;
-        struct mdt_rec_setattr *rec;
         struct mdc_rpc_lock *rpc_lock;
         struct obd_device *obd = exp->exp_obd;
-        int size[7] = { sizeof(struct ptlrpc_body),
-                        sizeof(*rec), 0, 0, ealen, ea2len, 0 };
-        int count = 0, bufcount = 4, rc;
+        int count = 0, rc;
         __u64 bits;
         ENTRY;
 
         LASSERT(op_data != NULL);
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-
-        if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN))
-                size[REQ_REC_OFF + 2] = sizeof(struct mdt_epoch);
-
-        if (ealen > 0) {
-                bufcount++;
-                if (ea2len > 0)
-                        bufcount++;
-        }
-
         bits = MDS_INODELOCK_UPDATE;
         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
                 bits |= MDS_INODELOCK_LOOKUP;
@@ -140,13 +122,26 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX, bits);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 7;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 5, &cancels, count);
-
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_SETATTR);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
+                req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
+                                     0);
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
+                             ea2len);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         if (op_data->op_attr.ia_valid & ATTR_FROM_OPEN) {
                 req->rq_request_portal = MDS_SETATTR_PORTAL; //XXX FIXME bug 249
@@ -159,11 +154,9 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
                        LTIME_S(op_data->op_attr.ia_mtime),
                        LTIME_S(op_data->op_attr.ia_ctime));
-        mdc_setattr_pack(req, REQ_REC_OFF, op_data, ea, ealen, ea2, ea2len);
+        mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = sizeof(struct lustre_capa);
-        ptlrpc_req_set_repsize(req, 3, size);
+        ptlrpc_request_set_replen(req);
         if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
             req->rq_import->imp_replayable)
         {
@@ -201,12 +194,8 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                const void *data, int datalen, int mode, __u32 uid, __u32 gid,
                __u32 cap_effective, __u64 rdev, struct ptlrpc_request **request)
 {
-        int size[6] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_create),
-                        0, op_data->op_namelen + 1, 0, 0 };
-        struct obd_device *obd = exp->exp_obd;
-        int level, bufcount = 4, rc;
         struct ptlrpc_request *req;
+        int level, rc;
         int count = 0;
         CFS_LIST_HEAD(cancels);
         ENTRY;
@@ -224,41 +213,43 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 }
         }
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-        
-        if (data && datalen) {
-                size[bufcount] = datalen;
-                bufcount++;
-        }
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && 
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX,
                                                 MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 6;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 4, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_CREATE_RMT_ACL);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                             data && datalen ? datalen : 0);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
          * tgt, for symlinks or lov MD data.
          */
-        mdc_create_pack(req, REQ_REC_OFF, op_data, data, datalen, mode, uid,
+        mdc_create_pack(req, op_data, data, datalen, mode, uid,
                         gid, cap_effective, rdev);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = sizeof(struct lustre_capa);
-        ptlrpc_req_set_repsize(req, 3, size);
+        ptlrpc_request_set_replen(req);
 
         level = LUSTRE_IMP_FULL;
  resend:
-        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, level);
+        rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
         
         /* Resend if we were told to. */
         if (rc == -ERESTARTSYS) {
@@ -268,16 +259,13 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 struct mdt_body *body;
                 struct lustre_capa *capa;
 
-                body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body);
                 if (body->valid & OBD_MD_FLMDSCAPA) {
-                        capa = lustre_unpack_capa(req->rq_repmsg,
-                                                  REPLY_REC_OFF + 1);
-                        if (capa == NULL) {
-                                CERROR("Missing/short MDS capability\n");
+                        capa = req_capsule_server_get(&req->rq_pill,
+                                                      &RMF_CAPA1);
+                        if (capa == NULL)
                                 rc = -EPROTO;
-                        }
                 }
         }
 
@@ -291,17 +279,11 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = class_exp2obd(exp);
         struct ptlrpc_request *req = *request;
-        int size[5] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_unlink),
-                        0, op_data->op_namelen + 1, 0 };
-        int count = 0, rc, bufcount = 4;
+        int count = 0, rc;
         ENTRY;
 
         LASSERT(req == NULL);
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && 
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -312,22 +294,32 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 5;
-        
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 3, &cancels, count);
-
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_UNLINK);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
-        *request = req;
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
+
+        mdc_unlink_pack(req, op_data);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
-        size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
-        ptlrpc_req_set_repsize(req, 4, size);
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_easize);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_cookiesize);
+        ptlrpc_request_set_replen(req);
 
-        mdc_unlink_pack(req, REQ_REC_OFF, op_data);
+        *request = req;
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         if (rc == -ERESTARTSYS)
@@ -341,17 +333,9 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int size[6] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_link),
-                        0, 0, op_data->op_namelen + 1, 0 };
-        int count = 0, rc, bufcount = 5;
+        int count = 0, rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-        size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
-                sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
             (fid_is_sane(&op_data->op_fid2)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
@@ -362,18 +346,26 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 6;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 4, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        mdc_link_pack(req, REQ_REC_OFF, op_data);
-
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        ptlrpc_req_set_repsize(req, 2, size);
+        mdc_link_pack(req, op_data);
+        ptlrpc_request_set_replen(req);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         *request = req;
@@ -390,17 +382,9 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int size[7] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_rename),
-                        0, 0, oldlen + 1, newlen + 1, 0 };
-        int count = 0, rc, bufcount = 6;
+        int count = 0, rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                                        sizeof(struct lustre_capa) : 0;
-        size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
-                                        sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -421,20 +405,36 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 7;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 5, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_RENAME);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
+        req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
+
+        if (exp_connect_cancelset(exp) && req)
+                ldlm_cli_cancel_list(&cancels, count, req, 0);
 
-        mdc_rename_pack(req, REQ_REC_OFF, op_data, old, oldlen, new, newlen);
+        mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
-        size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
-        ptlrpc_req_set_repsize(req, 4, size);
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_easize);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_cookiesize);
+        ptlrpc_request_set_replen(req);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         *request = req;
index b72e700..953e148 100644 (file)
@@ -55,21 +55,19 @@ extern quota_interface_t mdc_quota_interface;
 static int mdc_cleanup(struct obd_device *obd);
 
 static struct obd_capa *mdc_unpack_capa(struct ptlrpc_request *req,
-                                               unsigned int offset)
+                                        const struct req_msg_field *field)
 {
         struct lustre_capa *capa;
         struct obd_capa *oc;
 
         /* swabbed already in mdc_enqueue */
-        capa = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*capa));
-        if (capa == NULL) {
-                CERROR("missing capa at offset %d failed!\n", offset);
-                return ERR_PTR(-EFAULT);
-        }
+        capa = req_capsule_server_get(&req->rq_pill, field);
+        if (capa == NULL)
+                return ERR_PTR(-EPROTO);
 
         oc = alloc_capa(CAPA_SITE_CLIENT);
         if (!oc) {
-                CERROR("alloc capa failed!\n");
+                CDEBUG(D_INFO, "alloc capa failed!\n");
                 return ERR_PTR(-ENOMEM);
         }
         oc->c_capa = *capa;
@@ -83,52 +81,46 @@ static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
                           struct obd_capa **pc, int level, int msg_flags)
 {
         struct ptlrpc_request *req;
-        struct mdt_body *body;
-        int rc, size[3] = { sizeof(struct ptlrpc_body),
-                            sizeof(*body),
-                            sizeof(struct lustre_capa) };
+        struct mdt_body       *body;
+        int                    rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size,
-                              NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        req->rq_send_state = level;
-        ptlrpc_req_set_repsize(req, 3, size);
+        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
+                                        LUSTRE_MDS_VERSION, MDS_GETSTATUS);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
-        mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, NULL, 0, -1, 0);
+        mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
         lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
-        rc = ptlrpc_queue_wait(req);
+        req->rq_send_state = level;
 
-        if (!rc) {
-                body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                          lustre_swab_mdt_body);
-                if (body == NULL) {
-                        CERROR ("Can't extract mdt_body\n");
-                        GOTO (out, rc = -EPROTO);
-                }
+        ptlrpc_request_set_replen(req);
 
-                *rootfid = body->fid1;
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out, rc);
 
-                if (body->valid & OBD_MD_FLMDSCAPA) {
-                        struct obd_capa *oc;
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        if (body == NULL)
+                GOTO(out, rc = -EPROTO);
 
-                        oc = mdc_unpack_capa(req, REPLY_REC_OFF + 1);
-                        if (IS_ERR(oc))
-                                GOTO(out, rc = PTR_ERR(oc));
-                        *pc = oc;
-                }
+        if (body->valid & OBD_MD_FLMDSCAPA) {
+                struct obd_capa *oc;
 
-                CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64
-                       ", last_xid="LPU64"\n",
-                       PFID(rootfid),
-                       lustre_msg_get_last_committed(req->rq_repmsg),
-                       lustre_msg_get_last_xid(req->rq_repmsg));
+                oc = mdc_unpack_capa(req, &RMF_CAPA1);
+                if (IS_ERR(oc))
+                        GOTO(out, rc = PTR_ERR(oc));
+                *pc = oc;
         }
 
+        *rootfid = body->fid1;
+        CDEBUG(D_NET,
+               "root fid="DFID", last_committed="LPU64", last_xid="LPU64"\n",
+               PFID(rootfid),
+               lustre_msg_get_last_committed(req->rq_repmsg),
+               lustre_msg_get_last_xid(req->rq_repmsg));
         EXIT;
- out:
+out:
         ptlrpc_req_finished(req);
         return rc;
 }
@@ -151,52 +143,32 @@ int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
  * of fields. This issue will be fixed later when client gets awar of RPC
  * layouts.  --umka
  */
-static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
-                              unsigned int acl_size, int mdscapa,
+static int mdc_getattr_common(struct obd_export *exp,
                               struct ptlrpc_request *req)
 {
-        struct mdt_body *body;
-        void *eadata;
-        int size[5] = { sizeof(struct ptlrpc_body),
-                        sizeof(*body),
-                        ea_size,
-                        acl_size,
-                        sizeof(struct lustre_capa) };
-        int offset, rc;
+        struct req_capsule *pill = &req->rq_pill;
+        struct mdt_body    *body;
+        void               *eadata;
+        int                 rc;
         ENTRY;
 
         /* Request message already built. */
-        if (ea_size)
-                CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
-                       ea_size);
-        if (acl_size)
-                CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size);
-
-        ptlrpc_req_set_repsize(req, 5, size);
-
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
-                RETURN (rc);
+                RETURN(rc);
 
-        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                  lustre_swab_mdt_body);
-        if (body == NULL) {
-                CERROR ("Can't unpack mdt_body\n");
-                RETURN (-EPROTO);
-        }
+        /* sanity check for the reply */
+        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+        if (body == NULL)
+                RETURN(-EPROTO);
 
         CDEBUG(D_NET, "mode: %o\n", body->mode);
 
-        offset = REPLY_REC_OFF + 1;
-        lustre_set_rep_swabbed(req, offset);
         if (body->eadatasize != 0) {
-                /* reply indicates presence of eadata; check it's there... */
-                eadata = lustre_msg_buf(req->rq_repmsg, offset++,
-                                        body->eadatasize);
-                if (eadata == NULL) {
-                        CERROR ("Missing/short eadata\n");
-                        RETURN (-EPROTO);
-                }
+                eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
+                                                      body->eadatasize);
+                if (eadata == NULL)
+                        RETURN(-EPROTO);
         }
 
         if (body->valid & OBD_MD_FLMODEASIZE) {
@@ -208,20 +180,21 @@ static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
                         cli->cl_max_mds_cookiesize = body->max_cookiesize;
         }
 
-        offset += !!body->aclsize;
+        if (body->valid & OBD_MD_FLRMTPERM) {
+                struct mdt_remote_perm *perm;
+                perm = req_capsule_server_get(pill, &RMF_ACL);
+                if (perm == NULL)
+                        RETURN(-EPROTO);
+        }
 
         if (body->valid & OBD_MD_FLMDSCAPA) {
                 struct lustre_capa *capa;
-
-                LASSERT(mdscapa);
-                capa = lustre_unpack_capa(req->rq_repmsg, offset++);
-                if (capa == NULL) {
-                        CERROR("Missing/short client MDS capability\n");
+                capa = req_capsule_server_get(pill, &RMF_CAPA1);
+                if (capa == NULL)
                         RETURN(-EPROTO);
-                }
         }
 
-        RETURN (0);
+        RETURN(0);
 }
 
 int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
@@ -229,40 +202,37 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
                 struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
-        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
-        int acl_size = 0, rc;
+        int                    rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0;
+        *request = NULL;
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
-        /*
-         * XXX: Do we need to make another request here?  We just did a getattr
-         * to do the lookup in the first place.
-         */
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_GETATTR, 3, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        mdc_set_capa_size(req, &RMF_CAPA1, oc);
+
+        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size, -1,
-                          MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
+        /* MDS_BFLAG_EXT_FLAGS: request "new" flags(bug 9486) */
+        mdc_pack_body(req, fid, oc, valid, ea_size, -1, MDS_BFLAG_EXT_FLAGS);
 
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size);
         if (valid & OBD_MD_FLRMTPERM)
-                acl_size = sizeof(struct mdt_remote_perm);
-        
-        /* Currently only root inode will call us with FLACL */
-        else if (valid & OBD_MD_FLACL)
-                acl_size = LUSTRE_POSIX_ACL_MAX_SIZE;
+                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
+                                     sizeof(struct mdt_remote_perm));
+        ptlrpc_request_set_replen(req);
 
-        rc = mdc_getattr_common(exp, ea_size, acl_size,
-                                !!(valid & OBD_MD_FLMDSCAPA), req);
-        if (rc != 0) {
-                ptlrpc_req_finished (req);
-                req = NULL;
-        }
- out:
-        *request = req;
-        RETURN (rc);
+        rc = mdc_getattr_common(exp, req);
+        if (rc)
+                ptlrpc_req_finished(req);
+        else
+                *request = req;
+        RETURN(rc);
 }
 
 int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
@@ -271,291 +241,292 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
                      struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
-        struct mdt_body *body;
-        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body), 0, namelen};
-        int rc;
+        int                    rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0;
+        *request = NULL;
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_GETATTR_NAME);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_GETATTR_NAME, 4, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        mdc_set_capa_size(req, &RMF_CAPA1, oc);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, namelen);
+
+        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size, suppgid,
-                          MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
+        /* MDS_BFLAG_EXT_FLAGS: request "new" flags(bug 9486) */
+        mdc_pack_body(req, fid, oc, valid, ea_size, suppgid,
+                      MDS_BFLAG_EXT_FLAGS);
 
         if (filename) {
+                char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 LASSERT(strnlen(filename, namelen) == namelen - 1);
-                memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, namelen),
-                       filename, namelen);
+                memcpy(name, filename, namelen);
         }
 
-        rc = mdc_getattr_common(exp, ea_size, 0, !!(valid & OBD_MD_FLMDSCAPA),
-                                req);
-        if (rc != 0) {
-                ptlrpc_req_finished (req);
-                req = NULL;
-        }
- out:
-        *request = req;
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size);
+        ptlrpc_request_set_replen(req);
+
+        rc = mdc_getattr_common(exp, req);
+        if (rc)
+                ptlrpc_req_finished(req);
+        else
+                *request = req;
         RETURN(rc);
 }
 
-static int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid,
-                         const struct lu_fid *cfid, struct ptlrpc_request **request)
+static int mdc_is_subdir(struct obd_export *exp,
+                         const struct lu_fid *pfid,
+                         const struct lu_fid *cfid,
+                         struct ptlrpc_request **request)
 {
-        int size[2] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_body) };
-        struct ptlrpc_request *req;
-        struct mdt_body *body;
-        int rc;
+        struct ptlrpc_request  *req;
+        int                     rc;
+
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_IS_SUBDIR, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        *request = NULL;
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                        &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
+                                        MDS_IS_SUBDIR);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
-        mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, 0);
+        mdc_is_subdir_pack(req, pfid, cfid, 0);
+        ptlrpc_request_set_replen(req);
 
-        ptlrpc_req_set_repsize(req, 2, size);
         rc = ptlrpc_queue_wait(req);
-        if (rc != 0 && rc != -EREMOTE)
-                GOTO(out, rc);
-
-        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                  lustre_swab_mdt_body);
-        if (body == NULL) {
-                CERROR ("Can't unpack mdt_body\n");
-                GOTO(out, rc = -EPROTO);
-        }
-        EXIT;
- out:
-        *request = req;
-        return rc;
+        if (rc && rc != -EREMOTE)
+                ptlrpc_req_finished(req);
+        else
+                *request = req;
+        RETURN(rc);
 }
 
-static
-int mdc_xattr_common(struct obd_export *exp, const struct lu_fid *fid,
-                     struct obd_capa *oc,
-                     int opcode, obd_valid valid, const char *xattr_name,
-                     const char *input, int input_size, int output_size,
-                     int flags, __u32 suppgid, struct ptlrpc_request **request)
+static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
+                            const struct lu_fid *fid,
+                            struct obd_capa *oc, int opcode, obd_valid valid,
+                            const char *xattr_name, const char *input,
+                            int input_size, int output_size, int flags,
+                            __u32 suppgid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
-        int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
-        int bufcnt = 3, offset = REQ_REC_OFF + 2;
-        int rc, xattr_namelen = 0;
-        void *tmp;
+        int   xattr_namelen = 0;
+        char *tmp;
+        int   rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0;
+        *request = NULL;
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        mdc_set_capa_size(req, &RMF_CAPA1, oc);
         if (xattr_name) {
                 xattr_namelen = strlen(xattr_name) + 1;
-                size[bufcnt++] = xattr_namelen;
+                req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                                     xattr_namelen);
         }
         if (input_size) {
                 LASSERT(input);
-                size[bufcnt++] = input_size;
+                req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                                     input_size);
         }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              opcode, bufcnt, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        /* request data */
-        mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, output_size,
-                          suppgid, flags);
+        if (opcode == MDS_REINT) {
+                struct mdt_rec_setxattr *rec;
 
+                CLASSERT(sizeof(struct mdt_rec_setxattr) ==
+                         sizeof(struct mdt_rec_reint));
+                rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
+                rec->sx_opcode = REINT_SETXATTR;
+                /* TODO: 
+                 *  cfs_curproc_fs{u,g}id() should replace 
+                 *  current->fs{u,g}id for portability.
+                 */
+                rec->sx_fsuid  = current->fsuid;
+                rec->sx_fsgid  = current->fsgid;
+                rec->sx_cap    = current->cap_effective;
+                rec->sx_suppgid1 = suppgid;
+                rec->sx_suppgid1 = -1;
+                rec->sx_fid    = *fid;
+                rec->sx_valid  = valid;
+                rec->sx_size   = output_size;
+                rec->sx_flags  = flags;
+
+                mdc_pack_capa(req, &RMF_CAPA1, oc);
+        } else {
+                mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
+        }
 
         if (xattr_name) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 memcpy(tmp, xattr_name, xattr_namelen);
         }
         if (input_size) {
-                tmp = lustre_msg_buf(req->rq_reqmsg, offset++, input_size);
+                tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
                 memcpy(tmp, input, input_size);
         }
 
-        /* reply buffers */
-        if (opcode == MDS_GETXATTR) {
-                size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-                bufcnt = 2;
-        } else {
-                bufcnt = 1;
-        }
-
-        /* we do this even output_size is 0, because server is doing that */
-        size[bufcnt++] = output_size;
-        ptlrpc_req_set_repsize(req, bufcnt, size);
+        if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
+                req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
+                                     RCL_SERVER, output_size);
+        ptlrpc_request_set_replen(req);
 
         /* make rpc */
-        if (opcode == MDS_SETXATTR)
+        if (opcode == MDS_REINT)
                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 
         rc = ptlrpc_queue_wait(req);
 
-        if (opcode == MDS_SETXATTR)
+        if (opcode == MDS_REINT)
                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 
-        if (rc != 0)
-                GOTO(err_out, rc);
-
-        if (opcode == MDS_GETXATTR) {
-                struct mdt_body * body = lustre_swab_repbuf(req, REPLY_REC_OFF,
-                                          sizeof(*body), lustre_swab_mdt_body);
-                if (body == NULL) {
-                        CERROR ("Can't unpack mdt_body\n");
-                        GOTO(err_out, rc = -EPROTO);
-                }
-        }
-out:
-        *request = req;
-        RETURN (rc);
-err_out:
-        ptlrpc_req_finished(req);
-        req = NULL;
-        goto out;
+        if (rc)
+                ptlrpc_req_finished(req);
+        else
+                *request = req;
+        RETURN(rc);
 }
 
 int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
                  struct obd_capa *oc, obd_valid valid, const char *xattr_name,
-                 const char *input, int input_size, int output_size, int flags,
-                 __u32 suppgid, struct ptlrpc_request **request)
+                 const char *input, int input_size, int output_size,
+                 int flags, __u32 suppgid, struct ptlrpc_request **request)
 {
-        return mdc_xattr_common(exp, fid, oc, MDS_SETXATTR, valid, xattr_name,
-                                input, input_size, output_size, flags