Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Fri, 15 Sep 2006 10:28:03 +0000 (10:28 +0000)
committerwangdi <wangdi>
Fri, 15 Sep 2006 10:28:03 +0000 (10:28 +0000)
some fixes about splitting dir

13 files changed:
lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/md_object.h
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_lib.c
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/ptlrpc/layout.c

index ce0bffc..34f2a5c 100644 (file)
@@ -69,7 +69,6 @@ static int cmm_expect_splitting(const struct lu_context *ctx,
 
         if (ma->ma_lmv_size)
                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-
         OBD_ALLOC_PTR(fid);
         rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid);
         if (rc)
@@ -150,7 +149,9 @@ static inline void cmm_object_put(const struct lu_context *ctxt,
 
 static int cmm_creat_remote_obj(const struct lu_context *ctx,
                                 struct cmm_device *cmm,
-                                struct lu_fid *fid, struct md_attr *ma)
+                                struct lu_fid *fid, struct md_attr *ma,
+                                const struct lmv_stripe_md *lmv,
+                                int lmv_size)
 {
         struct cmm_object *obj;
         struct md_create_spec *spec;
@@ -162,7 +163,11 @@ static int cmm_creat_remote_obj(const struct lu_context *ctx,
                 RETURN(PTR_ERR(obj));
 
         OBD_ALLOC_PTR(spec);
-        spec->u.sp_pfid = fid;
+
+        spec->u.sp_ea.fid = fid;
+        spec->u.sp_ea.eadata = lmv;
+        spec->u.sp_ea.eadatalen = lmv_size;
+        spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
         rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
                               spec, ma);
         OBD_FREE_PTR(spec);
@@ -175,7 +180,7 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
                                     struct md_object *mo, struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
-        struct lmv_stripe_md *lmv = NULL;
+        struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
         int lmv_size, i, rc;
         struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
         ENTRY;
@@ -197,8 +202,16 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         if (rc)
                 GOTO(cleanup, rc);
 
+        OBD_ALLOC_PTR(slave_lmv);
+        if (!slave_lmv)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        slave_lmv->mea_master = cmm->cmm_local_num;
+        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+        slave_lmv->mea_count = 0;
         for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
-                rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
+                rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma, 
+                                          slave_lmv, sizeof(slave_lmv));
                 if (rc)
                         GOTO(cleanup, rc);
         }
@@ -206,6 +219,8 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         ma->ma_lmv_size = lmv_size;
         ma->ma_lmv = lmv;
 cleanup:
+        if (slave_lmv)
+                OBD_FREE_PTR(slave_lmv);
         RETURN(rc);
 }
 
