Whamcloud - gitweb
more req-layout cleanups (for intents and reints)
authornikita <nikita>
Thu, 25 May 2006 15:49:05 +0000 (15:49 +0000)
committernikita <nikita>
Thu, 25 May 2006 15:49:05 +0000 (15:49 +0000)
lustre/include/lustre_req_layout.h
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/ptlrpc/layout.c

index 8774a06..046b817 100644 (file)
@@ -71,6 +71,9 @@ void req_capsule_set_size(const struct req_capsule *pill,
                           enum req_location loc, int size);
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt);
 
+int req_capsule_has_field(const struct req_capsule *pill,
+                          const struct req_msg_field *field);
+
 int  req_layout_init(void);
 void req_layout_fini(void);
 
@@ -84,15 +87,20 @@ extern const struct req_format RQF_MDS_DISCONNECT;
  * This is format of direct (non-intent) MDS_GETATTR_NAME request.
  */
 extern const struct req_format RQF_MDS_GETATTR_NAME;
+extern const struct req_format RQF_MDS_REINT;
 extern const struct req_format RQF_MDS_REINT_CREATE;
+extern const struct req_format RQF_MDS_REINT_UNLINK;
+extern const struct req_format RQF_MDS_REINT_SETATTR;
 extern const struct req_format RQF_LDLM_ENQUEUE;
 extern const struct req_format RQF_LDLM_INTENT;
 extern const struct req_format RQF_LDLM_INTENT_GETATTR;
+extern const struct req_format RQF_LDLM_INTENT_OPEN;
+extern const struct req_format RQF_LDLM_INTENT_CREATE;
+extern const struct req_format RQF_LDLM_INTENT_UNLINK;
 
 extern const struct req_msg_field RMF_MDT_BODY;
 extern const struct req_msg_field RMF_OBD_STATFS;
 extern const struct req_msg_field RMF_NAME;
-extern const struct req_msg_field RMF_REC_CREATE;
 extern const struct req_msg_field RMF_TGTUUID;
 extern const struct req_msg_field RMF_CLUUID;
 /*
@@ -104,5 +112,13 @@ extern const struct req_msg_field RMF_DLM_REQ;
 extern const struct req_msg_field RMF_DLM_REP;
 extern const struct req_msg_field RMF_LDLM_INTENT;
 extern const struct req_msg_field RMF_MDT_MD;
+extern const struct req_msg_field RMF_REC_CREATE;
+extern const struct req_msg_field RMF_REC_LINK;
+extern const struct req_msg_field RMF_REC_UNLINK;
+extern const struct req_msg_field RMF_REC_RENAME;
+extern const struct req_msg_field RMF_REC_SETATTR;
+extern const struct req_msg_field RMF_EADATA;
+extern const struct req_msg_field RMF_LOGCOOKIES;
+extern const struct req_msg_field RMF_REINT_OPC;
 
 #endif /* _LUSTRE_REQ_LAYOUT_H__ */
index 18d7395..4027eb2 100644 (file)
@@ -1095,7 +1095,7 @@ static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
 
         LASSERT(fid != NULL);
         LASSERT(hint != NULL);
