Whamcloud - gitweb
mdt: new portion of cleanup. Mostly switch to req-layout.
authornikita <nikita>
Wed, 24 May 2006 22:21:27 +0000 (22:21 +0000)
committernikita <nikita>
Wed, 24 May 2006 22:21:27 +0000 (22:21 +0000)
Tested by: mount && cd /mnt/lustre && mkdir foo && ls -ldi foo

lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_req_layout.h
lustre/ldlm/ldlm_lockd.c
lustre/lmv/lmv_fld.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c
lustre/ptlrpc/layout.c

index 7f591fe..e1266d6 100644 (file)
@@ -177,9 +177,16 @@ static inline __u64 fid_num(const struct lu_fid *fid)
         return f_ver | fid_oid(fid);
 }
 
+static inline int fid_seq_is_sane(__u64 seq)
+{
+        return seq != 0;
+}
+
 static inline int fid_is_sane(const struct lu_fid *fid)
 {
-        return fid != NULL && fid_seq(fid) != 0 && fid_oid(fid) != 0;
+        return
+                fid != NULL &&
+                fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0;
 }
 
 #define DFID3 "["LPU64"/%u:%u]"
index 4b74c90..f68fca4 100644 (file)
@@ -570,13 +570,13 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                      void *lvb_swabber,
                      struct lustre_handle *lockh);
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
-                         struct ldlm_request *dlm_req,
-                         struct ldlm_callback_suite *cbs);
+                         const struct ldlm_request *dlm_req,
+                         const struct ldlm_callback_suite *cbs);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
                     void *data, __u32 data_len);
 int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
 int ldlm_handle_convert0(struct ptlrpc_request *req,
-                         struct ldlm_request *dlm_req);
+                         const struct ldlm_request *dlm_req);
 int ldlm_cli_cancel(struct lustre_handle *lockh);
 int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *,
                            int flags, void *opaque);
index 89660f4..8774a06 100644 (file)
@@ -46,27 +46,30 @@ enum req_location {
 
 struct req_capsule {
         struct ptlrpc_request   *rc_req;
-        const struct req_format *rc_fmt[RCL_NR];
+        const struct req_format *rc_fmt;
         __u32                    rc_swabbed;
         enum req_location        rc_loc;
+        int                     *rc_area;
 };
 
-void req_capsule_init(struct req_capsule *pill,
-                      struct ptlrpc_request *req,
-                      enum req_location location);
+void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req,
+                      enum req_location location, int *area);
 void req_capsule_fini(struct req_capsule *pill);
 
-void req_capsule_client_init(struct req_capsule *pill,
-                             const struct req_format *fmt);
-void req_capsule_server_init(struct req_capsule *pill,
-                             const struct req_format *fmt);
-int req_capsule_start(struct req_capsule *pill,
-                      const struct req_format *fmt, int *area);
+void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
+int  req_capsule_pack(struct req_capsule *pill);
 
-const void *req_capsule_client_get(const struct req_capsule *pill,
-                                   const struct req_msg_field *field);
+void *req_capsule_client_get(const struct req_capsule *pill,
+                             const struct req_msg_field *field);
 void *req_capsule_server_get(const struct req_capsule *pill,
                              const struct req_msg_field *field);
+const void *req_capsule_other_get(const struct req_capsule *pill,
+                                  const struct req_msg_field *field);
+
+void req_capsule_set_size(const struct req_capsule *pill,
+                          const struct req_msg_field *field,
+                          enum req_location loc, int size);
+void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt);
 
 int  req_layout_init(void);
 void req_layout_fini(void);
@@ -74,13 +77,32 @@ void req_layout_fini(void);
 extern const struct req_format RQF_MDS_GETSTATUS;
 extern const struct req_format RQF_MDS_STATFS;
 extern const struct req_format RQF_MDS_GETATTR;
+extern const struct req_format RQF_MDS_CONNECT;
+extern const struct req_format RQF_MDS_DISCONNECT;
+
+/*
+ * This is format of direct (non-intent) MDS_GETATTR_NAME request.
+ */
 extern const struct req_format RQF_MDS_GETATTR_NAME;
 extern const struct req_format RQF_MDS_REINT_CREATE;
+extern const struct req_format RQF_LDLM_ENQUEUE;
+extern const struct req_format RQF_LDLM_INTENT;
+extern const struct req_format RQF_LDLM_INTENT_GETATTR;
 
 extern const struct req_msg_field RMF_MDT_BODY;
 extern const struct req_msg_field RMF_OBD_STATFS;
 extern const struct req_msg_field RMF_NAME;
 extern const struct req_msg_field RMF_REC_CREATE;
-
+extern const struct req_msg_field RMF_TGTUUID;
+extern const struct req_msg_field RMF_CLUUID;
+/*
+ * connection handle received in MDS_CONNECT request.
+ */
+extern const struct req_msg_field RMF_CONN;
+extern const struct req_msg_field RMF_CONNECT_DATA;
+extern const struct req_msg_field RMF_DLM_REQ;
+extern const struct req_msg_field RMF_DLM_REP;
+extern const struct req_msg_field RMF_LDLM_INTENT;
+extern const struct req_msg_field RMF_MDT_MD;
 
 #endif /* _LUSTRE_REQ_LAYOUT_H__ */
index 0ec457e..274fe33 100644 (file)
@@ -659,8 +659,8 @@ find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
  */
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                          struct ptlrpc_request *req,
-                         struct ldlm_request *dlm_req,
-                         struct ldlm_callback_suite *cbs)
+                         const struct ldlm_request *dlm_req,
+                         const struct ldlm_callback_suite *cbs)
 {
         struct ldlm_reply *dlm_rep;
         int rc = 0, size[2] = {sizeof(*dlm_rep)};
@@ -704,7 +704,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         }
 
 #if 0