@@ -241,6 +256,8 @@ static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
 
         /* Read splitted page and send them to the slave master */
         do {
+                struct lu_dirpage *ldp;
+
                 /* init page with '0' */
                 for (i = 0; i < rdpg->rp_npages; i++) {
                         memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
@@ -256,7 +273,15 @@ static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
                         RETURN(rc);
 
                 rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
-
+                if (rc) 
+                        RETURN(rc);
+                
+                kmap(rdpg->rp_pages[0]);
+                ldp = page_address(rdpg->rp_pages[0]);
+                if (ldp->ldp_hash_end == ~0ul)
+                        rc = -E2BIG;
+                rdpg->rp_hash = ldp->ldp_hash_end;
+                kunmap(rdpg->rp_pages[0]); 
         } while (rc == 0);
 
         /* it means already finish splitting this segment */
@@ -324,7 +349,6 @@ static int cmm_scan_and_split(const struct lu_context *ctx,
 
                 rdpg->rp_hash = i * hash_segement;
                 hash_end = rdpg->rp_hash + hash_segement;
-
                 rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
                 if (rc)
                         GOTO(cleanup, rc);
index 10bf0e9..2d0170a 100644 (file)
@@ -167,10 +167,11 @@ static int mdc_object_create(const struct lu_context *ctx,
         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
         struct lu_attr *la = &ma->ma_attr;
         struct mdc_thread_info *mci;
-        const char *symname;
+        const void *symname;
         int rc, symlen;
         ENTRY;
 
+        LASSERT(spec->u.sp_pfid != NULL);
         mci = mdc_info_init(ctx);
         mci->mci_opdata.fid2 = *lu_object_fid(&mo->mo_lu);
         /* parent fid is needed to create dotdot on the remote node */
@@ -178,8 +179,15 @@ static int mdc_object_create(const struct lu_context *ctx,
         mci->mci_opdata.mod_time = la->la_mtime;
 
         /* get data from spec */
-        symname = spec->u.sp_symname;
-        symlen = symname ? strlen(symname) + 1 : 0;
+        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+                symname = spec->u.sp_ea.eadata;
+                symlen = spec->u.sp_ea.eadatalen;
+                mci->mci_opdata.fid1 = *(spec->u.sp_ea.fid);
+                mci->mci_opdata.flags |= MDS_CREATE_SLAVE_OBJ;
+        } else {
+                symname = spec->u.sp_symname;
+                symlen = symname ? strlen(symname) + 1 : 0;
+        }
         
         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
                        symname, symlen,
index 2662c06..5d916ad 100644 (file)
@@ -299,7 +299,7 @@ static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
 #define MEA_MAGIC_LAST_CHAR      0xb2221ca1
 #define MEA_MAGIC_ALL_CHARS      0xb222a11c
 #define MEA_MAGIC_HASH_SEGMENT   0xb222a11b
-#define MAX_HASH_SIZE            0x3fffffff
+#define MAX_HASH_SIZE            0x7fffffff
 
 struct lmv_stripe_md {
         __u32         mea_magic;
@@ -1084,6 +1084,9 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
 #define MDS_OPEN_DELAY_CREATE  0100000000 /* delay initial object create */
 #define MDS_OPEN_OWNEROVERRIDE 0200000000 /* NFSD rw-reopen ro file for owner */
 #define MDS_OPEN_JOIN_FILE     0400000000 /* open for join file*/
+#define MDS_CREATE_SLAVE_OBJ  02000000000 /* indicate create slave object
+                                           * actually, this is for create, not
+                                           * conflict with other open flags */
 #define MDS_OPEN_LOCK         04000000000 /* This open requires open lock */
 #define MDS_OPEN_HAS_EA      010000000000 /* specify object create pattern */
 #define MDS_OPEN_HAS_OBJS    020000000000 /* Just set the EA the obj exist */
index 46348ad..c61f22e 100644 (file)
@@ -118,6 +118,8 @@ extern const struct req_format RQF_MDS_DONE_WRITING;
 extern const struct req_format RQF_MDS_GETATTR_NAME;
 extern const struct req_format RQF_MDS_REINT;
 extern const struct req_format RQF_MDS_REINT_CREATE;
+extern const struct req_format RQF_MDS_REINT_CREATE_SLAVE;
+extern const struct req_format RQF_MDS_REINT_CREATE_SYM;
 extern const struct req_format RQF_MDS_REINT_OPEN;
 extern const struct req_format RQF_MDS_REINT_UNLINK;
 extern const struct req_format RQF_MDS_REINT_LINK;
index 2ec5626..f9be3dd 100644 (file)
@@ -78,6 +78,7 @@ struct md_create_spec {
                 /* eadata for regular files */
                 struct md_spec_reg {
                         /* lov objs exist already */
+                        const struct lu_fid   *fid;
                         int no_lov_create;
                         const void *eadata;
                         int  eadatalen;
index d0bff37..c1062bc 100644 (file)
@@ -136,6 +136,44 @@ out:
         return rc;
 }
 
+int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
+                            struct md_op_data *op, struct lu_fid *fid)
+{
+        struct lmv_obj *obj;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct lu_fid *rpid;
+        mdsno_t mds;
+        int rc;
+        ENTRY;
+
+        obj = lmv_obj_grab(obd, pid);
+        if (!obj)
+               RETURN(0);
+        mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+                           (char *)op->name, op->namelen);
+        rpid = &obj->lo_inodes[mds].li_fid;
+        rc = lmv_fld_lookup(lmv, rpid, &mds);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
+        if (rc < 0)
+                GOTO(cleanup, rc);
+        if (rc > 0) {
+                LASSERT(fid_is_sane(fid));
+                rc = fld_client_create(&lmv->lmv_fld,
+                                       fid_seq(fid), mds, NULL);
+                if (rc) {
+                        CERROR("can't create fld rc%d\n", rc);
+                        GOTO(cleanup, rc);
+                }
+        }
+        CDEBUG(D_INFO, "Allocate new fid"DFID"for split obj\n",PFID(fid));
+cleanup:
+        lmv_obj_put(obj);
+        RETURN(rc);
+}
+
 /*
  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
  * may be split dir.
@@ -169,6 +207,7 @@ repeat:
         rc = lmv_fld_lookup(lmv, &rpid, &mds);
         if (rc)
                 GOTO(out_free_sop_data, rc);
+        
         obj = lmv_obj_grab(obd, &rpid);
         if (obj) {
                 /*
@@ -193,6 +232,7 @@ repeat:
         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
                             lmm, lmmsize, it, flags, reqp,
                             cb_blocking, extra_lock_flags);
+        
         if (rc == -ERESTART) {
                 /*
                  * Directory got split. Time to update local object and repeat
@@ -202,6 +242,12 @@ repeat:
                 rc = lmv_handle_split(exp, &rpid);
                 if (rc == 0) {
                         ptlrpc_req_finished(*reqp);
+                       /* We shoudld reallocate the FID for the object */
+                        rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
+                                                     &sop_data->fid2);
+                        if (rc)
+                                GOTO(out_free_sop_data, rc);
+                        /* client switches to new sequence, setup fld */
                         goto repeat;
                 }
         }