-        LASSERT(fid_seq(&cli->cl_fid));
+        LASSERT(fid_seq_is_sane(fid_seq(&cli->cl_fid)));
 
         spin_lock(&cli->cl_fid_lock);
         if (fid_oid(&cli->cl_fid) < LUSTRE_FID_SEQ_WIDTH) {
index 454fbee..820a668 100644 (file)
  */
 unsigned long mdt_num_threads;
 
-static int                  mdt_handle    (struct ptlrpc_request *req);
-static struct mdt_device   *mdt_dev       (struct lu_device *d);
-static const struct lu_fid *mdt_object_fid(struct mdt_object *o);
+static int                    mdt_handle    (struct ptlrpc_request *req);
+static struct mdt_device     *mdt_dev       (struct lu_device *d);
+static const struct lu_fid   *mdt_object_fid(struct mdt_object *o);
+static struct ptlrpc_request *mdt_info_req  (struct mdt_thread_info *info);
 
 static struct lu_context_key       mdt_thread_key;
 static struct lu_object_operations mdt_obj_ops;
 
-static int mdt_getstatus(struct mdt_thread_info *info,
-                         struct ptlrpc_request *req, int offset)
+static int mdt_getstatus(struct mdt_thread_info *info)
 {
         struct md_device *next  = info->mti_mdt->mdt_child;
         int               result;
@@ -93,8 +93,7 @@ static int mdt_getstatus(struct mdt_thread_info *info,
         RETURN(result);
 }
 
-static int mdt_statfs(struct mdt_thread_info *info,
-                      struct ptlrpc_request *req, int offset)
+static int mdt_statfs(struct mdt_thread_info *info)
 {
         struct md_device  *next  = info->mti_mdt->mdt_child;
         struct obd_statfs *osfs;
@@ -138,8 +137,7 @@ static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
         b->nlink      = attr->la_nlink;
 }
 
-static int mdt_getattr(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+static int mdt_getattr(struct mdt_thread_info *info)
 {
         int              result;
         struct mdt_body *body;
@@ -167,8 +165,7 @@ static int mdt_getattr(struct mdt_thread_info *info,
         RETURN(result);
 }
 
-static int mdt_getattr_name(struct mdt_thread_info *info,
-                            struct ptlrpc_request *req, int offset)
+static int mdt_getattr_name(struct mdt_thread_info *info)
 {
         struct md_object  *next = mdt_object_child(info->mti_object);
         struct mdt_object *child;
@@ -189,9 +186,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info,
         if (result == 0) {
                 child = mdt_object_find(info->mti_ctxt, info->mti_mdt,
                                         &body->fid1);
-                if (IS_ERR(child)) {
-                        result = PTR_ERR(child);
-                } else {
+                if (!IS_ERR(child)) {
                         result = mo_attr_get(info->mti_ctxt, next,
                                              &info->mti_attr);
                         if (result == 0) {
@@ -199,7 +194,8 @@ static int mdt_getattr_name(struct mdt_thread_info *info,
                                 body->valid |= OBD_MD_FLID;
                         }
                         mdt_object_put(info->mti_ctxt, child);
-                }
+                } else
+                        result = PTR_ERR(child);
         }
 
         RETURN(result);
@@ -221,11 +217,19 @@ static struct mdt_device *mdt_dev(struct lu_device *d)
         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
 }
 
-static int mdt_connect(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+static struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info)
+{
+        return info->mti_pill.rc_req;
+}
+
+static int mdt_connect(struct mdt_thread_info *info)
 {
         int result;
+        struct req_capsule *pill;
+        struct ptlrpc_request *req;
 
+        pill = &info->mti_pill;
+        req = mdt_info_req(info);
         result = target_handle_connect(req, mdt_handle);
         if (result == 0) {
                 struct obd_connect_data *data;
@@ -233,7 +237,13 @@ static int mdt_connect(struct mdt_thread_info *info,
                 LASSERT(req->rq_export != NULL);
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
 
-                data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
+                /*
+                 * XXX: this is incorrect, because target_handle_connect()
+                 * accessed and *swabbed* connect data bypassing
+                 * capsule. Correct fix is to switch everything to the new
+                 * req-layout interface.
+                 */
+                data = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
 
                 result = seq_mgr_alloc(info->mti_ctxt,
                                        info->mti_mdt->mdt_seq_mgr,
@@ -242,73 +252,82 @@ static int mdt_connect(struct mdt_thread_info *info,
         return result;
 }
 
-static int mdt_disconnect(struct mdt_thread_info *info,
-                          struct ptlrpc_request *req, int offset)
+static int mdt_disconnect(struct mdt_thread_info *info)
 {
-        return target_handle_disconnect(req);
+        return target_handle_disconnect(mdt_info_req(info));
 }
 
-static int mdt_setxattr(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_setxattr(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_getxattr(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_getxattr(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_readpage(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_readpage(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_reint_internal(struct mdt_thread_info *info,
-                              struct ptlrpc_request *req,
-                              int offset)
+static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
 {
-        int rep_off, rc;
+        int rc;
 
-        rc = mdt_reint_unpack(info, req, offset);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
-                CERROR("invalid record\n");
-                RETURN(rc = -EINVAL);
-        }
+        ENTRY;
 
-        /* XXX: this should be cleanup up */
-        rep_off = offset == MDS_REQ_INTENT_REC_OFF ? 1 : 0;
+        OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_UNPACK, -EFAULT);
+
+        rc = mdt_reint_unpack(info, op);
+        if (rc == 0)
+                rc = mdt_reint_rec(info);
 
-        /* init body for handlers by rep's body, they may want to
-         * store there something. */
-        info->mti_body = lustre_msg_buf(req->rq_repmsg, rep_off,
-                                        sizeof(struct mdt_body));
-        rc = mdt_reint_rec(info);
         RETURN(rc);
 }
 
-static int mdt_reint(struct mdt_thread_info *info,
-                     struct ptlrpc_request *req, int offset)
+static long mdt_reint_opcode(struct mdt_thread_info *info,
+                             const struct req_format **fmt)
 {
-        __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
-                                     sizeof (*opcp));
-        __u32  opc;
-        int rc;
+        __u32 *ptr;
+        long opc;
 
-        ENTRY;
-
-        /* NB only peek inside req now; mdt_XXX_unpack() will swab it */
-        if (opcp == NULL) {
-                CERROR("Can't inspect opcode\n");
-                RETURN(-EINVAL);
+        opc = -EINVAL;
+        ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
+        if (ptr != NULL) {
+                opc = *ptr;
+                DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
+                if (opc < REINT_MAX && fmt[opc] != NULL)
+                        req_capsule_extend(&info->mti_pill, fmt[opc]);
+                else
+                        CERROR("Unsupported opc: %ld\n", opc);
         }
-        opc = *opcp;
-        if (lustre_msg_swabbed(req->rq_reqmsg))
-                __swab32s(&opc);
+        return opc;
+}
 
-        DEBUG_REQ(D_INODE, req, "reint opt = %d", opc);
+static int mdt_reint(struct mdt_thread_info *info)
+{
+        long opc;
+        int  rc;
+
+        struct ptlrpc_request *req;
+
+        static const struct req_format *reint_fmts[REINT_MAX] = {
+                [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
+                [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
+                [REINT_LINK]    = NULL, /* XXX not yet */
+                [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
+                [REINT_RENAME]  = NULL, /* XXX not yet */
+                [REINT_OPEN]    = NULL /* XXX not yet */
+        };
+
+        ENTRY;
+
+        req = mdt_info_req(info);
+        opc = mdt_reint_opcode(info, reint_fmts);
+        if (opc < 0)
+                RETURN(opc);
 
         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
 
@@ -326,45 +345,36 @@ static int mdt_reint(struct mdt_thread_info *info,
         if (rc)
                 RETURN(rc);
 
-        /* init body by rep body, needed by handlers to return data to client */
-        info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
-                                        sizeof(struct mdt_body));
-        rc = mdt_reint_internal(info, req, offset);
+        rc = mdt_reint_internal(info, opc);
         RETURN(rc);
 }
 
-static int mdt_close(struct mdt_thread_info *info,
-                     struct ptlrpc_request *req, int offset)
+static int mdt_close(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_done_writing(struct mdt_thread_info *info,
-                            struct ptlrpc_request *req, int offset)
+static int mdt_done_writing(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_pin(struct mdt_thread_info *info,
-                   struct ptlrpc_request *req, int offset)
+static int mdt_pin(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_sync(struct mdt_thread_info *info,
-                    struct ptlrpc_request *req, int offset)
+static int mdt_sync(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_handle_quotacheck(struct mdt_thread_info *info,
-                                 struct ptlrpc_request *req, int offset)
+static int mdt_handle_quotacheck(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_handle_quotactl(struct mdt_thread_info *info,
-                               struct ptlrpc_request *req, int offset)
+static int mdt_handle_quotactl(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
@@ -373,23 +383,20 @@ static int mdt_handle_quotactl(struct mdt_thread_info *info,
  * OBD PING and other handlers.
  */
 
-static int mdt_obd_ping(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_obd_ping(struct mdt_thread_info *info)
 {
         int result;
         ENTRY;
-        result = target_handle_ping(req);
+        result = target_handle_ping(mdt_info_req(info));
         RETURN(result);
 }
 
-static int mdt_obd_log_cancel(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_obd_log_cancel(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
 
-static int mdt_obd_qc_callback(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+static int mdt_obd_qc_callback(struct mdt_thread_info *info)
 {
         return -EOPNOTSUPP;
 }
@@ -405,8 +412,7 @@ static struct ldlm_callback_suite cbs = {
         .lcs_glimpse    = NULL
 };
 
-static int mdt_enqueue(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+static int mdt_enqueue(struct mdt_thread_info *info)
 {
         /*
          * info->mti_dlm_req already contains swapped and (if necessary)
@@ -416,26 +422,24 @@ static int mdt_enqueue(struct mdt_thread_info *info,
 
         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
         return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
-                                    req, info->mti_dlm_req, &cbs);
+                                    mdt_info_req(info),
+                                    info->mti_dlm_req, &cbs);
 }
 
-static int mdt_convert(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+static int mdt_convert(struct mdt_thread_info *info)
 {
         LASSERT(info->mti_dlm_req);
-        return ldlm_handle_convert0(req, info->mti_dlm_req);
+        return ldlm_handle_convert0(mdt_info_req(info), info->mti_dlm_req);
 }
 
-static int mdt_bl_callback(struct mdt_thread_info *info,
-                           struct ptlrpc_request *req, int offset)
+static int mdt_bl_callback(struct mdt_thread_info *info)
 {
         CERROR("bl callbacks should not happen on MDS\n");
         LBUG();
         return -EOPNOTSUPP;
 }
 
-static int mdt_cp_callback(struct mdt_thread_info *info,
-                           struct ptlrpc_request *req, int offset)
+static int mdt_cp_callback(struct mdt_thread_info *info)
 {
         CERROR("cp callbacks should not happen on MDS\n");
         LBUG();
@@ -582,15 +586,15 @@ struct mdt_handler {
         int         mh_fail_id;
         __u32       mh_opc;
         __u32       mh_flags;
-        int (*mh_act)(struct mdt_thread_info *info,
-                      struct ptlrpc_request *req, int offset);
+        int (*mh_act)(struct mdt_thread_info *info);
 
         const struct req_format *mh_fmt;
 };
 
 enum mdt_handler_flags {
         /*
-         * struct mdt_body is passed in the incoming message.
+         * struct mdt_body is passed in the incoming message, and object
+         * identified by this fid exists on disk.
          */
         HABEO_CORPUS = (1 << 0),
         /*
@@ -598,15 +602,10 @@ enum mdt_handler_flags {
          */
         HABEO_CLAVIS = (1 << 1),
         /*
-         * object, identified by fid passed in mdt_body, already exists (on
-         * disk).
-         */
-        HABEO_DISCUS = HABEO_CORPUS | (1 << 2),
-        /*
          * this request has fixed reply format, so that reply message can be
          * packed by generic code.
          */
-        HABEO_REFERO = (1 << 3)
+        HABEO_REFERO = (1 << 2)
 };
 
 struct mdt_opc_slice {
@@ -654,37 +653,23 @@ static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
         return 0;
 }
 
-/*
- * Generic code handling requests that have struct mdt_body passed in:
- *
- *  - extract mdt_body from request and save it in @info;
- *
- *  - create lu_object, corresponding to the fid in mdt_body, and save it in
- *  @info;
- *
- *  - if HABEO_DISCUS flag is set for this request type in addition to
- *  HABEO_CORPUS, check whether object actually exists on storage
- *  (lu_object_exists()).
- *
- */
-static int mdt_habeo_corpus(struct mdt_thread_info *info, __u32 flags)
+static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
 {
-        const struct mdt_body   *body;
-        struct mdt_object       *obj;
-        const struct lu_context *ctx;
-
         int result;
+        const struct mdt_body    *body;
+        struct mdt_object        *obj;
+        const struct lu_context  *ctx;
+        const struct req_capsule *pill;
 
         ctx = info->mti_ctxt;
+        pill = &info->mti_pill;
 
-        body = info->mti_body = req_capsule_client_get(&info->mti_pill,
-                                                       &RMF_MDT_BODY);
+        body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
         if (body != NULL) {
                 if (fid_is_sane(&body->fid1)) {
                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
-
                         if (!IS_ERR(obj)) {
-                                if ((flags & HABEO_DISCUS) == HABEO_DISCUS &&
+                                if ((flags & HABEO_CORPUS) &&
                                     !lu_object_exists(ctx,
                                                       &obj->mot_obj.mo_lu)) {
                                         mdt_object_put(ctx, obj);
@@ -707,17 +692,45 @@ static int mdt_habeo_corpus(struct mdt_thread_info *info, __u32 flags)
 }
 
 /*
+ * Generic code handling requests that have struct mdt_body passed in:
+ *
+ *  - extract mdt_body from request and save it in @info, if present;
+ *
+ *  - create lu_object, corresponding to the fid in mdt_body, and save it in
+ *  @info;
+ *
+ *  - if HABEO_CORPUS flag is set for this request type check whether object
+ *  actually exists on storage (lu_object_exists()).
+ *
+ */
+static int mdt_req_unpack(struct mdt_thread_info *info, __u32 flags)
+{
+        struct req_capsule *pill;
+
+        int result;
+
+        pill = &info->mti_pill;
+
+        if (req_capsule_has_field(pill, &RMF_MDT_BODY))
+                result = mdt_body_unpack(info, flags);
+        else
+                result = 0;
+
+        if (result == 0 && (flags & HABEO_REFERO))
+                result = req_capsule_pack(pill);
+
+        return result;
+}
+
+/*
  * Invoke handler for this request opc. Also do necessary preprocessing
  * (according to handler ->mh_flags), and post-processing (setting of
  * ->last_{xid,committed}).
  */
 static int mdt_req_handle(struct mdt_thread_info *info,
-                          struct mdt_handler *h, struct ptlrpc_request *req,
-                          int shift)
+                          struct mdt_handler *h, struct ptlrpc_request *req)
 {
-        int result;
-        int off;
-
+        int   result;
         __u32 flags;
 
         ENTRY;
@@ -731,8 +744,6 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         if (h->mh_fail_id != 0)
                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
 
-        off = MDS_REQ_REC_OFF + shift;
-
         result = 0;
         flags = h->mh_flags;
         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
@@ -742,17 +753,13 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         if (h->mh_fmt != NULL) {
                 req_capsule_set(&info->mti_pill, h->mh_fmt);
-                if (flags & HABEO_REFERO)
-                        result = req_capsule_pack(&info->mti_pill);
+                if (result == 0)
+                        result = mdt_req_unpack(info, flags);
         }
 
-        if (result == 0 && flags & HABEO_CORPUS)
-                result = mdt_habeo_corpus(info, flags);
-
         if (result == 0 && flags & HABEO_CLAVIS) {
                 struct ldlm_request *dlm;
 
-                LASSERT(shift == 0);
                 LASSERT(h->mh_fmt != NULL);
 
                 dlm = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
@@ -771,7 +778,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 /*
                  * Process request.
                  */
-                result = h->mh_act(info, req, off);
+                result = h->mh_act(info);
         /*
          * XXX result value is unconditionally shoved into ->rq_status
          * (original code sometimes placed error code into ->rq_status, and
@@ -1035,7 +1042,7 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
                 case +1:
                         h = mdt_handler_find(msg->opc);
                         if (h != NULL)
-                                result = mdt_req_handle(info, h, req, 0);
+                                result = mdt_req_handle(info, h, req);
                         else {
                                 req->rq_status = -ENOTSUPP;
                                 result = ptlrpc_error(req);
@@ -1108,25 +1115,102 @@ enum mdt_it_code {
         MDT_IT_NR
 };
 
-static struct {
+static int mdt_intent_getattr(enum mdt_it_code opcode,
+                              struct mdt_thread_info *info);
+static int mdt_intent_reint(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info);
+
+static struct mdt_it_flavor {
         const struct req_format *it_fmt;
         __u32                    it_flags;
-        int                    (*it_act)(struct mdt_thread_info *,
-                                         struct ptlrpc_request *, int);
+        int                    (*it_act)(enum mdt_it_code ,
+                                         struct mdt_thread_info *);
+        long                     it_reint;
 } mdt_it_flavor[] = {
-        [MDT_IT_OPEN]     = { NULL,                     0, mdt_reint_internal },
-        [MDT_IT_OCREAT]   = { NULL,                     0, mdt_reint_internal },
-        [MDT_IT_CREATE]   = { NULL,                     0, NULL },
-        [MDT_IT_GETATTR]  = { &RQF_LDLM_INTENT_GETATTR,
-                              HABEO_DISCUS, mdt_getattr_name },
-        [MDT_IT_READDIR]  = { NULL,                     0, NULL },
-        [MDT_IT_LOOKUP]   = { &RQF_LDLM_INTENT_GETATTR,
-                              HABEO_DISCUS, mdt_getattr_name },
-        [MDT_IT_UNLINK]   = { NULL,                     0, NULL },
-        [MDT_IT_TRUNC]    = { NULL,                     0, NULL },
-        [MDT_IT_GETXATTR] = { NULL,                     0, NULL }
+        [MDT_IT_OPEN]     = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_reint,
+                .it_reint = REINT_OPEN
+        },
+        [MDT_IT_OCREAT]   = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_reint,
+                .it_reint = REINT_OPEN
+        },
+        [MDT_IT_CREATE]   = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_reint,
+                .it_reint = REINT_CREATE
+        },
+        [MDT_IT_GETATTR]  = {
+                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
+                .it_flags = HABEO_CORPUS|HABEO_REFERO,
+                .it_act   = mdt_intent_getattr
+        },
+        [MDT_IT_READDIR]  = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = NULL
+        },
+        [MDT_IT_LOOKUP]   = {
+                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
+                .it_flags = HABEO_CORPUS|HABEO_REFERO,
+                .it_act   = mdt_intent_getattr
+        },
+        [MDT_IT_UNLINK]   = {
+                .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
+                .it_flags = HABEO_REFERO,
+                .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
+                .it_reint = REINT_UNLINK
+        },
+        [MDT_IT_TRUNC]    = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = NULL
+        },
+        [MDT_IT_GETXATTR] = {
+                .it_fmt   = NULL,
+                .it_flags = HABEO_REFERO,
+                .it_act   = NULL
+        }
 };
 
+static int mdt_intent_getattr(enum mdt_it_code opcode,
+                              struct mdt_thread_info *info)
+{
+        return mdt_getattr_name(info);
+}
+
+static int mdt_intent_reint(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info)
+{
+        long opc;
+        int  rc;
+
+        static const struct req_format *intent_fmts[REINT_MAX] = {
+                [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
+                [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
+        };
+
+        ENTRY;
+
+        opc = mdt_reint_opcode(info, intent_fmts);
+        if (opc >= 0) {
+                if (mdt_it_flavor[opcode].it_reint == opc)
+                        rc = mdt_reint_internal(info, opc);
+                else {
+                        CERROR("Reint code %ld doesn't match intent: %d\n",
+                               opc, opcode);
+                        rc = -EPROTO;
+                }
+        } else
+                rc = opc;
+        RETURN(rc);
+}
+
 static int mdt_intent_code(long itcode)
 {
         int result;
@@ -1167,20 +1251,55 @@ static int mdt_intent_code(long itcode)
         return result;
 }
 
+static int mdt_intent_opc(long itopc, struct mdt_thread_info *info)
+{
+        int opc;
+        int rc;
+
+        struct req_capsule   *pill;
+        struct mdt_it_flavor *flv;
+
+        ENTRY;
+
+        opc = mdt_intent_code(itopc);
+        if (opc >= 0) {
+                pill = &info->mti_pill;
+                flv  = &mdt_it_flavor[opc];
+
+                if (flv->it_fmt != NULL)
+                        req_capsule_extend(pill, flv->it_fmt);
+
+                rc = mdt_req_unpack(info, flv->it_flags);
+                if (rc == 0) {
+                        struct ldlm_reply *rep;
+
+                        rep = req_capsule_server_get(pill, &RMF_DLM_REP);
+                        if (rep != NULL) {
+                                /* execute policy */
+                                rep->lock_policy_res2 = flv->it_act(opc, info);
+                                intent_set_disposition(rep, DISP_IT_EXECD);
+                                rc = 0;
+                        } else
+                                rc = -EFAULT;
+                }
+        } else
+                rc = -EINVAL;
+
+        RETURN(rc);
+}
+
 static int mdt_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data)
 {
-        struct ptlrpc_request *req = req_cookie;
-        struct ldlm_lock *lock = *lockp;
-        struct ldlm_intent *it;
-        struct ldlm_reply *rep;
-        int offset = MDS_REQ_INTENT_REC_OFF;
+        struct mdt_thread_info *info;
+        struct ptlrpc_request  *req  =  req_cookie;
+        struct ldlm_intent     *it;
+        struct req_capsule     *pill;
+        struct ldlm_lock       *lock = *lockp;
+
         int rc;
         int gflags = 0;
-        struct mdt_thread_info *info;
-        struct req_capsule *pill;
-        int opcode;
         ENTRY;
 
         LASSERT(req != NULL);
@@ -1190,44 +1309,24 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
         pill = &info->mti_pill;
         LASSERT(pill->rc_req == req);
 
-        if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
+        if (req->rq_reqmsg->bufcount > MDS_REQ_INTENT_IT_OFF) {
+                req_capsule_extend(pill, &RQF_LDLM_INTENT);
+                it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
+                if (it != NULL) {
+                        LDLM_DEBUG(lock, "intent policy opc: %s",
+                                   ldlm_it2str(it->opc));
+
+                        rc = mdt_intent_opc(it->opc, info);
+                        if (rc == 0)
+                                rc = ELDLM_OK;
+                } else
+                        rc = -EFAULT;
+        } else {
                 /* No intent was provided */
                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
-                RETURN(req_capsule_pack(pill));
+                rc = req_capsule_pack(pill);
         }
-
-        req_capsule_extend(pill, &RQF_LDLM_INTENT);
-        it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
-        if (it == NULL) {
-                rc = -EFAULT;
-                RETURN(rc);
-        }
-
-        LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
-
-        opcode = mdt_intent_code(it->opc);
-        if (opcode < 0)
-                RETURN(-EINVAL);
-
-        req_capsule_extend(pill, mdt_it_flavor[opcode].it_fmt);
-
-        if (mdt_it_flavor[opcode].it_flags & HABEO_CORPUS) {
-                rc = mdt_habeo_corpus(info, mdt_it_flavor[opcode].it_flags);
-                if (rc != 0)
-                        RETURN(rc);
-        }
-
-        rc = req_capsule_pack(pill);
-        if (rc != 0)
-                RETURN(rc);
-
-        rep = req_capsule_server_get(pill, &RMF_DLM_REP);
-        intent_set_disposition(rep, DISP_IT_EXECD);
-
-        /* execute policy */
-        rep->lock_policy_res2 = mdt_it_flavor[opcode].it_act(info, req, offset);
-
-        RETURN(ELDLM_OK);
+        RETURN(rc);
 }
 
 static int mdt_config(const struct lu_context *ctx, struct mdt_device *m,
@@ -1970,17 +2069,17 @@ static struct mdt_handler mdt_mds_ops[] = {
 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
-DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR,      mdt_getattr),
-DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
-DEF_MDT_HNDL_0(HABEO_DISCUS,              SETXATTR,     mdt_setxattr),
-DEF_MDT_HNDL_0(HABEO_DISCUS,              GETXATTR,     mdt_getxattr),
+DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
+DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              SETXATTR,     mdt_setxattr),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
-DEF_MDT_HNDL_0(HABEO_DISCUS,              READPAGE,     mdt_readpage),
-DEF_MDT_HNDL_0(0,                         REINT,        mdt_reint),
-DEF_MDT_HNDL_0(HABEO_DISCUS,              CLOSE,        mdt_close),
-DEF_MDT_HNDL_0(HABEO_CORPUS,              DONE_WRITING, mdt_done_writing),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              READPAGE,     mdt_readpage),
+DEF_MDT_HNDL_F(0,                         REINT,        mdt_reint),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              CLOSE,        mdt_close),
+DEF_MDT_HNDL_0(0,                         DONE_WRITING, mdt_done_writing),
 DEF_MDT_HNDL_0(0,                         PIN,          mdt_pin),
-DEF_MDT_HNDL_0(HABEO_DISCUS,              SYNC,         mdt_sync),
+DEF_MDT_HNDL_0(HABEO_CORPUS,              SYNC,         mdt_sync),
 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_handle_quotacheck),
 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_handle_quotactl)
 };
index aff0ccc..19d2bd9 100644 (file)
@@ -210,10 +210,8 @@ void mdt_object_unlock(struct ldlm_namespace *, struct mdt_object *,
 struct mdt_object *mdt_object_find_lock(const struct lu_context *,
                                         struct mdt_device *, struct lu_fid *,
                                         struct mdt_lock_handle *, __u64);
-int mdt_reint_unpack(struct mdt_thread_info *,
-                     struct ptlrpc_request *,
-                     int);
 
+int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
 int mdt_reint_rec(struct mdt_thread_info *);
 
 #endif /* __KERNEL__ */
index bd296b7..078f217 100644 (file)
 
 
 /* unpacking */
-static int mdt_setattr_unpack(struct mdt_thread_info *info,
-                              struct ptlrpc_request *req, 
-                              int offset)
+static int mdt_setattr_unpack(struct mdt_thread_info *info)
 {
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
 
-static int mdt_create_unpack(struct mdt_thread_info *info,
-                             struct ptlrpc_request *req, 
-                             int offset)
+static int mdt_create_unpack(struct mdt_thread_info *info)
 {
         struct mdt_rec_create *rec;
         struct lu_attr *attr = &info->mti_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
+        struct req_capsule *pill = &info->mti_pill;
+        int result;
         ENTRY;
 
-        rec = lustre_swab_reqbuf(req, offset, sizeof (*rec),
-                                 lustre_swab_mdt_rec_create);
-        if (rec == NULL)
-                RETURN(-EFAULT);
-
-        rr->rr_fid1 = &rec->cr_fid1;
-        rr->rr_fid2 = &rec->cr_fid2;
-        attr->la_mode = rec->cr_mode;
-
-        rr->rr_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
-
-        RETURN(0);
+        rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
+        if (rec != NULL) {
+                rr->rr_fid1 = &rec->cr_fid1;
+                rr->rr_fid2 = &rec->cr_fid2;
+                attr->la_mode = rec->cr_mode;
+
+                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+                result = 0;
+        } else
+                result = -EFAULT;
+        RETURN(result);
 }
 
-static int mdt_link_unpack(struct mdt_thread_info *info,
-                           struct ptlrpc_request *req, 
-                           int offset)
+static int mdt_link_unpack(struct mdt_thread_info *info)
 {
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
 
-static int mdt_unlink_unpack(struct mdt_thread_info *info,
-                             struct ptlrpc_request *req, 
-                             int offset)
+static int mdt_unlink_unpack(struct mdt_thread_info *info)
 {
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
 
-static int mdt_rename_unpack(struct mdt_thread_info *info,
-                             struct ptlrpc_request *req, 
-                             int offset)
+static int mdt_rename_unpack(struct mdt_thread_info *info)
 {
         ENTRY;
         RETURN(-EOPNOTSUPP);
 }
 
-static int mdt_open_unpack(struct mdt_thread_info *info,
-                           struct ptlrpc_request *req, 
-                           int offset)
+static int mdt_open_unpack(struct mdt_thread_info *info)
 {
-        struct mdt_rec_create *rec;
-        struct lu_attr *attr = &info->mti_attr;
-        struct mdt_reint_record *rr = &info->mti_rr;
+        struct mdt_rec_create   *rec;
+        struct lu_attr          *attr = &info->mti_attr;
+        struct req_capsule      *pill = &info->mti_pill;
+        struct mdt_reint_record *rr   = &info->mti_rr;
+        int result;
         ENTRY;
 
-        rec = lustre_swab_reqbuf(req, offset, sizeof (*rec),
-                                 lustre_swab_mdt_rec_create);
-        if (rec == NULL)
-                RETURN(-EFAULT);
-
-        rr->rr_fid1   = &rec->cr_fid1;
-        rr->rr_fid2   = &rec->cr_fid2;
-        attr->la_mode = rec->cr_mode;
-        rr->rr_flags  = rec->cr_flags;
-
-        rr->rr_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
-        if (rr->rr_name == NULL)
-                RETURN (-EFAULT);
-
-        RETURN(0);
+        rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
+        if (rec != NULL) {
+                rr->rr_fid1   = &rec->cr_fid1;
+                rr->rr_fid2   = &rec->cr_fid2;
+                attr->la_mode = rec->cr_mode;
+                rr->rr_flags  = rec->cr_flags;
+
+                rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+                if (rr->rr_name == NULL)
+                        result = -EFAULT;
+                else
+                        result = 0;
+        } else
+                result = -EFAULT;
+
+        RETURN(result);
 }
 
-typedef int (*reint_unpacker)(struct mdt_thread_info *info,
-                              struct ptlrpc_request *req, 
-                              int offset);
+typedef int (*reint_unpacker)(struct mdt_thread_info *info);
 
 static reint_unpacker mdt_reint_unpackers[REINT_MAX] = {
         [REINT_SETATTR]  = mdt_setattr_unpack,
-        [REINT_CREATE] = mdt_create_unpack,
-        [REINT_LINK] = mdt_link_unpack,
-        [REINT_UNLINK] = mdt_unlink_unpack,
-        [REINT_RENAME] = mdt_rename_unpack,
-        [REINT_OPEN] = mdt_open_unpack
+        [REINT_CREATE]   = mdt_create_unpack,
+        [REINT_LINK]     = mdt_link_unpack,
+        [REINT_UNLINK]   = mdt_unlink_unpack,
+        [REINT_RENAME]   = mdt_rename_unpack,
+        [REINT_OPEN]     = mdt_open_unpack
 };
 
-int mdt_reint_unpack(struct mdt_thread_info *info,
-                     struct ptlrpc_request *req, 
-                     int offset)
+int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op)
 {
-        mdt_reint_t opcode;
-        mdt_reint_t *opcodep;
         int rc;
-        ENTRY;
 
-        /* NB don't lustre_swab_reqbuf() here.  We're just taking a peek
-         * and we want to leave it to the specific unpacker once we've
-         * identified the message type */
-        opcodep = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*opcodep));
-        if (opcodep == NULL)
-                RETURN(-EFAULT);
-
-        opcode = *opcodep;
-        if (lustre_msg_swabbed(req->rq_reqmsg))
-                __swab32s (&opcode);
+        ENTRY;
 
-        if (opcode >= REINT_MAX || mdt_reint_unpackers[opcode] == NULL) {
-                CERROR("Unexpected opcode %d\n", opcode);
-                RETURN(-EFAULT);
+        if (op < REINT_MAX && mdt_reint_unpackers[op] != NULL) {
+                info->mti_rr.rr_opcode = op;
+                rc = mdt_reint_unpackers[op](info);
+        } else {
+                CERROR("Unexpected opcode %d\n", op);
+                rc = -EFAULT;
         }
-        info->mti_rr.rr_opcode = opcode;
-        rc = mdt_reint_unpackers[opcode](info, req, offset);
-
         RETURN(rc);
 }
index 06ee214..3bc3c3b 100644 (file)
@@ -64,11 +64,26 @@ static const struct req_msg_field *mds_getattr_name_client[] = {
         &RMF_NAME
 };
 
+static const struct req_msg_field *mds_reint_client[] = {
+        &RMF_REINT_OPC
+};
+
 static const struct req_msg_field *mds_reint_create_client[] = {
         &RMF_REC_CREATE,
         &RMF_NAME
 };
 
+static const struct req_msg_field *mds_reint_unlink_client[] = {
+        &RMF_REC_UNLINK,
+        &RMF_NAME
+};
+
+static const struct req_msg_field *mds_reint_setattr_client[] = {
+        &RMF_REC_SETATTR,
+        &RMF_EADATA,
+        &RMF_LOGCOOKIES
+};
+
 static const struct req_msg_field *mds_connect_client[] = {
         &RMF_TGTUUID,
         &RMF_CLUUID,
@@ -90,7 +105,8 @@ static const struct req_msg_field *ldlm_enqueue_server[] = {
 
 static const struct req_msg_field *ldlm_intent_client[] = {
         &RMF_DLM_REQ,
-        &RMF_LDLM_INTENT
+        &RMF_LDLM_INTENT,
+        &RMF_REINT_OPC
 };
 
 static const struct req_msg_field *ldlm_intent_server[] = {
@@ -106,6 +122,23 @@ static const struct req_msg_field *ldlm_intent_getattr_client[] = {
         &RMF_NAME
 };
 
+/*
+ * Used for open and create.
+ */
+static const struct req_msg_field *ldlm_intent_create_client[] = {
+        &RMF_DLM_REQ,
+        &RMF_LDLM_INTENT,
+        &RMF_REC_CREATE,    /* coincides with mds_reint_create_client[] */
+        &RMF_NAME
+};
+
+static const struct req_msg_field *ldlm_intent_unlink_client[] = {
+        &RMF_DLM_REQ,
+        &RMF_LDLM_INTENT,
+        &RMF_REC_UNLINK,    /* coincides with mds_reint_unlink_client[] */
+        &RMF_NAME
+};
+
 static const struct req_format *req_formats[] = {
         &RQF_MDS_CONNECT,
         &RQF_MDS_DISCONNECT,
@@ -113,10 +146,16 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_STATFS,
         &RQF_MDS_GETATTR,
         &RQF_MDS_GETATTR_NAME,
+        &RQF_MDS_REINT,
         &RQF_MDS_REINT_CREATE,
+        &RQF_MDS_REINT_UNLINK,
+        &RQF_MDS_REINT_SETATTR,
         &RQF_LDLM_ENQUEUE,
         &RQF_LDLM_INTENT,
-        &RQF_LDLM_INTENT_GETATTR
+        &RQF_LDLM_INTENT_GETATTR,
+        &RQF_LDLM_INTENT_OPEN,
+        &RQF_LDLM_INTENT_CREATE,
+        &RQF_LDLM_INTENT_UNLINK,
 };
 
 struct req_msg_field {
@@ -157,11 +196,6 @@ const struct req_msg_field RMF_NAME =
         DEFINE_MSGF("name", RMF_F_STRING, 0, NULL);
 EXPORT_SYMBOL(RMF_NAME);
 
-const struct req_msg_field RMF_REC_CREATE =
-        DEFINE_MSGF("rec_create", 0,
-                    sizeof(struct mdt_rec_create), lustre_swab_mdt_rec_create);
-EXPORT_SYMBOL(RMF_REC_CREATE);
-
 const struct req_msg_field RMF_TGTUUID =
         DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
 EXPORT_SYMBOL(RMF_TGTUUID);
@@ -204,6 +238,42 @@ const struct req_msg_field RMF_MDT_MD =
                     0, sizeof(struct lov_mds_md) /* FIXME: See mds */, NULL);
 EXPORT_SYMBOL(RMF_MDT_MD);
 
+const struct req_msg_field RMF_REC_UNLINK =
+        DEFINE_MSGF("rec_unlink", 0, sizeof(struct mdt_rec_unlink),
+                    lustre_swab_mds_rec_unlink);
+EXPORT_SYMBOL(RMF_REC_UNLINK);
+
+const struct req_msg_field RMF_REC_LINK =
+        DEFINE_MSGF("rec_link", 0, sizeof(struct mdt_rec_link),
+                    lustre_swab_mdt_rec_link);
+EXPORT_SYMBOL(RMF_REC_LINK);
+
+const struct req_msg_field RMF_REC_RENAME =
+        DEFINE_MSGF("rec_rename", 0, sizeof(struct mdt_rec_rename),
+                    lustre_swab_mdt_rec_rename);
+EXPORT_SYMBOL(RMF_REC_RENAME);
+
+const struct req_msg_field RMF_REC_CREATE =
+        DEFINE_MSGF("rec_create", 0,
+                    sizeof(struct mdt_rec_create), lustre_swab_mdt_rec_create);
+EXPORT_SYMBOL(RMF_REC_CREATE);
+
+const struct req_msg_field RMF_REC_SETATTR =
+        DEFINE_MSGF("rec_setattr", 0, sizeof(struct mdt_rec_setattr),
+                    lustre_swab_mdt_rec_setattr);
+EXPORT_SYMBOL(RMF_REC_SETATTR);
+
+const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, 0, NULL);
+EXPORT_SYMBOL(RMF_EADATA);
+
+const struct req_msg_field RMF_LOGCOOKIES =
+        DEFINE_MSGF("logcookies", 0, 0, NULL);
+EXPORT_SYMBOL(RMF_LOGCOOKIES);
+
+const struct req_msg_field RMF_REINT_OPC =
+        DEFINE_MSGF("reint_opc", 0, sizeof(__u32), lustre_swab_generic_32s);
+EXPORT_SYMBOL(RMF_REINT_OPC);
+
 /*
  * Request formats.
  */
@@ -251,11 +321,25 @@ const struct req_format RQF_MDS_GETATTR_NAME =
                         mds_getattr_name_client, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_GETATTR_NAME);
 
+const struct req_format RQF_MDS_REINT =
+        DEFINE_REQ_FMT0("MDS_REINT", mds_reint_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT);
+
 const struct req_format RQF_MDS_REINT_CREATE =
         DEFINE_REQ_FMT0("MDS_REINT_CREATE",
                         mds_reint_create_client, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
 
+const struct req_format RQF_MDS_REINT_UNLINK =
+        DEFINE_REQ_FMT0("MDS_REINT_UNLINK",
+                        mds_reint_unlink_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_UNLINK);
+
+const struct req_format RQF_MDS_REINT_SETATTR =
+        DEFINE_REQ_FMT0("MDS_REINT_SETATTR",
+                        mds_reint_setattr_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_SETATTR);
+
 const struct req_format RQF_MDS_CONNECT =
         DEFINE_REQ_FMT0("MDS_CONNECT",
                         mds_connect_client, mds_connect_server);
@@ -276,10 +360,25 @@ const struct req_format RQF_LDLM_INTENT =
 EXPORT_SYMBOL(RQF_LDLM_INTENT);
 
 const struct req_format RQF_LDLM_INTENT_GETATTR =
-        DEFINE_REQ_FMT0("LDLM_INTENT",
+        DEFINE_REQ_FMT0("LDLM_INTENT_GETATTR",
                         ldlm_intent_getattr_client, ldlm_intent_server);
 EXPORT_SYMBOL(RQF_LDLM_INTENT_GETATTR);
 
+const struct req_format RQF_LDLM_INTENT_OPEN =
+        DEFINE_REQ_FMT0("LDLM_INTENT_OPEN",
+                        ldlm_intent_create_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_OPEN);
+
+const struct req_format RQF_LDLM_INTENT_CREATE =
+        DEFINE_REQ_FMT0("LDLM_INTENT_CREATE",
+                        ldlm_intent_create_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_CREATE);
+
+const struct req_format RQF_LDLM_INTENT_UNLINK =
+        DEFINE_REQ_FMT0("LDLM_INTENT_UNLINK",
+                        ldlm_intent_unlink_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_UNLINK);
+
 int req_layout_init(void)
 {
         int i;
@@ -339,6 +438,15 @@ static int __req_format_is_sane(const struct req_format *fmt)
                 req_formats[fmt->rf_idx] == fmt;
 }
 
+static struct lustre_msg *__req_msg(const struct req_capsule *pill,
+                                    enum req_location loc)
+{
+        struct ptlrpc_request *req;
+
+        req = pill->rc_req;
+        return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
+}
+
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
 {
         LASSERT(pill->rc_fmt == NULL);
@@ -399,11 +507,10 @@ static void *__req_capsule_get(const struct req_capsule *pill,
                                const struct req_msg_field *field,
                                enum req_location loc)
 {
-        const struct req_format     *fmt;
-        const struct ptlrpc_request *req;
-        struct lustre_msg           *msg;
-        void                        *value;
-        int                          offset;
+        const struct req_format *fmt;
+        struct lustre_msg       *msg;
+        void                    *value;
+        int                      offset;
 
         void *(*getter)(struct lustre_msg *m, int n, int minlen);
 
@@ -418,8 +525,7 @@ static void *__req_capsule_get(const struct req_capsule *pill,
 
         offset = __req_capsule_offset(pill, field, loc);
 
-        req = pill->rc_req;
-        msg = loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
+        msg = __req_msg(pill, loc);
 
         getter = (field->rmf_flags & RMF_F_STRING) ?
                 (typeof(getter))lustre_msg_string : lustre_msg_buf;
@@ -432,9 +538,9 @@ static void *__req_capsule_get(const struct req_capsule *pill,
                 field->rmf_swabber(value);
         if (value == NULL)
                 DEBUG_REQ(D_ERROR, pill->rc_req,
-                          "Wrong buffer for field `%s' (%d) in format `%s': "
-                          "%d < %d (%s)\n",
-                          field->rmf_name, offset, fmt->rf_name,
+                          "Wrong buffer for field `%s' (%d of %d) "
+                          "in format `%s': %d vs. %d (%s)\n",
+                          field->rmf_name, offset, msg->bufcount, fmt->rf_name,
                           lustre_msg_buflen(msg, offset), field->rmf_size,
                           rcl_names[loc]);
         return value;
@@ -469,6 +575,8 @@ void req_capsule_set_size(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_set_size);
 
+#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
+
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 {
         int i;
@@ -486,15 +594,35 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
         for (i = 0; i < RCL_NR; ++i) {
                 LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
                 for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
-                        LASSERT(fmt->rf_fields[i].d[j]->rmf_size ==
-                                old->rf_fields[i].d[j]->rmf_size);
+                        LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
                 }
                 /*
                  * Last field in old format can be shorter than in new.
                  */
-                LASSERT(fmt->rf_fields[i].d[j]->rmf_size >=
-                        old->rf_fields[i].d[j]->rmf_size);
+                LASSERT(FMT_FIELD(fmt, i, j)->rmf_size >=
+                        FMT_FIELD(old, i, j)->rmf_size);
         }
         pill->rc_fmt = fmt;
 }
 EXPORT_SYMBOL(req_capsule_extend);
+
+int req_capsule_has_field(const struct req_capsule *pill,
+                          const struct req_msg_field *field)
+{
+        int result;
+        int loc;
+
+        loc = pill->rc_loc ^ 1;
+        if (field->rmf_offset[pill->rc_fmt->rf_idx][loc]) {
+                int offset;
+
+                offset = __req_capsule_offset(pill, field, loc);
+                result = __req_msg(pill, loc)->bufcount > offset;
+        } else
+                /*
+                 * Field doesn't exist in this format.
+                 */
+                result = 0;
+        return result;
+}
+EXPORT_SYMBOL(req_capsule_has_field);