-        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check 
+        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
            against server's _CONNECT_SUPPORTED flags? (I don't want to use
            ibits for mgc/mgs) */
 
@@ -921,7 +921,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 }
 
 int ldlm_handle_convert0(struct ptlrpc_request *req,
-                         struct ldlm_request *dlm_req)
+                         const struct ldlm_request *dlm_req)
 {
         struct ldlm_reply *dlm_rep;
         struct ldlm_lock *lock;
@@ -1182,7 +1182,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
         l_unlock(&ns->ns_lock);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
-            cfs_time_after(cfs_time_current(), 
+            cfs_time_after(cfs_time_current(),
                            cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) {
                 if (ldlm_bl_to_thread(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
@@ -1610,7 +1610,7 @@ static int ldlm_setup(void)
         spin_lock_init(&waiting_locks_spinlock);
         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        /* Using CLONE_FILES instead of CLONE_FS here causes failures in 
+        /* Using CLONE_FILES instead of CLONE_FS here causes failures in
            conf-sanity test 21.  But using CLONE_FS can cause problems
            if the daemonize happens between push/pop_ctxt... */
         rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
index 106dd44..99a5cab 100644 (file)
@@ -49,6 +49,9 @@ int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid)
 {
         int rc;
         ENTRY;
+
+        LASSERT(fid_is_sane(fid));
+
         /* temporary hack until fld will works */
         rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
         CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
index 90a8540..093c350 100644 (file)
@@ -73,23 +73,18 @@ static int mdt_getstatus(struct mdt_thread_info *info,
 {
         struct md_device *next  = info->mti_mdt->mdt_child;
         int               result;
+        struct mdt_body  *body;
 
         ENTRY;
 
-        result = req_capsule_start(&info->mti_pill, &RQF_MDS_GETSTATUS,
-                                   info->mti_rep_buf_size);
-        if (result)
-                ;
-        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
                 result = -ENOMEM;
         else {
-                info->mti_body = req_capsule_server_get(&info->mti_pill,
-                                                        &RMF_MDT_BODY);
+                body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
-                                                    next,
-                                                    &info->mti_body->fid1);
+                                                    next, &body->fid1);
                 if (result == 0)
-                        info->mti_body->valid |= OBD_MD_FLID;
+                        body->valid |= OBD_MD_FLID;
         }
 
         /* the last_committed and last_xid fields are filled in for all
@@ -103,29 +98,19 @@ static int mdt_statfs(struct mdt_thread_info *info,
 {
         struct md_device  *next  = info->mti_mdt->mdt_child;
         struct obd_statfs *osfs;
-        struct kstatfs    *sfs;
-        int               result;
+        int                result;
 
         ENTRY;
 
-        info->mti_rep_buf_size[0] = sizeof(struct obd_statfs);
-        result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
-        if (result)
-                CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
-                       sizeof(struct obd_statfs));
-        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
                 result = -ENOMEM;
         } else {
-                osfs = lustre_msg_buf(req->rq_repmsg, 0,
-                                      sizeof(struct obd_statfs));
-                OBD_ALLOC_PTR(sfs);
-                if(sfs == NULL)
-                        RETURN(-ENOMEM);
+                osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
                 /* XXX max_age optimisation is needed here. See mds_statfs */
-                result = next->md_ops->mdo_statfs(info->mti_ctxt, next, sfs);
-                statfs_pack(osfs, sfs);
-                OBD_FREE_PTR(sfs);
+                result = next->md_ops->mdo_statfs(info->mti_ctxt,
+                                                  next, &info->mti_sfs);
+                statfs_pack(osfs, &info->mti_sfs);
         }
 
         RETURN(result);
@@ -157,19 +142,14 @@ static int mdt_getattr(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
         int              result;
+        struct mdt_body *body;
 
         LASSERT(info->mti_object != NULL);
         LASSERT(lu_object_exists(info->mti_ctxt,
                                  &info->mti_object->mot_obj.mo_lu));
-
         ENTRY;
 
-        info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
-        result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
-        if (result)
-                CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
-                       sizeof(struct mdt_body), result);
-        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
                 result = -ENOMEM;
         } else {
@@ -177,11 +157,11 @@ static int mdt_getattr(struct mdt_thread_info *info,
 
                 result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr);
                 if (result == 0) {
-                        info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
-                                                        sizeof(struct mdt_body));
-                        mdt_pack_attr2body(info->mti_body, &info->mti_attr);
-                        info->mti_body->fid1 = *mdt_object_fid(info->mti_object);
-                        info->mti_body->valid |= OBD_MD_FLID;
+                        body = req_capsule_server_get(&info->mti_pill,
+                                                      &RMF_MDT_BODY);
+                        mdt_pack_attr2body(body, &info->mti_attr);
+                        body->fid1 = *mdt_object_fid(info->mti_object);
+                        body->valid |= OBD_MD_FLID;
                 }
         }
         RETURN(result);
@@ -193,23 +173,17 @@ static int mdt_getattr_name(struct mdt_thread_info *info,
         struct md_object  *next = mdt_object_child(info->mti_object);
         struct mdt_object *child;
         struct mdt_body   *body;
-        char *name;
-        int namesize;
+        const char *name;
         int result;
 
         LASSERT(info->mti_object != NULL);
 
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof *body);
-        LASSERT(body != NULL);
-
-        name = lustre_msg_string(req->rq_reqmsg, offset, 0);
-        if (name == NULL) {
-                CERROR("Can't unpack name\n");
+        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
+        if (name == NULL)
                 RETURN(-EFAULT);
-        }
-        namesize = lustre_msg_buflen(req->rq_reqmsg, offset);
 
         result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1);
         if (result == 0) {
@@ -303,11 +277,11 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 CERROR("invalid record\n");
                 RETURN(rc = -EINVAL);
         }
-    
+
         /* XXX: this should be cleanup up */
         rep_off = offset == MDS_REQ_INTENT_REC_OFF ? 1 : 0;
-        
-        /* init body for handlers by rep's body, they may want to 
+
+        /* init body for handlers by rep's body, they may want to
          * store there something. */
         info->mti_body = lustre_msg_buf(req->rq_repmsg, rep_off,
                                         sizeof(struct mdt_body));
@@ -351,7 +325,7 @@ static int mdt_reint(struct mdt_thread_info *info,
                                info->mti_rep_buf_size, NULL);
         if (rc)
                 RETURN(rc);
-    
+
         /* init body by rep body, needed by handlers to return data to client */
         info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
                                         sizeof(struct mdt_body));