index 6c4aa16..af1454e 100644 (file)
@@ -138,6 +138,8 @@ int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                     void *, int);
 int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid,
                    mdsno_t *mds);
+int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
+                            struct md_op_data *op, struct lu_fid *fid);
 
 static inline struct lmv_stripe_md * 
 lmv_get_mea(struct ptlrpc_request *req, int offset)
index 4e3763e..3bec9fc 100644 (file)
@@ -1276,11 +1276,10 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
 
         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
 
-        EXIT;
 cleanup:
         if (req)
                 ptlrpc_req_finished(req);
-        return rc;
+        RETURN(rc);
 }
 
 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
@@ -1334,6 +1333,10 @@ repeat:
                 rc = lmv_handle_split(exp, &op_data->fid1);
                 if (rc == 0) {
                         ptlrpc_req_finished(*request);
+                        rc = lmv_alloc_fid_for_split(obd, &op_data->fid1,
+                                                op_data, &op_data->fid2);
+                        if (rc)
+                                RETURN(rc);
                         goto repeat;
                 }
         }
@@ -1915,14 +1918,15 @@ static int lmv_readpage(struct obd_export *exp,
 
         obj = lmv_obj_grab(obd, fid);
         if (obj) {
+                __u64 index = offset;
+                __u32 seg = MAX_HASH_SIZE;
                 lmv_obj_lock(obj);
-
-                /* find dirobj containing page with requested offset. */
-                for (i = 0; i < obj->lo_objcount; i++) {
-                        if (offset < obj->lo_inodes[i].li_size)
-                                break;
-                        offset -= obj->lo_inodes[i].li_size;
-                }
+                
+                LASSERT(obj->lo_objcount > 0);
+                do_div(seg, obj->lo_objcount);
+                do_div(index, seg);
+                offset -= index * seg;  
+                i = (int)index;
                 rid = obj->lo_inodes[i].li_fid;
 
                 lmv_obj_unlock(obj);
@@ -1941,7 +1945,7 @@ static int lmv_readpage(struct obd_export *exp,
 #ifdef __KERNEL__
         if (obj && i < obj->lo_objcount - 1) {
                 struct lu_dirpage *dp;
-                __u32 end;
+                __u32 end, max_hash = MAX_HASH_SIZE;
                 /*
                  * This dirobj has been split, so we check whether reach the end
                  * of one hash_segment and reset ldp->ldp_hash_end.
@@ -1950,14 +1954,18 @@ static int lmv_readpage(struct obd_export *exp,
                 dp = page_address(page); 
                 end = le32_to_cpu(dp->ldp_hash_end);
                 if (end == ~0ul) {
-                        __u32 hash_segment_end = (i + 1) * 
-                                MAX_HASH_SIZE/obj->lo_objcount;
-                        dp->ldp_hash_end = cpu_to_le32(hash_segment_end); 
-                        CDEBUG(D_INFO,"reset hash end %x for split obj "DFID"",
-                               le32_to_cpu(dp->ldp_hash_end), PFID(&rid)); 
+                        __u32 seg_end;
+                        
+                        do_div(max_hash, obj->lo_objcount);
+                        seg_end = max_hash * (i + 1);
+                        
+                        dp->ldp_hash_end = cpu_to_le32(seg_end); 
+                        CDEBUG(D_INFO,"reset hash end %x for split obj "DFID" "
+                                       "obj count %d \n",
+                               le32_to_cpu(dp->ldp_hash_end), PFID(&rid),
+                               obj->lo_objcount); 
                 }
                 kunmap(page);
-                
         }
 #endif
         /*
index 336abee..c14e09f 100644 (file)
@@ -95,7 +95,8 @@ void mdc_create_pack(struct ptlrpc_request *req, int offset,
         rec->cr_rdev = rdev;
         rec->cr_time = op_data->mod_time;
         rec->cr_suppgid = op_data->suppgids[0];
-
+        rec->cr_flags = op_data->flags;
+        
         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, op_data->namelen + 1);
         LOGL0(op_data->name, op_data->namelen, tmp);
 
index 25b9392..539d4a6 100644 (file)
@@ -733,18 +733,6 @@ static int __mdd_xattr_set(const struct lu_context *ctxt, struct mdd_object *o,
         if (buf && buf_len > 0) {
                 rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
                                                 0, handle);
-#ifdef HAVE_SPLIT_SUPPORT
-                if (rc == 0) {
-                        /* very ugly hack, if setting lmv, it means splitting 
-                         * sucess, we should return -ERESTART to notify the 
-                         * client, so transno for this splitting should be
-                         * zero according to the replay rules. so return -ERESTART
-                         * here let mdt trans stop callback know this. 
-                         */
-                        if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0) 
-                                rc = -ERESTART;
-                }
-#endif
         }else if (buf == NULL && buf_len == 0) {
                 rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
         }
@@ -971,7 +959,18 @@ static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
 
         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
                                fl, handle);
