Whamcloud - gitweb
- rename locking + CMD aware is_subdir() check.
authoryury <yury>
Mon, 18 Sep 2006 16:11:02 +0000 (16:11 +0000)
committeryury <yury>
Mon, 18 Sep 2006 16:11:02 +0000 (16:11 +0000)
20 files changed:
lustre/cmm/cmm_object.c
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/lmv/lmv_intent.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_handler.c
lustre/mds/handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c
lustre/obdclass/lprocfs_status.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/lproc_ptlrpc.c

index b5d64f1..1929c09 100644 (file)
@@ -444,7 +444,7 @@ static int __cmm_mode_get(const struct lu_context *ctx, struct md_device *md,
         
         /* get type from src, can be remote req */
         rc = mo_attr_get(ctx, md_object_next(mo_s), tmp_ma);
-        if (rc == 0)
+        if (rc == 0 && ma != NULL)
                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
 
         lu_object_put(ctx, &mo_s->mo_lu);
@@ -456,7 +456,6 @@ static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
                        const char *s_name, struct md_object *mo_t,
                        const char *t_name, struct md_attr *ma)
 {
-        struct cmm_thread_info *cmi;
         int rc;
         ENTRY;
 
@@ -467,16 +466,15 @@ static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
                 /* mo_t is remote object and there is RPC to unlink it */
                 rc = mo_ref_del(ctx, md_object_next(mo_t), ma);
                 if (rc)
-                        GOTO(out, rc);
+                        RETURN(rc);
                 mo_t = NULL;
         }
+        
         /* local rename, mo_t can be NULL */
         rc = mdo_rename(ctx, md_object_next(mo_po),
                         md_object_next(mo_pn), lf, s_name,
                         md_object_next(mo_t), t_name, ma);
-        EXIT;
-out:
-        return rc;
+        RETURN(rc);
 }
 
 static int cml_rename_tgt(const struct lu_context *ctx,
@@ -504,7 +502,28 @@ static int cml_name_insert(const struct lu_context *ctx,
         RETURN(rc);
 }
 
+/* Common method for remote and local use. */
+static int cmm_is_subdir(const struct lu_context *ctx, struct md_object *mo,
+                         const struct lu_fid *fid, struct lu_fid *sfid)
+{
+        struct cmm_thread_info *cmi;
+        int rc;
+        ENTRY;
+
+        rc = __cmm_mode_get(ctx, md_obj2dev(mo), fid, NULL);
+        if (rc)
+                RETURN(rc);
+
+        cmi = lu_context_key_get(ctx, &cmm_thread_key);
+        if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
+                RETURN(0);
+        
+        rc = mdo_is_subdir(ctx, md_object_next(mo), fid, sfid);
+        RETURN(rc);
+}
+
 static struct md_dir_operations cml_dir_ops = {
+        .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cml_lookup,
         .mdo_create      = cml_create,
         .mdo_link        = cml_link,
@@ -695,8 +714,10 @@ static struct md_object_operations cmr_mo_ops = {
 static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
                       const char *name, struct lu_fid *lf)
 {
-        /*this can happens while rename()
-         * If new parent is remote dir, lookup will happens here */
+        /*
+         * This can happens while rename() If new parent is remote dir, lookup
+         * will happen here.
+         */
 
         RETURN(-EREMOTE);
 }
@@ -783,9 +804,9 @@ static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
 }
 
 static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
-                       struct md_object *mo_pn, const struct lu_fid *lf,
-                       const char *s_name, struct md_object *mo_t,
-                       const char *t_name, struct md_attr *ma)
+                      struct md_object *mo_pn, const struct lu_fid *lf,
+                      const char *s_name, struct md_object *mo_t,
+                      const char *t_name, struct md_attr *ma)
 {
         int rc;
         ENTRY;
@@ -828,6 +849,7 @@ static int cmr_rename_tgt(const struct lu_context *ctx,
 }
 
 static struct md_dir_operations cmr_dir_ops = {
+        .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cmr_lookup,
         .mdo_create      = cmr_create,
         .mdo_link        = cmr_link,
index 0146a18..a5d4c3f 100644 (file)
@@ -290,9 +290,9 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx,
         ENTRY;
 
         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
-                                  page, offset);
+                          page, offset);
         CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