@@ -512,7 +486,7 @@ static struct mdt_object *mdt_obj(struct lu_object *o)
 
 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
                                    struct mdt_device *d,
-                                   struct lu_fid *f)
+                                   const struct lu_fid *f)
 {
         struct lu_object *o;
 
@@ -584,23 +558,29 @@ struct mdt_handler {
         __u32       mh_flags;
         int (*mh_act)(struct mdt_thread_info *info,
                       struct ptlrpc_request *req, int offset);
+
+        const struct req_format *mh_fmt;
 };
 
 enum mdt_handler_flags {
         /*
-         * struct mdt_body is passed in the 0-th incoming buffer.
+         * struct mdt_body is passed in the incoming message.
          */
         HABEO_CORPUS = (1 << 0),
         /*
-         * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
-         * incoming buffer.
+         * struct ldlm_request is passed in the incoming message.
          */
         HABEO_CLAVIS = (1 << 1),
         /*
-         * object, identified by fid passed in mdt_body already exists (on
-         * disk)..
+         * object, identified by fid passed in mdt_body, already exists (on
+         * disk).
          */
-        HABEO_DISCUS = HABEO_CORPUS | (1 << 2)
+        HABEO_DISCUS = HABEO_CORPUS | (1 << 2),
+        /*
+         * this request has fixed reply format, so that reply message can be
+         * packed by generic code.
+         */
+        HABEO_REFERO = (1 << 3)
 };
 
 struct mdt_opc_slice {
@@ -649,6 +629,58 @@ static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
 }
 
 /*
+ * Generic code handling requests that have struct mdt_body passed in:
+ *
+ *  - extract mdt_body from request and save it in @info;
+ *
+ *  - create lu_object, corresponding to the fid in mdt_body, and save it in
+ *  @info;
+ *
+ *  - if HABEO_DISCUS flag is set for this request type in addition to
+ *  HABEO_CORPUS, check whether object actually exists on storage
+ *  (lu_object_exists()).
+ *
+ */
+static int mdt_habeo_corpus(struct mdt_thread_info *info, __u32 flags)
+{
+        const struct mdt_body   *body;
+        struct mdt_object       *obj;
+        const struct lu_context *ctx;
+
+        int result;
+
+        ctx = info->mti_ctxt;
+
+        body = info->mti_body = req_capsule_client_get(&info->mti_pill,
+                                                       &RMF_MDT_BODY);
+        if (body != NULL) {
+                if (fid_is_sane(&body->fid1)) {
+                        obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
+
+                        if (!IS_ERR(obj)) {
+                                if ((flags & HABEO_DISCUS) == HABEO_DISCUS &&
+                                    !lu_object_exists(ctx,
+                                                      &obj->mot_obj.mo_lu)) {
+                                        mdt_object_put(ctx, obj);
+                                        result = -ENOENT;
+                                } else {
+                                        info->mti_object = obj;
+                                        result = 0;
+                                }
+                        } else
+                                result = PTR_ERR(obj);
+                } else {
+                        CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
+                        result = -EINVAL;
+                }
+        } else {
+                CERROR("Can't unpack body\n");
+                result = -EFAULT;
+        }
+        return result;
+}
+
+/*
  * Invoke handler for this request opc. Also do necessary preprocessing
  * (according to handler ->mh_flags), and post-processing (setting of
  * ->last_{xid,committed}).
@@ -677,49 +709,38 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         result = 0;
         flags = h->mh_flags;
-        req_capsule_init(&info->mti_pill, req, RCL_SERVER);
-        if (flags & HABEO_CORPUS) {
-                struct mdt_body   *body;
-                const struct lu_context *ctx;
-                struct mdt_object *obj;
-
-                ctx = info->mti_ctxt;
-                body = info->mti_body =
-                        lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
-                                           lustre_swab_mdt_body);
-                if (body != NULL) {
-                        obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
-                        if (!IS_ERR(obj)) {
-                                if ((flags & HABEO_DISCUS) == HABEO_DISCUS &&
-                                    !lu_object_exists(ctx,
-                                                      &obj->mot_obj.mo_lu)) {
-                                        mdt_object_put(ctx, obj);
-                                        result = -ENOENT;
-                                } else
-                                        info->mti_object = obj;
-                        } else
-                                result = PTR_ERR(obj);
-                } else {
-                        CERROR("Can't unpack body\n");
-                        result = -EFAULT;
-                }
-        } else if (flags & HABEO_CLAVIS) {
+        LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
+
+        req_capsule_init(&info->mti_pill,
+                         req, RCL_SERVER, info->mti_rep_buf_size);
+
+        if (h->mh_fmt != NULL) {
+                req_capsule_set(&info->mti_pill, h->mh_fmt);
+                if (flags & HABEO_REFERO)
+                        result = req_capsule_pack(&info->mti_pill);
+        }
+
+        if (result == 0 && flags & HABEO_CORPUS)
+                result = mdt_habeo_corpus(info, flags);
+
+        if (result == 0 && flags & HABEO_CLAVIS) {
                 struct ldlm_request *dlm;
 
                 LASSERT(shift == 0);
-                dlm = info->mti_dlm_req =
-                        lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
-                                           sizeof *dlm,
-                                           lustre_swab_ldlm_request);
+                LASSERT(h->mh_fmt != NULL);
+
+                dlm = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
                 if (dlm != NULL) {
                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
                                 result = mdt_lock_resname_compat(info->mti_mdt,
                                                                  dlm);
+                        info->mti_dlm_req = dlm;
                 } else {
                         CERROR("Can't unpack dlm request\n");
                         result = -EFAULT;
                 }
         }
+
         if (result == 0)
                 /*
                  * Process request.
@@ -736,7 +757,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         LASSERT(current->journal_info == NULL);
 
-        if (flags & HABEO_CLAVIS &&
+        if (result == 0 && flags & HABEO_CLAVIS &&
             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
                 struct ldlm_reply *rep;
 
@@ -770,11 +791,8 @@ static void mdt_thread_info_init(struct mdt_thread_info *info)
         int i;
 
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
-        /*
-         * Poison size array.
-         */
         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
-                info->mti_rep_buf_size[i] = ~0;
+                info->mti_rep_buf_size[i] = 0;
         info->mti_rep_buf_nr = i;
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                 mdt_lock_handle_init(&info->mti_lh[i]);
@@ -1051,6 +1069,77 @@ void intent_set_disposition(struct ldlm_reply *rep, int flag)
         rep->lock_policy_res1 |= flag;
 }
 
