Whamcloud - gitweb
cmm updates
authortappro <tappro>
Wed, 7 Jun 2006 13:16:43 +0000 (13:16 +0000)
committertappro <tappro>
Wed, 7 Jun 2006 13:16:43 +0000 (13:16 +0000)
lustre/cmm/cmm_device.c
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c

index c97a0f1..3550c5b 100644 (file)
@@ -54,7 +54,7 @@ static inline int lu_device_is_cmm(struct lu_device *d)
        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
 }
 
-int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+static int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
                  struct lu_fid *fid)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
@@ -63,7 +63,7 @@ int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
                                                     cmm_dev->cmm_child, fid);
 }
 
-int cmm_config(const struct lu_context *ctxt, struct md_device *md,
+static int cmm_config(const struct lu_context *ctxt, struct md_device *md,
                const char *name, void *buf, int size, int mode)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
@@ -74,7 +74,7 @@ int cmm_config(const struct lu_context *ctxt, struct md_device *md,
         RETURN(rc);
 }
 
-int cmm_statfs(const struct lu_context *ctxt, struct md_device *md,
+static int cmm_statfs(const struct lu_context *ctxt, struct md_device *md,
                struct kstatfs *sfs) {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
        int rc;
@@ -93,19 +93,34 @@ static struct md_device_operations cmm_md_ops = {
 
 extern struct lu_device_type mdc_device_type;
 
+/* --- cmm_lu_operations --- */
 /* add new MDC to the CMM, create MDC lu_device and connect it to mdc_obd */
 static int cmm_add_mdc(const struct lu_context *ctx,
                        struct cmm_device * cm, struct lustre_cfg *cfg)
 {
         struct lu_device_type *ldt = &mdc_device_type;
         struct lu_device *ld;
+        struct mdc_device *mc;
+#ifdef CMM_CODE
+        struct mdc_device *tmp;
+        __u32 mdc_num;
+#endif
         int rc;
         ENTRY;
 
-        /*TODO check this MDC exists already */
-
+#ifdef CMM_CODE
+        /* find out that there is no such mdc */
+        LASSERT(lustre_cfg_string(cfg, 2));
+        mdc_num = simple_strtol(lustre_cfg_string(cfg, 2), NULL, 10);
+        spin_lock(&cm->cmm_tgt_guard);
+        list_for_each_entry_safe(mc, tmp, &cm->cmm_targets,
+                                 mc_linkage) {
+                if (mc->mc_num == mdc_num)
+                        RETURN(-EEXIST);
+        }
+        spin_unlock(&cm->cmm_tgt_guard);
+#endif        
         ld = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
-
         ld->ld_site = cmm2lu_dev(cm)->ld_site;
 
         rc = ldt->ldt_ops->ldto_device_init(ctx, ld, NULL);
@@ -115,10 +130,16 @@ static int cmm_add_mdc(const struct lu_context *ctx,
         /* pass config to the just created MDC */
         rc = ld->ld_ops->ldo_process_config(ctx, ld, cfg);
         if (rc == 0) {
-                struct mdc_device *mc = lu2mdc_dev(ld);
+                mc = lu2mdc_dev(ld);
+#ifdef CMM_CODE
+                spin_lock(&cm->cmm_tgt_guard);
+#endif
                 list_add_tail(&mc->mc_linkage, &cm->cmm_targets);
-                lu_device_get(cmm2lu_dev(cm));
                 cm->cmm_tgt_count++;
+#ifdef CMM_CODE
+                spin_unlock(&cm->cmm_tgt_guard);
+#endif                
+                lu_device_get(cmm2lu_dev(cm));
         }
         RETURN(rc);
 }
@@ -150,13 +171,12 @@ static int cmm_process_config(const struct lu_context *ctx,
 
 static struct lu_device_operations cmm_lu_ops = {
        .ldo_object_alloc   = cmm_object_alloc,
-
         .ldo_process_config = cmm_process_config
 };
 
 /* --- lu_device_type operations --- */
 
-struct lu_device *cmm_device_alloc(const struct lu_context *ctx,
+static struct lu_device *cmm_device_alloc(const struct lu_context *ctx,
                                    struct lu_device_type *t,
                                    struct lustre_cfg *cfg)
 {
@@ -179,7 +199,7 @@ struct lu_device *cmm_device_alloc(const struct lu_context *ctx,
         return l;
 }
 
-void cmm_device_free(const struct lu_context *ctx, struct lu_device *d)
+static void cmm_device_free(const struct lu_context *ctx, struct lu_device *d)
 {
         struct cmm_device *m = lu2cmm_dev(d);
 
@@ -188,12 +208,12 @@ void cmm_device_free(const struct lu_context *ctx, struct lu_device *d)
         OBD_FREE_PTR(m);
 }
 
-int cmm_type_init(struct lu_device_type *t)
+static int cmm_type_init(struct lu_device_type *t)
 {
         return 0;
 }
 
-void cmm_type_fini(struct lu_device_type *t)
+static void cmm_type_fini(struct lu_device_type *t)
 {
         return;
 }
@@ -205,7 +225,10 @@ static int cmm_device_init(const struct lu_context *ctx,
         int err = 0;
 
         ENTRY;
-
+        
+#ifdef CMM_CODE
+        spin_lock_init(&m->cmm_tgt_guard);
+#endif
         INIT_LIST_HEAD(&m->cmm_targets);
         m->cmm_tgt_count = 0;
         m->cmm_child = lu2md_dev(next);
index 78b762a..b3688a1 100644 (file)
 #include <obd.h>
 #include <md_object.h>
 
+#ifdef CMM_CODE
+struct cmm_device {
+        struct md_device cmm_md_dev;
+        /* underlaying device in MDS stack, usually MDD */
+        struct md_device *cmm_child;
+        /* other MD servers in cluster */
+        __u32            cmm_local_num;
+        __u32            cmm_tgt_count;
+        struct list_head cmm_targets;
+        spinlock_t       cmm_tgt_guard;
+};
+
+static inline struct md_device_operations *cmm_child_ops(struct cmm_device *d)
+{
+        return (d->cmm_child->md_ops);
+}
+
+static inline struct cmm_device *md2cmm_dev(struct md_device *m)
+{
+        return container_of0(m, struct cmm_device, cmm_md_dev);
+}
+
+static inline struct cmm_device *lu2cmm_dev(struct lu_device *d)
+{
+       return container_of0(d, struct cmm_device, cmm_md_dev.md_lu_dev);
+}
+
+static inline struct lu_device *cmm2lu_dev(struct cmm_device *d)
+{
+       return (&d->cmm_md_dev.md_lu_dev);
+}
+
+struct cmm_object {
+       struct md_object cmo_obj;
+};
+
+/* local CMM objec */
+struct cml_object {
+        struct cmm_object cmm_obj;
+};
+
+/* remote CMM object */
+struct cmr_object {
+        struct cmm_object cmm_obj;
+        /* mds number where object is placed */
+        __u32            cmo_num;
+}
+
+static inline struct cmm_device *cmm_obj2dev(struct cmm_object *c)
+{
+       return (md2cmm_dev(md_device_get(&c->cmo_obj)));
+}
+
+static inline struct cmm_object *lu2cmm_obj(struct lu_object *o)
+{
+       //LASSERT(lu_device_is_cmm(o->lo_dev));
+       return container_of0(o, struct cmm_object, cmo_obj.mo_lu);
+}
+
+/* get cmm object from md_object */
+static inline struct cmm_object *md2cmm_obj(struct md_object *o)
+{
+       return container_of0(o, struct cmm_object, cmo_obj);
+}
+/* get lower-layer object */
+static inline struct md_object *cmm2child_obj(struct cmm_object *o)
+{
+        return lu2md(lu_object_next(&o->cmo_obj.mo_lu));
+}
+
+/* cmm_object.c */
+struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
+                                   const struct lu_object_header *hdr,
+                                   struct lu_device *);
+
+#else
+
 struct cmm_device {
         struct md_device cmm_md_dev;
         /* underlaying device in MDS stack, usually MDD */
@@ -96,5 +173,6 @@ static inline struct md_object *cmm2child_obj(struct cmm_object *o)
 struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *);
+#endif
 #endif /* __KERNEL__ */
 #endif /* _CMM_INTERNAL_H */
index 5f0a5dd..7317900 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
+#ifdef CMM_CODE
+static int cmm_fld_lookup(const struct lu_fid *fid)
+{
+        int rc;
+        /* temporary hack for proto mkdir */
+        rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
+        CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
+        RETURN(rc);
+}
+
+static struct md_object_operations cml_mo_ops;
+static struct md_dir_operations    cml_dir_ops;
+static struct lu_object_operations cml_obj_ops;
+
+static struct md_object_operations cmr_mo_ops;
+static struct md_dir_operations    cmr_dir_ops;
+static struct lu_object_operations cmr_obj_ops;
+
+struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
+                                   const struct lu_object_header *loh,
+                                   struct lu_device *ld)
+{
+        struct lu_object  *lo = NULL;
+        const struct lu_fid *fid = loh->loh_fid;
+        int mdsnum, rc;
+        ENTRY;
+
+        /* get object location */
+        mdsnum = cmm_fld_lookup(fid);
+
+        /* select the proper set of operations based on object location */
+        if (mdsnum == lu2cmm_dev(ld)->cmm_local_num) {
+                struct cml_object *clo;
+
+                OBD_ALLOC_PTR(clo);
+               if (clo != NULL) {
+                       lo = &clo->cmm_obj.cmo_obj.mo_lu;
+                        lu_object_init(lo, NULL, ld);
+                        clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
+                        clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
+                        lo->lo_ops = &cml_obj_ops;
+                }
+        } else {
+                struct cmr_object *cro;
+                
+                OBD_ALLOC_PTR(cro);
+               if (cro != NULL) {
+                       lo = &cro->cmm_obj.cmo_obj.mo_lu;
+                        lu_object_init(lo, NULL, ld);
+                        cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
+                        cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
+                        lo->lo_ops = &cmr_obj_ops;
+                        cro->cmo_num = mdsnum;
+                }
+        }
+        RETURN(lo);
+}
+
+/*
+ * CMM has two types of objects - local and remote. They have different set 
+ * of operations so we are avoiding multiple checks in code.
+ */
+
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
+{
+        return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+        return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
+{
+        return container_of0(co, struct cml_object, cmm_obj);
+}
+/* get local child device */
+static struct lu_device *cml_child_dev(struct cmm_device *d)
+{
+        return next = &d->cmm_child->md_lu_dev;
+}
+
+/* lu_object operations */
+static void cml_object_free(const struct lu_context *ctx,
+                            struct lu_object *lo)
+{
+        struct cml_object *clo = lu2cml_obj(lo);
+        lu_object_fini(lo);
+        OBD_FREE_PTR(clo);
+}
+
+static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+        struct lu_device  *c_dev;
+        struct lu_object  *c_obj;
+        int rc;
+
+        ENTRY;
+
+        c_dev = cml_child_dev(cd);
+        if (c_dev == NULL) {
+                rc = -ENOENT;
+        } else {
+                c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+                                                        lo->lo_header, c_dev);
+                if (c_obj != NULL) {
+                        lu_object_add(lo, c_obj);
+                        rc = 0;
+                } else {
+                        rc = -ENOMEM;
+                }
+        }
+
+        RETURN(rc);
+}
+
+static int cml_object_exists(const struct lu_context *ctx,
+                             struct lu_object *lo)
+{
+        return lu_object_exists(ctx, lu_object_next(lo));
+}
+
+static int cml_object_print(const struct lu_context *ctx,
+                            struct seq_file *f, const struct lu_object *lo)
+{
+       return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
+}
+
+static struct lu_object_operations cml_obj_ops = {
+       .loo_object_init    = cml_object_init,
+       .loo_object_free    = cml_object_free,
+       .loo_object_print   = cml_object_print,
+       .loo_object_exists  = cml_object_exists
+};
+
+/* CMM local md_object operations */
+static int cml_object_create(const struct lu_context *ctx,
+                             struct md_object *mo,
+                             struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        RETURN(rc);
+}
+
+static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mo_attr_get(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        RETURN(rc);
+}
+
+static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mo_attr_set(ctx, cmm2child_obj(md2cmm_obj(mo)), attr);
+        RETURN(rc);
+}
+
+static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mo_xattr_get(ctx, cmm2child_obj(md2cmm_obj(mo)),
+                         buf, buflen, name);
+        RETURN(rc);
+}
+
+static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mo_xattr_set(ctx, cmm2child_obj(md2cmm_obj(mo)),
+                          buf, buflen, name);
+        RETURN(rc);
+}
+
+static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        RETURN(rc);
+}
+
+static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        RETURN(rc);
+}
+
+static int cml_open(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_open(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        RETURN(rc);
+}
+
+static int cml_close(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_close(ctx, cmm2child_obj(md2cmm_obj(mo)));
+        RETURN(rc);
+}
+
+static struct md_object_operations cml_mo_ops = {
+        .moo_attr_get      = cml_attr_get,
+        .moo_attr_set      = cml_attr_set,
+        .moo_xattr_get     = cml_xattr_get,
+        .moo_xattr_set     = cml_xattr_set,
+        .moo_object_create = cml_object_create,
+        .moo_ref_add       = cml_ref_add,
+        .moo_ref_del       = cml_ref_del,
+        .moo_open          = cml_open,
+        .moo_close         = cml_close
+};
+
+/* md_dir operations */
+static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+                      const char *name, struct lu_fid *lf)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_lookup(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name, lf);
+        RETURN(rc);
+
+}
+
+static int cml_create(const struct lu_context *ctx,
+                      struct md_object *mo_p, const char *name,
+                      struct md_object *mo_c, struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_create(ctx, cmm2child_obj(md2cmm_obj(mo_p)), name,
+                        cmm2child_obj(md2cmm_obj(mo_c)), attr);
+        RETURN(rc);
+}
+
+static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
+                    struct md_object *mo_s, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_link(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                      cmm2child_obj(md2cmm_obj(mo_s)), name);
+        RETURN(rc);
+}
+
+static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+                      struct md_object *mo_c, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_unlink(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                        cmm2child_obj(md2cmm_obj(mo_c)), name);
+        RETURN(rc);
+}
+
+static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
+                       struct md_object *mo_pn, struct md_object *mo_s,
+                       const char *s_name, struct md_object *mo_t,
+                       const char *t_name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+                        cmm2child_obj(md2cmm_obj(mo_pn)),
+                        cmm2child_obj(md2cmm_obj(mo_s)), s_name,
+                        cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+        RETURN(rc);
+}
+
+static int cml_rename_tgt(const struct lu_context *ctx,
+                          struct md_object *mo_p,
+                          struct md_object *mo_s, struct md_object *mo_t,
+                          const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+                            cmm2child_obj(md2cmm_obj(mo_s)),
+                            cmm2child_obj(md2cmm_obj(mo_t)), name);
+        RETURN(rc);
+}
+
+static struct md_dir_operations cmm_dir_ops = {
+        .mdo_lookup      = cml_lookup,
+        .mdo_create      = cml_create,
+        .mdo_link        = cml_link,
+        .mdo_unlink      = cml_unlink,
+        .mdo_rename      = cml_rename,
+        .mdo_rename_tgt  = cml_rename_tgt,
+};
+
+/* -------------------------------------------------------------------
+ * remote CMM object operations. cmr_...
+ */
+static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
+{
+        return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
+{
+        return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
+}
+static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
+{
+        return container_of0(co, struct cmr_object, cmm_obj);
+}
+
+/* get local child device */
+static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
+{
+        struct lu_device *next = NULL;
+        struct mdc_device *mdc;
+        
+        spin_lock(&d->cmm_tgt_guard);
+        list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
+                if (mdc->mc_num == num) {
+                        next = mdc2lu_dev(mdc);
+                        break;
+                }
+        }
+        spin_unlock(&d->cmm_tgt_guard);
+        return next;
+}
+
+/* lu_object operations */
+static void cmr_object_free(const struct lu_context *ctx,
+                            struct lu_object *lo)
+{
+        struct cmr_object *cro = lu2cmr_obj(lo);
+        lu_object_fini(lo);
+        OBD_FREE_PTR(cro);
+}
+
+static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+        struct lu_device  *c_dev;
+        struct lu_object  *c_obj;
+        const struct lu_fid *fid = lu_object_fid(lo);
+        int rc;
+
+        ENTRY;
+        
+        c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
+        if (c_dev == NULL) {
+                rc = -ENOENT;
+        } else {
+                c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+                                                        lo->lo_header, c_dev);
+                if (c_obj != NULL) {
+                        lu_object_add(lo, c_obj);
+                        rc = 0;
+                } else {
+                        rc = -ENOMEM;
+                }
+        }
+
+        RETURN(rc);
+}
+
+
+static int cmr_object_exists(const struct lu_context *ctx,
+                             struct lu_object *lo)
+{
+        return lu_object_exists(ctx, lu_object_next(lo));
+}
+
+static int cmr_object_print(const struct lu_context *ctx,
+                            struct seq_file *f, const struct lu_object *lo)
+{
+       return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", lo);
+}
+
+static struct lu_object_operations cml_obj_ops = {
+       .loo_object_init    = cmr_object_init,
+       .loo_object_free    = cmr_object_free,
+       .loo_object_print   = cmr_object_print,
+       .loo_object_exists  = cmr_object_exists
+};
+
+/* CMM remote md_object operations. All are invalid */
+static int cmr_object_create(const struct lu_context *ctx,
+                             struct md_object *mo,
+                             struct lu_attr *attr)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static struct md_object_operations cml_mo_ops = {
+        .moo_attr_get      = cmr_attr_get,
+        .moo_attr_set      = cmr_attr_set,
+        .moo_xattr_get     = cmr_xattr_get,
+        .moo_xattr_set     = cmr_xattr_set,
+        .moo_object_create = cmr_object_create,
+        .moo_ref_add       = cmr_ref_add,
+        .moo_ref_del       = cmr_ref_del,
+        .moo_open          = cmr_open,
+        .moo_close         = cmr_close
+};
+
+/* remote part of md_dir operations */
+static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+                      const char *name, struct lu_fid *lf)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_create(const struct lu_context *ctx,
+                      struct md_object *mo_p, const char *name,
+                      struct md_object *mo_c, struct lu_attr *attr)
+{
+        int rc;
+
+        ENTRY;
+
+        /* remote object creation and local name insert */
+        rc = mo_object_create(ctx, cmm2child_obj(md2cmm_obj(mo_c)), attr);
+        if (rc == 0) {
+                rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                                     name, lu_object_fid(&mo_c->mo_lu));
+                if (rc) 
+                        mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
+                    struct md_object *mo_s, const char *name)
+{
+        int rc;
+        ENTRY;
+
+        rc = mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+        if (rc == 0) {
+                rc = mdo_name_insert(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                                     name, lu_object_fid(&mo_s->mo_lu));
+                if (rc)
+                        mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_s)));
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+                      struct md_object *mo_c, const char *name)
+{
+        int rc;
+        ENTRY;
+
+        rc = mo_ref_del(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+        if (rc == 0) {
+                rc = mdo_name_remove(ctx, cmm2child_obj(md2cmm_obj(mo_p)),
+                                     name, lu_object_fid(&mo_c->mo_lu));
+                if (rc)
+                        mo_ref_add(ctx, cmm2child_obj(md2cmm_obj(mo_c)));
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
+                       struct md_object *mo_pn, struct md_object *mo_s,
+                       const char *s_name, struct md_object *mo_t,
+                       const char *t_name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_rename(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+                        cmm2child_obj(md2cmm_obj(mo_pn)),
+                        cmm2child_obj(md2cmm_obj(mo_s)), s_name,
+                        cmm2child_obj(md2cmm_obj(mo_t)), t_name);
+
+        RETURN(rc);
+}
+
+static int cmr_rename_tgt(const struct lu_context *ctx,
+                          struct md_object *mo_p,
+                          struct md_object *mo_s, struct md_object *mo_t,
+                          const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_rename_tgt(ctx, cmm2child_obj(md2cmm_obj(mo_po)),
+                            cmm2child_obj(md2cmm_obj(mo_s)),
+                            cmm2child_obj(md2cmm_obj(mo_t)), name);
+        RETURN(rc);
+}
+
+static struct md_dir_operations cmm_dir_ops = {
+        .mdo_lookup      = cmr_lookup,
+        .mdo_create      = cmr_create,
+        .mdo_link        = cmr_link,
+        .mdo_unlink      = cmr_unlink,
+        .mdo_rename      = cmr_rename,
+        .mdo_rename_tgt  = cmr_rename_tgt,
+};
+
+#else /* CMM_CODE */
 static struct md_object_operations cmm_mo_ops;
 static struct md_dir_operations    cmm_dir_ops;
 static struct lu_object_operations cmm_obj_ops;
@@ -215,7 +776,7 @@ static int cmm_create(const struct lu_context *ctx,
                 /* remote object creation and local name insert */
                 rc = mo_object_create(ctx, ch_c, attr);
                 if (rc == 0) {
-                        rc = mdo_name_insert(ctx, ch_p, name, lf, attr);
+                        rc = mdo_name_insert(ctx, ch_p, name, lf);
                 }
         }
 
@@ -241,7 +802,7 @@ static int cmm_mkdir(const struct lu_context *ctx, struct lu_attr *attr,
                 /* remote object creation and local name insert */
                 rc = mo_object_create(ctx, ch_c, attr);
                 if (rc == 0) {
-                        rc = mdo_name_insert(ctx, ch_p, name, lf, attr);
+                        rc = mdo_name_insert(ctx, ch_p, name, lf);
                 }
         }
 
@@ -253,6 +814,6 @@ static struct md_dir_operations cmm_dir_ops = {
         .mdo_mkdir         = cmm_mkdir,
         .mdo_create        = cmm_create
 };
-
+#endif /* CMM_CODE */