-                        page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
+               page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
         RETURN(rc);
 }
 #endif
@@ -335,7 +335,43 @@ static int mdc_rename_tgt(const struct lu_context *ctx,
         RETURN(rc);
 }
 
+static int mdc_is_subdir(const struct lu_context *ctx, struct md_object *mo,
+                         const struct lu_fid *fid, struct lu_fid *sfid)
+{
+        struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
+        struct mdc_thread_info *mci;
+        struct mdt_body *body;
+        int rc;
+        ENTRY;
+
+        mci = mdc_info_init(ctx);
+        
+        rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+                          fid, &mci->mci_req);
+
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_msg_buf(mci->mci_req->rq_repmsg, REPLY_REC_OFF,
+                              sizeof(*body));
+        
+        LASSERT(body->valid & (OBD_MD_FLMODE | OBD_MD_FLID) &&
+                (body->mode == 0 || body->mode == 1 || body->mode == EREMOTE));
+
+        rc = body->mode;
+        if (rc == EREMOTE) {
+                CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "
+                       DFID"\n", PFID(&body->fid1));
+                *sfid = body->fid1;
+        }
+        EXIT;
+out:
+        ptlrpc_req_finished(mci->mci_req);
+        return rc;
+}
+
 static struct md_dir_operations mdc_dir_ops = {
-        .mdo_rename_tgt  = mdc_rename_tgt,
+        .mdo_is_subdir   = mdc_is_subdir,
+        .mdo_rename_tgt  = mdc_rename_tgt
 };
 
index 694ec2e..23b835b 100644 (file)
@@ -825,7 +825,8 @@ typedef enum {
         MDS_QUOTACTL     = 48,
         MDS_GETXATTR     = 49,
         MDS_SETXATTR     = 50,
-        MDS_WRITEPAGE     = 51,
+        MDS_WRITEPAGE    = 51,
+        MDS_IS_SUBDIR    = 52,
         MDS_LAST_OPC
 } mds_cmd_t;
 
index 64cb02b..fe92be7 100644 (file)
@@ -110,6 +110,7 @@ extern const struct req_format RQF_MDS_CONNECT;
 extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_READPAGE;
 extern const struct req_format RQF_MDS_WRITEPAGE;