+enum mdt_it_code {
+        MDT_IT_OPEN,
+        MDT_IT_OCREAT,
+        MDT_IT_CREATE,
+        MDT_IT_GETATTR,
+        MDT_IT_READDIR,
+        MDT_IT_LOOKUP,
+        MDT_IT_UNLINK,
+        MDT_IT_TRUNC,
+        MDT_IT_GETXATTR,
+        MDT_IT_NR
+};
+
+static struct {
+        const struct req_format *it_fmt;
+        __u32                    it_flags;
+        int                    (*it_act)(struct mdt_thread_info *,
+                                         struct ptlrpc_request *, int);
+} mdt_it_flavor[] = {
+        [MDT_IT_OPEN]     = { NULL,                     0, mdt_reint_internal },
+        [MDT_IT_OCREAT]   = { NULL,                     0, mdt_reint_internal },
+        [MDT_IT_CREATE]   = { NULL,                     0, NULL },
+        [MDT_IT_GETATTR]  = { &RQF_LDLM_INTENT_GETATTR,
+                              HABEO_DISCUS, mdt_getattr_name },
+        [MDT_IT_READDIR]  = { NULL,                     0, NULL },
+        [MDT_IT_LOOKUP]   = { &RQF_LDLM_INTENT_GETATTR,
+                              HABEO_DISCUS, mdt_getattr_name },
+        [MDT_IT_UNLINK]   = { NULL,                     0, NULL },
+        [MDT_IT_TRUNC]    = { NULL,                     0, NULL },
+        [MDT_IT_GETXATTR] = { NULL,                     0, NULL }
+};
+
+static int mdt_intent_code(long itcode)
+{
+        int result;
+
+        switch(itcode) {
+        case IT_OPEN:
+                result = MDT_IT_OPEN;
+                break;
+        case IT_OPEN|IT_CREAT:
+                result = MDT_IT_OCREAT;
+                break;
+        case IT_CREAT:
+                result = MDT_IT_CREATE;
+                break;
+        case IT_READDIR:
+                result = MDT_IT_READDIR;
+                break;
+        case IT_GETATTR:
+                result = MDT_IT_GETATTR;
+                break;
+        case IT_LOOKUP:
+                result = MDT_IT_LOOKUP;
+                break;
+        case IT_UNLINK:
+                result = MDT_IT_UNLINK;
+                break;
+        case IT_TRUNC:
+                result = MDT_IT_TRUNC;
+                break;
+        case IT_GETXATTR:
+                result = MDT_IT_GETXATTR;
+                break;
+        default:
+                CERROR("Unknown intent opcode: %ld\n", itcode);
+                result = -EINVAL;
+                break;
+        }
+        return result;
+}
 
 static int mdt_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
@@ -1064,99 +1153,53 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
         int rc;
         int gflags = 0;
         struct mdt_thread_info *info;
-        struct mdt_body   *body;
+        struct req_capsule *pill;
+        int opcode;
         ENTRY;
 
         LASSERT(req != NULL);
 
         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
         LASSERT(info != NULL);
-
+        pill = &info->mti_pill;
+        LASSERT(pill->rc_req == req);
 
         if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
                 /* No intent was provided */
-                info->mti_rep_buf_size[0] = sizeof(struct ldlm_reply);
-                rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
-                LASSERT(rc == 0);
-                RETURN(0);
+                LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
+                RETURN(req_capsule_pack(pill));
         }
 
-        it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
-                                lustre_swab_ldlm_intent);
+        req_capsule_extend(pill, &RQF_LDLM_INTENT);
+        it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
         if (it == NULL) {
-                CERROR("Intent missing\n");
-                RETURN(req->rq_status = -EFAULT);
+                rc = -EFAULT;
+                RETURN(rc);
         }
 