-
+#ifdef HAVE_SPLIT_SUPPORT
+        if (rc == 0) {
+                /* very ugly hack, if setting lmv, it means splitting 
+                 * sucess, we should return -ERESTART to notify the 
+                 * client, so transno for this splitting should be
+                 * zero according to the replay rules. so return -ERESTART
+                 * here let mdt trans stop callback know this. 
+                 */
+                 if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0) 
+                        rc = -ERESTART;
+        }
+#endif
         mdd_trans_stop(ctxt, mdd, rc, handle);
 
         RETURN(rc);
@@ -1890,6 +1889,7 @@ static int mdd_object_create(const struct lu_context *ctxt,
         struct mdd_device *mdd = mdo2mdd(obj);
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct thandle *handle;
+        const struct lu_fid *pfid = spec->u.sp_pfid;
         int rc;
         ENTRY;
 
@@ -1900,14 +1900,22 @@ static int mdd_object_create(const struct lu_context *ctxt,
 
         mdd_write_lock(ctxt, mdd_obj);
         rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
+        if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+                /* if creating the slave object, set slave EA here */
+                rc = __mdd_xattr_set(ctxt, mdd_obj, spec->u.sp_ea.eadata,
+                                     spec->u.sp_ea.eadatalen, MDS_LMV_MD_NAME,
+                                     0, handle);
+                pfid = spec->u.sp_ea.fid;
+                CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
+                       PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
+        }
+
         if (rc == 0)
-                rc = __mdd_object_initialize(ctxt, spec->u.sp_pfid, mdd_obj,
-                                     ma, handle);
+                rc = __mdd_object_initialize(ctxt, pfid, mdd_obj, ma, handle);
         mdd_write_unlock(ctxt, mdd_obj);
 
         if (rc == 0)
                 rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
-
         mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
index e240ad8..cadc02e 100644 (file)
@@ -599,11 +599,10 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
                           ent = lu_dirent_next(ent)) {
                 struct lu_fid *lf = &ent->lde_fid;
 
-                /* FIXME: check isdir */
+                /* FIXME: multi-trans for this name insert */
                 rc = mdo_name_insert(info->mti_ctxt,
                                      md_object_next(&object->mot_obj),
                                      ent->lde_name, lf, 0);
-                CDEBUG(D_INFO, "insert name %s rc %d \n", ent->lde_name, rc);
                 if (rc) {
                         kunmap(page);
                         RETURN(rc);
index 5bd3d56..b6e4080 100644 (file)
@@ -233,10 +233,26 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                 
                 rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
                 if (S_ISDIR(attr->la_mode)) {
+                        struct md_create_spec *sp = &info->mti_spec;
                         /* pass parent fid for cross-ref cases */
-                        info->mti_spec.u.sp_pfid = rr->rr_fid1;
+                        sp->u.sp_pfid = rr->rr_fid1;
+                        if (info->mti_spec.sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+                                /* create salve object req, need 
+                                 * unpack split ea here 
+                                 */
+                               req_capsule_extend(pill, 
+                                                  &RQF_MDS_REINT_CREATE_SLAVE);
+                               LASSERT(req_capsule_field_present(pill, 
+                                                      &RMF_EADATA, RCL_CLIENT));
+                               sp->u.sp_ea.eadata = req_capsule_client_get(pill,
+                                                            &RMF_EADATA);
+                               sp->u.sp_ea.eadatalen =req_capsule_get_size(pill,
+                                                       &RMF_EADATA, RCL_CLIENT);
+                               sp->u.sp_ea.fid = rr->rr_fid1;
+                        }
                 } else if (S_ISLNK(attr->la_mode)) {
                         const char *tgt = NULL;
+                        req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM);
                         if (req_capsule_field_present(pill, &RMF_SYMTGT, 
                                                       RCL_CLIENT)) {
                                 tgt = req_capsule_client_get(pill,
index 76c9ecc..e4a1e02 100644 (file)
@@ -115,9 +115,22 @@ static const struct req_msg_field *mds_reint_create_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_REC_CREATE,
         &RMF_NAME,
+};
+
+static const struct req_msg_field *mds_reint_create_sym_client[] = {
+        &RMF_PTLRPC_BODY,
+        &RMF_REC_CREATE,
+        &RMF_NAME,
         &RMF_SYMTGT
 };
 
+static const struct req_msg_field *mds_reint_create_slave_client[] = {
+        &RMF_PTLRPC_BODY,
+        &RMF_REC_CREATE,
+        &RMF_NAME,
+        &RMF_EADATA
+};
+
 static const struct req_msg_field *mds_reint_open_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_REC_CREATE,
@@ -272,6 +285,8 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_GETATTR_NAME,
         &RQF_MDS_REINT,
         &RQF_MDS_REINT_CREATE,
+        &RQF_MDS_REINT_CREATE_SYM,
+        &RQF_MDS_REINT_CREATE_SLAVE,
         &RQF_MDS_REINT_OPEN,
         &RQF_MDS_REINT_UNLINK,
         &RQF_MDS_REINT_LINK,
@@ -529,6 +544,16 @@ const struct req_format RQF_MDS_REINT_CREATE =
                         mds_reint_create_client, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
 
+const struct req_format RQF_MDS_REINT_CREATE_SLAVE =
+        DEFINE_REQ_FMT0("MDS_REINT_CREATE_SLAVE",
+                        mds_reint_create_slave_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SLAVE);
+
+const struct req_format RQF_MDS_REINT_CREATE_SYM =
+        DEFINE_REQ_FMT0("MDS_REINT_CREATE_SYM",
+                        mds_reint_create_sym_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SYM);
+
 const struct req_format RQF_MDS_REINT_OPEN =
         DEFINE_REQ_FMT0("MDS_REINT_OPEN",
                         mds_reint_open_client, mds_reint_open_server);