+extern const struct req_format RQF_MDS_IS_SUBDIR;
 extern const struct req_format RQF_MDS_DONE_WRITING;
 
 /*
index f9be3dd..6fae2d9 100644 (file)
@@ -92,10 +92,10 @@ struct md_create_spec {
  * Operations implemented for each md object (both directory and leaf).
  */
 struct md_object_operations {
-        int (*moo_attr_get)(const struct lu_context *ctxt, struct md_object *dt,
+        int (*moo_attr_get)(const struct lu_context *ctxt, struct md_object *obj,
                             struct md_attr *attr);
 
-        int (*moo_attr_set)(const struct lu_context *ctxt, struct md_object *dt,
+        int (*moo_attr_set)(const struct lu_context *ctxt, struct md_object *obj,
                             const struct md_attr *attr);
 
         int (*moo_xattr_get)(const struct lu_context *ctxt,
@@ -136,6 +136,9 @@ struct md_object_operations {
  * Operations implemented for each directory object.
  */
 struct md_dir_operations {
+        int (*mdo_is_subdir) (const struct lu_context *, struct md_object *,
+                              const struct lu_fid *, struct lu_fid *);
+        
         int (*mdo_lookup)(const struct lu_context *, struct md_object *,
                           const char *, struct lu_fid *);
 
@@ -379,6 +382,13 @@ static inline int mdo_rename(const struct lu_context *cx,
         return tp->mo_dir_ops->mdo_rename(cx, sp, tp, lf, sname, t, tname, ma);
 }
 
+static inline int mdo_is_subdir(const struct lu_context *cx, struct md_object *mo,
+                                const struct lu_fid *fid, struct lu_fid *sfid)
+{
+        LASSERT(mo->mo_dir_ops->mdo_is_subdir);
+        return mo->mo_dir_ops->mdo_is_subdir(cx, mo, fid, sfid);
+}
+
 static inline int mdo_link(const struct lu_context *cx, struct md_object *p,
                            struct md_object *s, const char *name,
                            struct md_attr *ma)
index 6dce92d..5c1ac2c 100644 (file)
@@ -1128,6 +1128,8 @@ struct md_ops {
         int (*m_rename)(struct obd_export *, struct md_op_data *,
                         const char *, int, const char *, int,
                         struct ptlrpc_request **);
+        int (*m_is_subdir)(struct obd_export *, const struct lu_fid *,
+                           const struct lu_fid *, struct ptlrpc_request **);
         int (*m_setattr)(struct obd_export *, struct md_op_data *, void *,
                          int , void *, int, struct ptlrpc_request **);
         int (*m_sync)(struct obd_export *, const struct lu_fid *,
index cb0060d..a893765 100644 (file)
@@ -1646,7 +1646,7 @@ static inline int md_enqueue(struct obd_export *exp, int lock_type,
 
 static inline int md_getattr_name(struct obd_export *exp,
                                   const struct lu_fid *fid,
-                                  const char *filename, int namelen,
+                                  const char *name, int namelen,
                                   obd_valid valid, int ea_size,
                                   struct ptlrpc_request **request)
 {
@@ -1654,7 +1654,7 @@ static inline int md_getattr_name(struct obd_export *exp,
         ENTRY;
         EXP_CHECK_MD_OP(exp, getattr_name);
         MD_COUNTER_INCREMENT(exp->exp_obd, getattr_name);
-        rc = MDP(exp->exp_obd, getattr_name)(exp, fid, filename, namelen,
+        rc = MDP(exp->exp_obd, getattr_name)(exp, fid, name, namelen,
                                              valid, ea_size, request);
         RETURN(rc);
 }
@@ -1704,6 +1704,19 @@ static inline int md_rename(struct obd_export *exp,
         RETURN(rc);
 }
 
+static inline int md_is_subdir(struct obd_export *exp,
+                               const struct lu_fid *pfid,
+                               const struct lu_fid *cfid,
+                               struct ptlrpc_request **request)
+{
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, is_subdir);
+        MD_COUNTER_INCREMENT(exp->exp_obd, is_subdir);
+        rc = MDP(exp->exp_obd, is_subdir)(exp, pfid, cfid, request);
+        RETURN(rc);
+}
+
 static inline int md_setattr(struct obd_export *exp, struct md_op_data *op_data,
                              void *ea, int ealen, void *ea2, int ea2len,
                              struct ptlrpc_request **request)
index cad9b68..3cfed08 100644 (file)
@@ -95,6 +95,8 @@ extern int obd_race_state;
 #define OBD_FAIL_MDS_SETXATTR_WRITE      0x134
 #define OBD_FAIL_MDS_WRITEPAGE_NET       0x135
 #define OBD_FAIL_MDS_WRITEPAGE_PACK      0x136
+#define OBD_FAIL_MDS_IS_SUBDIR_NET       0x137
+#define OBD_FAIL_MDS_IS_SUBDIR_PACK      0x138
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index 4eabff0..9de3265 100644 (file)
@@ -250,7 +250,7 @@ repeat:
                 }
         } else if (rc == -ESTALE && it->d.lustre.it_lock_mode) {
                 /* cross-ref open can have lookup lock on child */
-                ldlm_lock_decref(&it->d.lustre.it_lock_handle,
+                ldlm_lock_decref((struct lustre_handle *)&it->d.lustre.it_lock_handle,
                                  it->d.lustre.it_lock_mode);
         }
 
index a403ed6..fc33ca9 100644 (file)
@@ -31,6 +31,9 @@ void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
                        __u64 valid, const struct lu_fid *fid,
                        int ea_size, int flags);
 void mdc_pack_rep_body(struct ptlrpc_request *);
+void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset,
+                        const struct lu_fid *pfid,
+                        const struct lu_fid *cfid, int flags);
 void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
                      __u32 size, const struct lu_fid *fid);
 void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid,
@@ -190,6 +193,9 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                const char *old, int oldlen, const char *new, int newlen,
                struct ptlrpc_request **request);
 
+int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid,
+                  const struct lu_fid *cfid, struct ptlrpc_request **request);
+
 int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
              struct ptlrpc_request **);
 
index 2ad40ba..670a55d 100644 (file)
@@ -53,6 +53,20 @@ void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
         b->nlink = size;                        /* !! */
 }
 
+void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset,
+                        const struct lu_fid *pfid,
+                        const struct lu_fid *cfid, int flags)
+{
+        struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
+
+        if (pfid)
+                b->fid1 = *pfid;
+        if (cfid)
+                b->fid2 = *cfid;
+        b->valid = OBD_MD_FLID;
+        b->flags = flags;
+}
+
 static void mdc_pack_body(struct mdt_body *b)
 {
         LASSERT (b != NULL);
@@ -75,6 +89,7 @@ void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
         b->flags = flags;
         mdc_pack_body(b);
 }
+
 /* packing of MDS records */
 void mdc_create_pack(struct ptlrpc_request *req, int offset,
                      struct md_op_data *op_data, const void *data, int datalen,
index 06af6d2..3e3a338 100644 (file)
@@ -234,6 +234,39 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
+int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid,
+                  const struct lu_fid *cfid, struct ptlrpc_request **request)
+{
+        int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
+        struct ptlrpc_request *req;
+        struct mdt_body *body;
+        int rc;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_IS_SUBDIR, 2, size, NULL);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, 0);
+
+        ptlrpc_req_set_repsize(req, 2, size);
+        rc = ptlrpc_queue_wait(req);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+                                  lustre_swab_mdt_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack mdt_body\n");
+                GOTO(out, rc = -EPROTO);
+        }
+        EXIT;
+ out:
+        *request = req;
+        return rc;
+}
+
 static
 int mdc_xattr_common(struct obd_export *exp, const struct lu_fid *fid,
                      int opcode, obd_valid valid, const char *xattr_name,
@@ -1435,6 +1468,7 @@ struct md_ops mdc_md_ops = {
         .m_getattr_name     = mdc_getattr_name,
         .m_intent_lock      = mdc_intent_lock,
         .m_link             = mdc_link,
+        .m_is_subdir        = mdc_is_subdir,
         .m_rename           = mdc_rename,
         .m_setattr          = mdc_setattr,
         .m_setxattr         = mdc_setxattr,
index c8b0354..1645208 100644 (file)
@@ -1338,10 +1338,11 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
 
         rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle);
 
+        EXIT;
 cleanup:
         mdd_write_unlock(ctxt, mdd_obj);
         mdd_trans_stop(ctxt, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
 static int mdd_parent_fid(const struct lu_context *ctxt,
@@ -1352,44 +1353,58 @@ static int mdd_parent_fid(const struct lu_context *ctxt,
 }
 
 /*
- * return 0: if lf is the fid of the ancestor of p1
- * otherwise: other_value
+ * return 1: if lf is the fid of the ancestor of p1;
+ * return 0: if not;
+ *
+ * return -EREMOTE: if remote object is found, in this
+ * case fid of remote object is saved to @pf;
+ *
+ * otherwise: values < 0, errors.
  */
 static int mdd_is_parent(const struct lu_context *ctxt,
                          struct mdd_device *mdd,
                          struct mdd_object *p1,
-                         const struct lu_fid *lf)
+                         const struct lu_fid *lf,
+                         struct lu_fid *pf)
 {
-        struct lu_fid * pfid;
         struct mdd_object *parent = NULL;
+        struct lu_fid *pfid;
         int rc;
         ENTRY;
 
+        LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
         pfid = &mdd_ctx_info(ctxt)->mti_fid;
+
+        /* Do not lookup ".." in root, they do not exist there. */
         if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
-                RETURN(1);
+                RETURN(0);
+        
         for(;;) {
                 rc = mdd_parent_fid(ctxt, p1, pfid);
                 if (rc)
                         GOTO(out, rc);
-                if (lu_fid_eq(pfid, lf))
-                        GOTO(out, rc = 0);
                 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
+                        GOTO(out, rc = 0);
+                if (lu_fid_eq(pfid, lf))
                         GOTO(out, rc = 1);
                 if (parent)
                         mdd_object_put(ctxt, parent);
                 parent = mdd_object_find(ctxt, mdd, pfid);
+                
                 /* cross-ref parent, not supported yet */
-                if (parent == NULL)
-                        GOTO(out, rc = -EOPNOTSUPP);
-                else if (IS_ERR(parent))
+                if (parent == NULL) {
+                        if (pf != NULL)
+                                *pf = *pfid;
+                        GOTO(out, rc = -EREMOTE);
+                } else if (IS_ERR(parent))
                         GOTO(out, rc = PTR_ERR(parent));
                 p1 = parent;
         }
+        EXIT;
 out:
         if (parent && !IS_ERR(parent))
                 mdd_object_put(ctxt, parent);
-        RETURN(rc);
+        return rc;
 }
 
 static int mdd_rename_lock(const struct lu_context *ctxt,
@@ -1397,13 +1412,15 @@ static int mdd_rename_lock(const struct lu_context *ctxt,
                            struct mdd_object *src_pobj,
                            struct mdd_object *tgt_pobj)
 {
+        int rc;
         ENTRY;
 
         if (src_pobj == tgt_pobj) {
                 mdd_write_lock(ctxt, src_pobj);
                 RETURN(0);
         }
-        /*compared the parent child relationship of src_p&tgt_p*/
+        
+        /* compared the parent child relationship of src_p&tgt_p */
         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
                 mdd_lock2(ctxt, src_pobj, tgt_pobj);
                 RETURN(0);
@@ -1412,7 +1429,11 @@ static int mdd_rename_lock(const struct lu_context *ctxt,
                 RETURN(0);
         }
 
-        if (!mdd_is_parent(ctxt, mdd, src_pobj, mdo2fid(tgt_pobj))) {
+        rc = mdd_is_parent(ctxt, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
+        if (rc < 0)
+                RETURN(rc);
+
+        if (rc == 1) {
                 mdd_lock2(ctxt, tgt_pobj, src_pobj);
                 RETURN(0);
         }
@@ -1438,7 +1459,6 @@ static int mdd_rename_sanity_check(const struct lu_context *ctxt,
                                    int src_is_dir,
                                    struct mdd_object *tobj)
 {
-        struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj);
         int rc = 0, tgt_is_dir;
         ENTRY;
 
@@ -1458,10 +1478,6 @@ static int mdd_rename_sanity_check(const struct lu_context *ctxt,
         if (rc)
                 RETURN(rc);
 
-        /* source should not be ancestor of target dir */
-        if (src_is_dir && !mdd_is_parent(ctxt, mdd, tgt_pobj, sfid))
-                rc = -EINVAL;
-
         RETURN(rc);
 }
 /* src object can be remote that is why we use only fid and type of object */
@@ -1587,6 +1603,32 @@ static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
         RETURN(rc);
 }
 
+/*
+ * returns 1: if fid is subdir of @mo;
+ * returns 0: if fid is not a subdir of @mo;
+ *
+ * returns EREMOTE if remote object is found, fid of remote object is saved to
+ * @fid;
+ *
+ * returns < 0: if error
+ */
+static int mdd_is_subdir(const struct lu_context *ctx, struct md_object *mo,
+                         const struct lu_fid *fid, struct lu_fid *sfid)
+{
+        struct mdd_device *mdd = mdo2mdd(mo);
+        int rc;
+        ENTRY;
+        
+        if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
+                RETURN(0);
+        
+        rc = mdd_is_parent(ctx, mdd, md2mdd_obj(mo), fid, sfid);
+        if (rc == -EREMOTE)
+                rc = EREMOTE;
+        
+        RETURN(rc);
+}
+
 static int __mdd_object_initialize(const struct lu_context *ctxt,
                                    const struct lu_fid *pfid,
                                    struct mdd_object *child,
@@ -2200,6 +2242,7 @@ struct md_device_operations mdd_ops = {
 };
 
 static struct md_dir_operations mdd_dir_ops = {
+        .mdo_is_subdir     = mdd_is_subdir,
         .mdo_lookup        = mdd_lookup,
         .mdo_create        = mdd_create,
         .mdo_rename        = mdd_rename,
index b0ff285..f9c9c7f 100644 (file)
@@ -1366,6 +1366,7 @@ int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_STATFS:
         case MDS_READPAGE:
         case MDS_WRITEPAGE:
+        case MDS_IS_SUBDIR:
         case MDS_REINT:
         case MDS_CLOSE:
         case MDS_DONE_WRITING:
index dd03a60..61e2aed 100644 (file)
@@ -265,7 +265,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
-        if(reqbody->valid & OBD_MD_MEA) {
+        if (reqbody->valid & OBD_MD_MEA) {
                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD, 
@@ -375,6 +375,43 @@ static int mdt_getattr(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
+static int mdt_is_subdir(struct mdt_thread_info *info)
+{
+        struct mdt_object   *obj = info->mti_object;
+        struct req_capsule  *pill = &info->mti_pill;
+        struct mdt_body     *repbody;
+        int                  rc;
+
+        obj = info->mti_object;
+        LASSERT(obj != NULL);
+        LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
+        ENTRY;
+
+        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+
+        /*
+         * We save last checked parent fid to @repbody->fid1 for remote
+         * directory case.
+         */
+        rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(obj),
+                           &info->mti_tmp_fid2, &repbody->fid1);
+        if (rc < 0)
+                RETURN(rc);
+        
+        /* 
+         * Save error code to ->mode. Later it it is used for detecting the case
+         * of remote subdir.
+         */
+        repbody->mode = rc;
+        repbody->valid = OBD_MD_FLMODE;
+        
+        if (rc == EREMOTE)
+                repbody->valid |= OBD_MD_FLID;
+
+        
+        RETURN(0);
+}
+
 /*
  * UPDATE lock should be taken against parent, and be release before exit;
  * child_bits lock should be taken against child, and be returned back:
@@ -3463,6 +3500,7 @@ DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
+DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
 };
index da528e1..0705f88 100644 (file)
@@ -301,7 +301,6 @@ struct mdt_thread_info {
         struct mdt_client_data     mti_mcd;
         loff_t                     mti_off;
         struct txn_param           mti_txn_param;
-
 };
 /*
  * Info allocated per-transaction.
index 01798b7..14f7574 100644 (file)
@@ -557,9 +557,37 @@ static void mdt_rename_unlock(struct lustre_handle *lh)
         EXIT;
 }
 
-static int mdt_rename_check(struct mdt_thread_info *info)
+/* 
+ * This is is_subdir() variant, it is CMD is cmm forwards it to correct
+ * target. Source should not be ancestor of target dir. May be other rename
+ * checks can be moved here later.
+ */
+static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
 {
-        return 0;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct lu_fid dst_fid = *rr->rr_fid2;
+        struct mdt_object *dst;
+        int rc = 0;
+        ENTRY;
+
+        do {
+                dst = mdt_object_find(info->mti_ctxt, info->mti_mdt, &dst_fid);
+                if (!IS_ERR(dst)) {
+                        rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(dst),
+                                           fid, &dst_fid);
+                        mdt_object_put(info->mti_ctxt, dst);
+                        if (rc < 0) {
+                                CERROR("Error while doing mdo_is_subdir(), rc %d\n",
+                                       rc);
+                        } else if (rc == 1) {
+                                rc = -EINVAL;
+                        }
+                } else {
+                        rc = PTR_ERR(dst);
+                }
+        } while (rc == EREMOTE);
+        
+        RETURN(rc);
 }
 
 static int mdt_reint_rename(struct mdt_thread_info *info)
@@ -600,10 +628,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
                 RETURN(rc);
         }
 
-        rc = mdt_rename_check(info);
-        if (rc)
-                GOTO(out, rc);
-        
         lh_newp = &info->mti_lh[MDT_LH_NEW];
 
         /* step 1: lock the source dir */
@@ -683,10 +707,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info)
         mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
 
+        /* Check if @dst is subdir of @src. */
+        rc = mdt_rename_check(info, old_fid);
+        if (rc)
+                GOTO(out_unlock_new, rc);
+
         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
-                        mdt_object_child(mtgtdir), old_fid,
-                        rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
-                        rr->rr_tgt, ma);
+                        mdt_object_child(mtgtdir), old_fid, rr->rr_name,
+                        (mnew ? mdt_object_child(mnew) : NULL), rr->rr_tgt, ma);
+        
         /* handle last link of tgt object */
         if (mnew)
                 mdt_handle_last_unlink(info, mnew, ma);
@@ -701,8 +730,8 @@ out_unlock_target:
         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
 out_unlock_source:
         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
-        mdt_rename_unlock(&rename_lh);
 out:
+        mdt_rename_unlock(&rename_lh);
         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
         return rc;
 }
index ce75703..1e5d5bd 100644 (file)
@@ -832,6 +832,7 @@ int lprocfs_alloc_md_stats(struct obd_device *obd,
         LPROCFS_MD_OP_INIT(num_private_stats, stats, cancel_unused);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, link);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, rename);
+        LPROCFS_MD_OP_INIT(num_private_stats, stats, is_subdir);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, sync);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, readpage);
index 431f1d5..1c918dd 100644 (file)
@@ -314,6 +314,7 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_PIN,
         &RQF_MDS_READPAGE,
         &RQF_MDS_WRITEPAGE,
+        &RQF_MDS_IS_SUBDIR,
         &RQF_MDS_DONE_WRITING
 };
 
@@ -655,6 +656,11 @@ const struct req_format RQF_MDS_WRITEPAGE =
                         mdt_body_only, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
 
+const struct req_format RQF_MDS_IS_SUBDIR =
+        DEFINE_REQ_FMT0("MDS_IS_SUBDIR",
+                        mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_IS_SUBDIR);
+
 #if !defined(__REQ_LAYOUT_USER__)
 
 int req_layout_init(void)
index 2911944..a430307 100644 (file)
@@ -75,7 +75,8 @@ struct ll_rpc_opcode {
         { MDS_QUOTACTL,     "mds_quotactl" },
         { MDS_GETXATTR,     "mds_getxattr" },
         { MDS_SETXATTR,     "mds_setxattr" },
-        { MDS_WRITEPAGE,     "mds_writepage" },
+        { MDS_WRITEPAGE,    "mds_writepage" },
+        { MDS_IS_SUBDIR,    "mds_is_subdir" },
         { LDLM_ENQUEUE,     "ldlm_enqueue" },
         { LDLM_CONVERT,     "ldlm_convert" },
         { LDLM_CANCEL,      "ldlm_cancel" },