-        if (it->opc == IT_GETATTR || it->opc == IT_LOOKUP) {
-                body = lustre_swab_reqbuf(req, MDS_REQ_INTENT_REC_OFF,
-                                          sizeof *body,
-                                          lustre_swab_mdt_body);
-                info->mti_body = body;
-                if (body != NULL) {
-                        struct mdt_object *obj;
-                        const struct lu_context *ctx = info->mti_ctxt;
-
-                        obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
-                        if (!IS_ERR(obj)) {
-                                if (!lu_object_exists(ctx, 
-                                                      &obj->mot_obj.mo_lu)) {
-                                        CERROR("Object doesn't exist\n");
-                                        mdt_object_put(ctx, obj);
-                                        rc = -ENOENT;
-                                } else {
-                                        info->mti_object = obj;
-                                        rc = 0;
-                                }
-                        } else
-                                rc = PTR_ERR(obj);
-                } else {
-                        CERROR("Can't unpack body\n");
-                        rc = -EFAULT;
-                }
-
-                //TODO: if rc then pack reply according to it
-                if (rc) {
-                        CERROR("Cannot prepare intent info, rc=%u\n", rc);
-                        RETURN(req->rq_status = rc);
-                }
-        }
         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
-        info->mti_rep_buf_nr = 3;
-        info->mti_rep_buf_size[0] = sizeof(*rep);
-        info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
-        info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
 
+        opcode = mdt_intent_code(it->opc);
+        if (opcode < 0)
+                RETURN(-EINVAL);
 
-        rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
-                               info->mti_rep_buf_size, NULL);
-        if (rc) {
-                RETURN(req->rq_status = rc);
+        req_capsule_extend(pill, mdt_it_flavor[opcode].it_fmt);
+
+        if (mdt_it_flavor[opcode].it_flags & HABEO_CORPUS) {
+                rc = mdt_habeo_corpus(info, mdt_it_flavor[opcode].it_flags);
+                if (rc != 0)
+                        RETURN(rc);
         }
 
-        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
-        intent_set_disposition(rep, DISP_IT_EXECD);
+        rc = req_capsule_pack(pill);
+        if (rc != 0)
+                RETURN(rc);
 
+        rep = req_capsule_server_get(pill, &RMF_DLM_REP);
+        intent_set_disposition(rep, DISP_IT_EXECD);
 
         /* execute policy */
-        switch ((long)it->opc) {
-        case IT_OPEN:
-        case IT_OPEN|IT_CREAT:
-                rep->lock_policy_res2 = mdt_reint_internal(info, req, offset);
-                RETURN(ELDLM_LOCK_ABORTED);
-                break;
-        case IT_GETATTR:
-                gflags = MDS_INODELOCK_UPDATE;
-        case IT_LOOKUP:
-                gflags |= MDS_INODELOCK_LOOKUP;
-                //hmm.. something should be done with gflags..
-                rep->lock_policy_res2 = mdt_getattr_name(info, req, offset + 1);
-
-                break;
-        default:
-                CERROR("Unhandled intent till now "LPD64"\n", it->opc);
-                LBUG();
-        }
+        rep->lock_policy_res2 = mdt_it_flavor[opcode].it_act(info, req, offset);
 
         RETURN(ELDLM_OK);
 }
@@ -1792,11 +1835,18 @@ static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
         return l;
 }
 
+/*
+ * context key constructor/destructor
+ */
 
 static void *mdt_thread_init(const struct lu_context *ctx)
 {
         struct mdt_thread_info *info;
 
+        /*
+         * check that no high order allocations are incurred.
+         */
+        CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
         OBD_ALLOC_PTR(info);
         if (info != NULL)
                 info->mti_ctxt = ctx;
@@ -1867,48 +1917,61 @@ static void __exit mdt_mod_exit(void)
 }
 
 
-#define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
+#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)             \
 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
         .mh_name    = #opc,                                             \
         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
         .mh_opc     = prefix ## _  ## opc,                              \
         .mh_flags   = flags,                                            \
-        .mh_act     = fn                                                \
+        .mh_act     = fn,                                               \
+        .mh_fmt     = fmt                                               \
 }
 
-#define DEF_MDT_HNDL(flags, name, fn)                   \
-        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
+#define DEF_MDT_HNDL(flags, name, fn, fmt)                                  \
+        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
+/*
+ * Request with a format known in advance
+ */
+#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
+        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
+/*
+ * Request with a format we do not yet know
+ */
+#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
+        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
 
 static struct mdt_handler mdt_mds_ops[] = {
-        DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
-        DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
-        DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
-        DEF_MDT_HNDL(HABEO_DISCUS, GETATTR,        mdt_getattr),
-        DEF_MDT_HNDL(HABEO_DISCUS, GETATTR_NAME,   mdt_getattr_name),
-        DEF_MDT_HNDL(HABEO_DISCUS, SETXATTR,       mdt_setxattr),
-        DEF_MDT_HNDL(HABEO_DISCUS, GETXATTR,       mdt_getxattr),
-        DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
-        DEF_MDT_HNDL(HABEO_DISCUS, READPAGE,       mdt_readpage),
-        DEF_MDT_HNDL(0,            REINT,          mdt_reint),
-        DEF_MDT_HNDL(HABEO_DISCUS, CLOSE,          mdt_close),
-        DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
-        DEF_MDT_HNDL(0,            PIN,            mdt_pin),
-        DEF_MDT_HNDL(HABEO_DISCUS, SYNC,           mdt_sync),
-        DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
-        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
+DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
+DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
+DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
+DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR,      mdt_getattr),
+DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
+DEF_MDT_HNDL_0(HABEO_DISCUS,              SETXATTR,     mdt_setxattr),
+DEF_MDT_HNDL_0(HABEO_DISCUS,              GETXATTR,     mdt_getxattr),
+DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
+DEF_MDT_HNDL_0(HABEO_DISCUS,              READPAGE,     mdt_readpage),
+DEF_MDT_HNDL_0(0,                         REINT,        mdt_reint),
+DEF_MDT_HNDL_0(HABEO_DISCUS,              CLOSE,        mdt_close),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              DONE_WRITING, mdt_done_writing),
+DEF_MDT_HNDL_0(0,                         PIN,          mdt_pin),
+DEF_MDT_HNDL_0(HABEO_DISCUS,              SYNC,         mdt_sync),
+DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_handle_quotacheck),
+DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_handle_quotactl)
 };
 
 static struct mdt_handler mdt_obd_ops[] = {
 };
 
-#define DEF_DLM_HNDL(flags, name, fn)                   \
-        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
+#define DEF_DLM_HNDL_0(flags, name, fn)                   \
+        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
+#define DEF_DLM_HNDL_F(flags, name, fn)                   \
+        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
 
 static struct mdt_handler mdt_dlm_ops[] = {
-        DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
-        DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
-        DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
-        DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
+        DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
+        DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
+        DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
+        DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
 static struct mdt_handler mdt_llog_ops[] = {
index c6820cb..aff0ccc 100644 (file)
@@ -137,49 +137,57 @@ struct mdt_reint_record {
  * reduce stack consumption.
  */
 struct mdt_thread_info {
-        const struct lu_context *mti_ctxt;
-        struct mdt_device       *mti_mdt;
+        const struct lu_context   *mti_ctxt;
+        struct mdt_device         *mti_mdt;
         /*
          * number of buffers in reply message.
          */
-        int                      mti_rep_buf_nr;
+        int                        mti_rep_buf_nr;
         /*
          * sizes of reply buffers.
          */
-        int                      mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
+        int                        mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
         /*
          * Body for "habeo corpus" operations.
          */
-        struct mdt_body         *mti_body;
+        const struct mdt_body     *mti_body;
         /*
          * Lock request for "habeo clavis" operations.
          */
-        struct ldlm_request     *mti_dlm_req;
+        const struct ldlm_request *mti_dlm_req;
         /*
          * Host object. This is released at the end of mdt_handler().
          */
-        struct mdt_object       *mti_object;
+        struct mdt_object         *mti_object;
         /*
          * Object attributes.
          */
-        struct lu_attr           mti_attr;
+        struct lu_attr             mti_attr;
         /*
          * reint record. Containing information for reint operations.
          */
-        struct mdt_reint_record  mti_rr;
+        struct mdt_reint_record    mti_rr;
         /*
          * Additional fail id that can be set by handler. Passed to
          * target_send_reply().
          */
-        int                      mti_fail_id;
+        int                        mti_fail_id;
         /*
          * A couple of lock handles.
          */
-        struct mdt_lock_handle   mti_lh[MDT_LH_NR];
+        struct mdt_lock_handle     mti_lh[MDT_LH_NR];
         /*
          * for req-layout interface.
          */
-        struct req_capsule       mti_pill;
+        struct req_capsule         mti_pill;
+        /*
+         * buffer for mdt_statfs().
+         *
+         * XXX this is probably huge overkill, because statfs is not that
+         * frequent.
+         */
+        struct kstatfs             mti_sfs;
+
 };
 
 int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
@@ -190,7 +198,7 @@ void fid_unlock(struct ldlm_namespace *, const struct lu_fid *,
                 struct lustre_handle *, ldlm_mode_t);
 
 struct mdt_object *mdt_object_find(const struct lu_context *,
-                                   struct mdt_device *, struct lu_fid *);
+                                   struct mdt_device *, const struct lu_fid *);
 void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *);
 
 int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *,
index 6640657..1524ee3 100644 (file)
@@ -130,7 +130,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
                 mdt_object_put(info->mti_ctxt, o);
         } else
                 result = PTR_ERR(o);
-        
+
         RETURN(result);
 }
 
@@ -157,11 +157,12 @@ static int mdt_reint_create(struct mdt_thread_info *info)
                         rc = mdt_md_mkdir(info);
                 else
                         rc = mdt_md_mkobj(info);
-                
-                /* return fid to client. mti_body should point to 
+
+                /* return fid to client. mti_body should point to
                  * rep's body. */
-                info->mti_body->fid1 = *info->mti_rr.rr_fid2;
-                info->mti_body->valid |= OBD_MD_FLID;
+                /* XXX these fields are not used by the callers. */
+                /* info->mti_body->fid1 = *info->mti_rr.rr_fid2; */
+                /* info->mti_body->valid |= OBD_MD_FLID; */
                 break;
         }
         case S_IFLNK:{
@@ -226,9 +227,9 @@ static int mdt_reint_open(struct mdt_thread_info *info)
         if (result && result != -ENOENT) {
                 GOTO(out_parent, result);
         }
-        
+
         child = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2);
-        if (IS_ERR(child)) 
+        if (IS_ERR(child))
                 GOTO(out_parent, PTR_ERR(child));
 
        if (info->mti_rr.rr_flags & MDS_OPEN_CREAT) {
@@ -243,10 +244,10 @@ static int mdt_reint_open(struct mdt_thread_info *info)
                         result = -EEXIST;
                 }
         }
-        
+
         if (result == 0)
                 result = mdt_md_open(info, child);
-        
+
         mdt_object_put(info->mti_ctxt, child);
 
 out_parent:
index 1a65430..06ee214 100644 (file)
@@ -69,12 +69,54 @@ static const struct req_msg_field *mds_reint_create_client[] = {
         &RMF_NAME
 };
 
+static const struct req_msg_field *mds_connect_client[] = {
+        &RMF_TGTUUID,
+        &RMF_CLUUID,
+        &RMF_CONN,
+        &RMF_CONNECT_DATA
+};
+
+static const struct req_msg_field *mds_connect_server[] = {
+        &RMF_CONNECT_DATA
+};
+
+static const struct req_msg_field *ldlm_enqueue_client[] = {
+        &RMF_DLM_REQ
+};
+
+static const struct req_msg_field *ldlm_enqueue_server[] = {
+        &RMF_DLM_REP
+};
+
+static const struct req_msg_field *ldlm_intent_client[] = {
+        &RMF_DLM_REQ,
+        &RMF_LDLM_INTENT
+};
+
+static const struct req_msg_field *ldlm_intent_server[] = {
+        &RMF_DLM_REP,
+        &RMF_MDT_BODY,
+        &RMF_MDT_MD
+};
+
+static const struct req_msg_field *ldlm_intent_getattr_client[] = {
+        &RMF_DLM_REQ,
+        &RMF_LDLM_INTENT,
+        &RMF_MDT_BODY,     /* coincides with mds_getattr_name_client[] */
+        &RMF_NAME
+};
+
 static const struct req_format *req_formats[] = {
+        &RQF_MDS_CONNECT,
+        &RQF_MDS_DISCONNECT,
         &RQF_MDS_GETSTATUS,
         &RQF_MDS_STATFS,
         &RQF_MDS_GETATTR,
         &RQF_MDS_GETATTR_NAME,
-        &RQF_MDS_REINT_CREATE
+        &RQF_MDS_REINT_CREATE,
+        &RQF_LDLM_ENQUEUE,
+        &RQF_LDLM_INTENT,
+        &RQF_LDLM_INTENT_GETATTR
 };
 
 struct req_msg_field {
@@ -120,6 +162,48 @@ const struct req_msg_field RMF_REC_CREATE =
                     sizeof(struct mdt_rec_create), lustre_swab_mdt_rec_create);
 EXPORT_SYMBOL(RMF_REC_CREATE);
 
+const struct req_msg_field RMF_TGTUUID =
+        DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+EXPORT_SYMBOL(RMF_TGTUUID);
+
+const struct req_msg_field RMF_CLUUID =
+        DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+EXPORT_SYMBOL(RMF_CLUUID);
+
+/*
+ * connection handle received in MDS_CONNECT request.
+ *
+ * XXX no swabbing?
+ */
+const struct req_msg_field RMF_CONN =
+        DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL);
+EXPORT_SYMBOL(RMF_CONN);
+
+const struct req_msg_field RMF_CONNECT_DATA =
+        DEFINE_MSGF("cdata", 0,
+                    sizeof(struct obd_connect_data), lustre_swab_connect);
+EXPORT_SYMBOL(RMF_CONNECT_DATA);
+
+const struct req_msg_field RMF_DLM_REQ =
+        DEFINE_MSGF("dlm_req", 0,
+                    sizeof(struct ldlm_request), lustre_swab_ldlm_request);
+EXPORT_SYMBOL(RMF_DLM_REQ);
+
+const struct req_msg_field RMF_DLM_REP =
+        DEFINE_MSGF("dlm_rep", 0,
+                    sizeof(struct ldlm_reply), lustre_swab_ldlm_reply);
+EXPORT_SYMBOL(RMF_DLM_REP);
+
+const struct req_msg_field RMF_LDLM_INTENT =
+        DEFINE_MSGF("ldlm_intent", 0,
+                    sizeof(struct ldlm_intent), lustre_swab_ldlm_intent);
+EXPORT_SYMBOL(RMF_LDLM_INTENT);
+
+const struct req_msg_field RMF_MDT_MD =
+        DEFINE_MSGF("mdt_md",
+                    0, sizeof(struct lov_mds_md) /* FIXME: See mds */, NULL);
+EXPORT_SYMBOL(RMF_MDT_MD);
+
 /*
  * Request formats.
  */
@@ -172,6 +256,30 @@ const struct req_format RQF_MDS_REINT_CREATE =
                         mds_reint_create_client, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
 
+const struct req_format RQF_MDS_CONNECT =
+        DEFINE_REQ_FMT0("MDS_CONNECT",
+                        mds_connect_client, mds_connect_server);
+EXPORT_SYMBOL(RQF_MDS_CONNECT);
+
+const struct req_format RQF_MDS_DISCONNECT =
+        DEFINE_REQ_FMT0("MDS_DISCONNECT", empty, empty);
+EXPORT_SYMBOL(RQF_MDS_DISCONNECT);
+
+const struct req_format RQF_LDLM_ENQUEUE =
+        DEFINE_REQ_FMT0("LDLM_ENQUEUE",
+                        ldlm_enqueue_client, ldlm_enqueue_server);
+EXPORT_SYMBOL(RQF_LDLM_ENQUEUE);
+
+const struct req_format RQF_LDLM_INTENT =
+        DEFINE_REQ_FMT0("LDLM_INTENT",
+                        ldlm_intent_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT);
+
+const struct req_format RQF_LDLM_INTENT_GETATTR =
+        DEFINE_REQ_FMT0("LDLM_INTENT",
+                        ldlm_intent_getattr_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_GETATTR);
+
 int req_layout_init(void)
 {
         int i;
@@ -207,13 +315,15 @@ void req_layout_fini(void)
 EXPORT_SYMBOL(req_layout_fini);
 
 void req_capsule_init(struct req_capsule *pill,
-                      struct ptlrpc_request *req, enum req_location location)
+                      struct ptlrpc_request *req, enum req_location location,
+                      int *area)
 {
         LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
 
         memset(pill, 0, sizeof *pill);
         pill->rc_req = req;
         pill->rc_loc = location;
+        pill->rc_area = area;
 }
 EXPORT_SYMBOL(req_capsule_init);
 
@@ -229,54 +339,61 @@ static int __req_format_is_sane(const struct req_format *fmt)
                 req_formats[fmt->rf_idx] == fmt;
 }
 
-static void __req_capsule_init(struct req_capsule *pill,
-                               const struct req_format *fmt,
-                               enum req_location loc)
+void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
 {
-        LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt));
-        LASSERT(pill->rc_fmt[loc] == NULL);
-        LASSERT(ergo(loc != pill->rc_loc, pill->rc_swabbed == 0));
+        LASSERT(pill->rc_fmt == NULL);
         LASSERT(__req_format_is_sane(fmt));
 
-        pill->rc_fmt[loc] = fmt;
-}
-
-void req_capsule_server_init(struct req_capsule *pill,
-                             const struct req_format *fmt)
-{
-        __req_capsule_init(pill, fmt, RCL_SERVER);
+        pill->rc_fmt = fmt;
 }
-EXPORT_SYMBOL(req_capsule_server_init);
+EXPORT_SYMBOL(req_capsule_set);
 
-void req_capsule_client_init(struct req_capsule *pill,
-                             const struct req_format *fmt)
-{
-        __req_capsule_init(pill, fmt, RCL_CLIENT);
-}
-EXPORT_SYMBOL(req_capsule_client_init);
-
-int req_capsule_start(struct req_capsule *pill,
-                      const struct req_format *fmt, int *area)
+int req_capsule_pack(struct req_capsule *pill)
 {
         int i;
         int nr;
         int result;
+        int total;
+
+        const struct req_format *fmt;
 
         LASSERT(pill->rc_loc == RCL_SERVER);
+        fmt = pill->rc_fmt;
+        LASSERT(fmt != NULL);
 
         nr = fmt->rf_fields[RCL_SERVER].nr;
-        for (i = 0; i < nr; ++i)
-                area[i] = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size;
-        req_capsule_server_init(pill, fmt);
-        result = lustre_pack_reply(pill->rc_req, nr, area, NULL);
+        for (total = 0, i = 0; i < nr; ++i) {
+                int *size;
+
+                size = &pill->rc_area[i];
+                if (*size == 0) {
+                        *size = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size;
+                        LASSERT(*size != 0);
+                }
+                total += *size;
+        }
+        result = lustre_pack_reply(pill->rc_req, nr, pill->rc_area, NULL);
         if (result != 0) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
-                          "Failed to pack %d fields in format `%s': ",
-                          nr, fmt->rf_name);
+                          "Cannot pack %d fields (%d bytes) in format `%s': ",
+                          nr, total, fmt->rf_name);
         }
         return result;
 }
-EXPORT_SYMBOL(req_capsule_start);
+EXPORT_SYMBOL(req_capsule_pack);
+
+static int __req_capsule_offset(const struct req_capsule *pill,
+                                const struct req_msg_field *field,
+                                enum req_location loc)
+{
+        int offset;
+
+        offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
+        LASSERT(offset > 0);
+        offset --;
+        LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3));
+        return offset;
+}
 
 static void *__req_capsule_get(const struct req_capsule *pill,
                                const struct req_msg_field *field,
@@ -295,17 +412,11 @@ static void *__req_capsule_get(const struct req_capsule *pill,
                 [RCL_SERVER] = "server"
         };
 
-        LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt));
-        CLASSERT(ARRAY_SIZE(pill->rc_fmt) == ARRAY_SIZE(field->rmf_offset[0]));
-
-        fmt = pill->rc_fmt[loc];
+        fmt = pill->rc_fmt;
         LASSERT(fmt != NULL);
         LASSERT(__req_format_is_sane(fmt));
 
-        offset = field->rmf_offset[fmt->rf_idx][loc];
-        LASSERT(offset > 0);
-        -- offset;
-        LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3));
+        offset = __req_capsule_offset(pill, field, loc);
 
         req = pill->rc_req;
         msg = loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
@@ -329,10 +440,10 @@ static void *__req_capsule_get(const struct req_capsule *pill,
         return value;
 }
 
-const void *req_capsule_client_get(const struct req_capsule *pill,
-                                   const struct req_msg_field *field)
+void *req_capsule_client_get(const struct req_capsule *pill,
+                             const struct req_msg_field *field)
 {
-        return (const void *)__req_capsule_get(pill, field, RCL_CLIENT);
+        return __req_capsule_get(pill, field, RCL_CLIENT);
 }
 EXPORT_SYMBOL(req_capsule_client_get);
 
@@ -343,3 +454,47 @@ void *req_capsule_server_get(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_server_get);
 
+const void *req_capsule_other_get(const struct req_capsule *pill,
+                                  const struct req_msg_field *field)
+{
+        return __req_capsule_get(pill, field, pill->rc_loc ^ 1);
+}
+EXPORT_SYMBOL(req_capsule_other_get);
+
+void req_capsule_set_size(const struct req_capsule *pill,
+                          const struct req_msg_field *field,
+                          enum req_location loc, int size)
+{
+        pill->rc_area[__req_capsule_offset(pill, field, loc)] = size;
+}
+EXPORT_SYMBOL(req_capsule_set_size);
+
+void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
+{
+        int i;
+        int j;
+
+        const struct req_format *old;
+
+        LASSERT(pill->rc_fmt != NULL);
+        LASSERT(__req_format_is_sane(fmt));
+
+        old = pill->rc_fmt;
+        /*
+         * Sanity checking...
+         */
+        for (i = 0; i < RCL_NR; ++i) {
+                LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
+                for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
+                        LASSERT(fmt->rf_fields[i].d[j]->rmf_size ==
+                                old->rf_fields[i].d[j]->rmf_size);
+                }
+                /*
+                 * Last field in old format can be shorter than in new.
+                 */
+                LASSERT(fmt->rf_fields[i].d[j]->rmf_size >=
+                        old->rf_fields[i].d[j]->rmf_size);
+        }
+        pill->rc_fmt = fmt;
+}
+EXPORT_SYMBOL(req_capsule_extend);