Whamcloud - gitweb
set CMM_INITIALIZED when first ADD_MDC arrives
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
index 83f5cee..88ddf24 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lustre_fid.h>
 #include "cmm_internal.h"
+#include "mdc_internal.h"
 
-static struct md_object_operations cmm_mo_ops;
+static int cmm_fld_lookup(struct cmm_device *cm, 
+                          const struct lu_fid *fid, mdsno_t *mds)
+{
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(fid_is_sane(fid));
 
-struct cmm_object *cmm_object_find(struct cmm_device *d, struct lu_fid *f)
+        rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
+        if (rc) {
+                CERROR("can't find mds by seq "LPU64", rc %d\n",
+                       fid_seq(fid), rc);
+                RETURN(rc);
+        }
+
+        if (*mds >= cm->cmm_tgt_count) {
+                CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+                       *mds, cm->cmm_tgt_count);
+                rc = -EINVAL;
+        } else {
+                CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+                       *mds, fid_seq(fid));
+        }
+
+        RETURN (rc);
+}
+
+static struct md_object_operations cml_mo_ops;
+static struct md_dir_operations    cml_dir_ops;
+static struct lu_object_operations cml_obj_ops;
+
+static struct md_object_operations cmr_mo_ops;
+static struct md_dir_operations    cmr_dir_ops;
+static struct lu_object_operations cmr_obj_ops;
+
+struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
+                                   const struct lu_object_header *loh,
+                                   struct lu_device *ld)
 {
-       struct lu_object *o;
+        struct lu_object  *lo = NULL;
+        const struct lu_fid *fid = &loh->loh_fid;
+        struct cmm_device *cd;
+        mdsno_t mdsnum;
+        int rc = 0;
+
+        ENTRY;
 
-       o = lu_object_find(d->cmm_md_dev.md_lu_dev.ld_site, f);
-       if (IS_ERR(o))
-               return (struct cmm_object *)o;
-       else
-               return container_of(o, struct cmm_object, cmo_obj.mo_lu);
+        cd = lu2cmm_dev(ld);
+        if (cd->cmm_flags & CMM_INITIALIZED) {
+                /* get object location */
+                rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum);
+                if (rc)
+                        RETURN(ERR_PTR(rc));
+        } else
+                /*
+                 * Device is not yet initialized, cmm_object is being created
+                 * as part of early bootstrap procedure (it is /ROOT, or /fld,
+                 * etc.). Such object *has* to be local.
+                 */
+                mdsnum = cd->cmm_local_num;
+
+        /* select the proper set of operations based on object location */
+        if (mdsnum == cd->cmm_local_num) {
+                struct cml_object *clo;
+
+                OBD_ALLOC_PTR(clo);
+               if (clo != NULL) {
+                       lo = &clo->cmm_obj.cmo_obj.mo_lu;
+                        lu_object_init(lo, NULL, ld);
+                        clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
+                        clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
+                        lo->lo_ops = &cml_obj_ops;
+                        clo->cmm_obj.cmo_local = 1;
+                }
+        } else {
+                struct cmr_object *cro;
+
+                OBD_ALLOC_PTR(cro);
+               if (cro != NULL) {
+                       lo = &cro->cmm_obj.cmo_obj.mo_lu;
+                        lu_object_init(lo, NULL, ld);
+                        cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
+                        cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
+                        lo->lo_ops = &cmr_obj_ops;
+                        cro->cmo_num = mdsnum;
+                        cro->cmm_obj.cmo_local = 0;
+                }
+        }
+        RETURN(lo);
 }
 
-void cmm_object_put(struct cmm_object *o)
+/*
+ * CMM has two types of objects - local and remote. They have different set
+ * of operations so we are avoiding multiple checks in code.
+ */
+
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
+{
+        return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+        return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
 {
-       lu_object_put(&o->cmo_obj.mo_lu);
+        return container_of0(co, struct cml_object, cmm_obj);
+}
+/* get local child device */
+static struct lu_device *cml_child_dev(struct cmm_device *d)
+{
+        return &d->cmm_child->md_lu_dev;
+}
+
+/* lu_object operations */
+static void cml_object_free(const struct lu_context *ctx,
+                            struct lu_object *lo)
+{
+        struct cml_object *clo = lu2cml_obj(lo);
+        lu_object_fini(lo);
+        OBD_FREE_PTR(clo);
 }
 
-struct lu_object *cmm_object_alloc(struct lu_device *d)
+static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
 {
-       struct cmm_object *mo;
+        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+        struct lu_device  *c_dev;
+        struct lu_object  *c_obj;
+        int rc;
+
         ENTRY;
 
-       OBD_ALLOC_PTR(mo);
-       if (mo != NULL) {
-               struct lu_object *o;
+        c_dev = cml_child_dev(cd);
+        if (c_dev == NULL) {
+                rc = -ENOENT;
+        } else {
+                c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+                                                        lo->lo_header, c_dev);
+                if (c_obj != NULL) {
+                        lu_object_add(lo, c_obj);
+                        rc = 0;
+                } else {
+                        rc = -ENOMEM;
+                }
+        }
+
+        RETURN(rc);
+}
+
+static int cml_object_exists(const struct lu_context *ctx,
+                             struct lu_object *lo)
+{
+        return lu_object_exists(ctx, lu_object_next(lo));
+}
 
-               o = &mo->cmo_obj.mo_lu;
-                lu_object_init(o, NULL, d);
-                mo->cmo_obj.mo_ops = &cmm_mo_ops;
-               RETURN(o);
-       } else
-               RETURN(NULL);
+static int cml_object_print(const struct lu_context *ctx,
+                            struct seq_file *f, const struct lu_object *lo)
+{
+       return seq_printf(f, LUSTRE_CMM0_NAME"-local@%p", lo);
 }
 
-int cmm_object_init(struct lu_object *o)
+static struct lu_object_operations cml_obj_ops = {
+       .loo_object_init    = cml_object_init,
+       .loo_object_free    = cml_object_free,
+       .loo_object_print   = cml_object_print,
+       .loo_object_exists  = cml_object_exists
+};
+
+/* CMM local md_object operations */
+static int cml_object_create(const struct lu_context *ctx,
+                             struct md_object *mo,
+                             struct lu_attr *attr)
 {
-       struct cmm_device *d = lu2cmm_dev(o->lo_dev);
-       struct lu_device  *under;
-       struct lu_object  *below;
+        int rc;
         ENTRY;
+        rc = mo_object_create(ctx, md_object_next(mo), attr);
+        RETURN(rc);
+}
 
-       under = &d->cmm_child->md_lu_dev;
-       below = under->ld_ops->ldo_object_alloc(under);
-       if (below != NULL) {
-               lu_object_add(o, below);
-               RETURN(0);
-       } else
-               RETURN(-ENOMEM);
+static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mo_attr_get(ctx, md_object_next(mo), attr);
+        RETURN(rc);
 }
 
-void cmm_object_free(struct lu_object *o)
+static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
+                        const struct lu_attr *attr)
 {
-       lu_object_fini(o);
+        int rc;
+        ENTRY;
+        rc = mo_attr_set(ctx, md_object_next(mo), attr);
+        RETURN(rc);
 }
 
-void cmm_object_release(struct lu_object *o)
+static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
 {
-        return;
+        int rc;
+        ENTRY;
+        rc = mo_xattr_get(ctx, md_object_next(mo),
+                         buf, buflen, name);
+        RETURN(rc);
 }
 
-int cmm_object_print(struct seq_file *f, const struct lu_object *o)
+static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+                         const void *buf, int buflen, const char *name)
 {
-       return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", o);
+        int rc;
+        ENTRY;
+        rc = mo_xattr_set(ctx, md_object_next(mo),
+                          buf, buflen, name);
+        RETURN(rc);
 }
 
-/* Locking API */
-#if 0
-static void cmm_lock(struct md_object *obj, __u32 mode)
+static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
 {
-        struct cmm_object *cmm_obj = md2cmm_obj(obj);
-        struct cmm_device *cmm_dev = cmm_obj2dev(cmm_obj);
+        int rc;
+        ENTRY;
+        rc = mo_ref_add(ctx, md_object_next(mo));
+        RETURN(rc);
+}
 
-        CMM_DO_CHILD(cmm_dev)->ldo_lock_obj(cmm2child_obj(cmm_obj), mode);
-        return;
+static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_ref_del(ctx, md_object_next(mo));
+        RETURN(rc);
 }
 
-static void cmm_unlock(struct md_object *obj, __u32 mode)
+static int cml_open(const struct lu_context *ctx, struct md_object *mo)
 {
-        struct cmm_object *cmm_obj = md2cmm_obj(obj);
-        struct cmm_device *cmm_dev = cmm_obj2dev(cmm_obj);
+        int rc;
+        ENTRY;
+        rc = mo_open(ctx, md_object_next(mo));
+        RETURN(rc);
+}
 
-        CMM_DO_CHILD(cmm_dev)->ldo_unlock_obj(cmm2child_obj(cmm_obj), mode);
-        return;
+static int cml_close(const struct lu_context *ctx, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_close(ctx, md_object_next(mo));
+        RETURN(rc);
+}
+
+static struct md_object_operations cml_mo_ops = {
+        .moo_attr_get      = cml_attr_get,
+        .moo_attr_set      = cml_attr_set,
+        .moo_xattr_get     = cml_xattr_get,
+        .moo_xattr_set     = cml_xattr_set,
+        .moo_object_create = cml_object_create,
+        .moo_ref_add       = cml_ref_add,
+        .moo_ref_del       = cml_ref_del,
+        .moo_open          = cml_open,
+        .moo_close         = cml_close
+};
+
+/* md_dir operations */
+static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+                      const char *name, struct lu_fid *lf)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_lookup(ctx, md_object_next(mo_p), name, lf);
+        RETURN(rc);
+
+}
+
+static int cml_create(const struct lu_context *ctx,
+                      struct md_object *mo_p, const char *name,
+                      struct md_object *mo_c, struct lu_attr *attr)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_create(ctx, md_object_next(mo_p), name,
+                        md_object_next(mo_c), attr);
+        RETURN(rc);
 }
-#endif
-/* Llog API */
-/* Object API */
-/* Metadata API */
-int cmm_root_get(struct md_device *md, struct lu_fid *fid) {
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
 
-        return cmm_child_ops(cmm_dev)->mdo_root_get(cmm_dev->cmm_child, fid);
+static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
+                    struct md_object *mo_s, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_link(ctx, md_object_next(mo_p),
+                      md_object_next(mo_s), name);
+        RETURN(rc);
+}
+
+static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+                      struct md_object *mo_c, const char *name)
+{
+        int rc;
+        ENTRY;
+        rc = mdo_unlink(ctx, md_object_next(mo_p),
+                        md_object_next(mo_c), name);
+        RETURN(rc);
+}
+
+/* rename is split to local/remote by location of new parent dir */
+static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
+                       struct md_object *mo_pn, const struct lu_fid *lf,
+                       const char *s_name, struct md_object *mo_t,
+                       const char *t_name)
+{
+        int rc;
+        ENTRY;
+
+        if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
+                /* mo_t is remote object and there is RPC to unlink it */
+                rc = mo_ref_del(ctx, md_object_next(mo_t));
+                if (rc)
+                        RETURN(rc);
+                mo_t = NULL;
+        }
+        /* local rename, mo_t can be NULL */
+        rc = mdo_rename(ctx, md_object_next(mo_po),
+                        md_object_next(mo_pn), lf, s_name,
+                        md_object_next(mo_t), t_name);
+
+        RETURN(rc);
 }
 
-int cmm_config(struct md_device *md, const char *name,
-               void *buf, int size, int mode)
+static int cml_rename_tgt(const struct lu_context *ctx,
+                          struct md_object *mo_p, struct md_object *mo_t,
+                          const struct lu_fid *lf, const char *name)
 {
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-        int result;
+        int rc;
         ENTRY;
-        result = cmm_child_ops(cmm_dev)->mdo_config(cmm_dev->cmm_child,
-                                                    name, buf, size, mode);
-        RETURN(result);
+
+        rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+                            md_object_next(mo_t), lf, name);
+        RETURN(rc);
+}
+
+static struct md_dir_operations cml_dir_ops = {
+        .mdo_lookup      = cml_lookup,
+        .mdo_create      = cml_create,
+        .mdo_link        = cml_link,
+        .mdo_unlink      = cml_unlink,
+        .mdo_rename      = cml_rename,
+        .mdo_rename_tgt  = cml_rename_tgt,
+};
+
+/* -------------------------------------------------------------------
+ * remote CMM object operations. cmr_...
+ */
+static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
+{
+        return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
+{
+        return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
+}
+static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
+{
+        return container_of0(co, struct cmr_object, cmm_obj);
 }
 
-int cmm_statfs(struct md_device *md, struct kstatfs *sfs) {
-        struct cmm_device *cmm_dev = md2cmm_dev(md);
-       int result;
+/* get proper child device from MDCs */
+static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
+{
+        struct lu_device *next = NULL;
+        struct mdc_device *mdc;
+
+        spin_lock(&d->cmm_tgt_guard);
+        list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
+                if (mdc->mc_num == num) {
+                        next = mdc2lu_dev(mdc);
+                        break;
+                }
+        }
+        spin_unlock(&d->cmm_tgt_guard);
+        return next;
+}
+
+/* lu_object operations */
+static void cmr_object_free(const struct lu_context *ctx,
+                            struct lu_object *lo)
+{
+        struct cmr_object *cro = lu2cmr_obj(lo);
+        lu_object_fini(lo);
+        OBD_FREE_PTR(cro);
+}
+
+static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+        struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+        struct lu_device  *c_dev;
+        struct lu_object  *c_obj;
+        int rc;
 
         ENTRY;
-        result = cmm_child_ops(cmm_dev)->mdo_statfs(cmm_dev->cmm_child, sfs);
-        RETURN (result);
+
+        c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
+        if (c_dev == NULL) {
+                rc = -ENOENT;
+        } else {
+                c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+                                                        lo->lo_header, c_dev);
+                if (c_obj != NULL) {
+                        lu_object_add(lo, c_obj);
+                        rc = 0;
+                } else {
+                        rc = -ENOMEM;
+                }
+        }
+
+        RETURN(rc);
 }
 
-int cmm_mkdir(struct md_object *md_parent, const char *name,
-                     struct md_object *md_child)
+static int cmr_object_exists(const struct lu_context *ctx,
+                             struct lu_object *lo)
 {
-       struct cmm_object *cmm_parent = md2cmm_obj(md_parent);
-        struct md_object  *next       = cmm2child_obj(cmm_parent);
+        return lu_object_exists(ctx, lu_object_next(lo));
+}
 
-        return next->mo_ops->moo_mkdir(next, name, md_child);
+static int cmr_object_print(const struct lu_context *ctx,
+                            struct seq_file *f, const struct lu_object *lo)
+{
+       return seq_printf(f, LUSTRE_CMM0_NAME"-remote@%p", lo);
 }
 
-static struct md_object_operations cmm_mo_ops = {
-        .moo_mkdir      = cmm_mkdir,
-        .moo_mkdir      = cmm_mkdir,
-        .moo_attr_get   = cmm_attr_get,
-//        .moo_rename     = cmm_rename,
-//        .moo_link       = cmm_link,
-//        .moo_attr_set   = cmm_attr_set,
-//        .moo_index_insert = cmm_index_insert,
-//        .moo_index_delete = cmm_index_delete,
-//        .moo_object_create = cmm_object_create,
+static struct lu_object_operations cmr_obj_ops = {
+       .loo_object_init    = cmr_object_init,
+       .loo_object_free    = cmr_object_free,
+       .loo_object_print   = cmr_object_print,
+       .loo_object_exists  = cmr_object_exists
 };
 
-int cmm_attr_get(struct md_object *obj, void *buf, int size,
-                 const char *name, struct context *ctxt)
+/* CMM remote md_object operations. All are invalid */
+static int cmr_object_create(const struct lu_context *ctx,
+                             struct md_object *mo,
+                             struct lu_attr *attr)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
+                        struct lu_attr *attr)
 {
-        struct md_object *next = cmm2child_obj(md2cmm_obj(obj));
+        RETURN(-EREMOTE);
+}
 
-        LASSERT((void *)obj->mo_ops > (void *)0x100);
-        return next->mo_ops->moo_attr_get(next, buf, size, name, ctxt);
+static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
+                        const struct lu_attr *attr)
+{
+        RETURN(-EFAULT);
 }
 
+static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+                         void *buf, int buflen, const char *name)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+                         const void *buf, int buflen, const char *name)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EREMOTE);
+}
+
+static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
+{
+        RETURN(-EFAULT);
+}
+
+static struct md_object_operations cmr_mo_ops = {
+        .moo_attr_get      = cmr_attr_get,
+        .moo_attr_set      = cmr_attr_set,
+        .moo_xattr_get     = cmr_xattr_get,
+        .moo_xattr_set     = cmr_xattr_set,
+        .moo_object_create = cmr_object_create,
+        .moo_ref_add       = cmr_ref_add,
+        .moo_ref_del       = cmr_ref_del,
+        .moo_open          = cmr_open,
+        .moo_close         = cmr_close
+};
+
+/* remote part of md_dir operations */
+static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+                      const char *name, struct lu_fid *lf)
+{
+        /*this can happens while rename()
+         * If new parent is remote dir, lookup will happens here */
+
+        RETURN(-EREMOTE);
+}
+
+/*
+ * All methods below are cross-ref by nature. They consist of remote call and
+ * local operation. Due to future rollback functionality there are several
+ * limitations for such methods:
+ * 1) remote call should be done at first to do epoch negotiation between all
+ * MDS involved and to avoid the RPC inside transaction.
+ * 2) only one RPC can be sent - also due to epoch negotiation.
+ * For more details see rollback HLD/DLD.
+ *
+ */
+static int cmr_create(const struct lu_context *ctx,
+                      struct md_object *mo_p, const char *name,
+                      struct md_object *mo_c, struct lu_attr *attr)
+{
+        int rc;
+
+        ENTRY;
+
+        //XXX: make sure that MDT checks name isn't exist
+
+        /* remote object creation and local name insert */
+        rc = mo_object_create(ctx, md_object_next(mo_c), attr);
+        if (rc == 0) {
+                rc = mdo_name_insert(ctx, md_object_next(mo_p),
+                                     name, lu_object_fid(&mo_c->mo_lu));
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
+                    struct md_object *mo_s, const char *name)
+{
+        int rc;
+        ENTRY;
+
+        //XXX: make sure that MDT checks name isn't exist
+
+        rc = mo_ref_add(ctx, md_object_next(mo_s));
+        if (rc == 0) {
+                rc = mdo_name_insert(ctx, md_object_next(mo_p),
+                                     name, lu_object_fid(&mo_s->mo_lu));
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+                      struct md_object *mo_c, const char *name)
+{
+        int rc;
+        ENTRY;
+
+        rc = mo_ref_del(ctx, md_object_next(mo_c));
+        if (rc == 0) {
+                rc = mdo_name_remove(ctx, md_object_next(mo_p),
+                                     name);
+        }
+
+        RETURN(rc);
+}
+
+static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
+                       struct md_object *mo_pn, const struct lu_fid *lf,
+                       const char *s_name, struct md_object *mo_t,
+                       const char *t_name)
+{
+        int rc;
+        ENTRY;
+
+        /* the mo_pn is remote directory, so we cannot even know if there is
+         * mo_t or not. Therefore mo_t is NULL here but remote server should do
+         * lookup and process this further */
+
+        LASSERT(mo_t == NULL);
+        rc = mdo_rename_tgt(ctx, md_object_next(mo_pn),
+                            NULL/* mo_t */, lf, t_name);
+        /* only old name is removed localy */
+        if (rc == 0)
+                rc = mdo_name_remove(ctx, md_object_next(mo_po),
+                                     s_name);
+
+        RETURN(rc);
+}
+
+/* part of cross-ref rename(). Used to insert new name in new parent 
+ * and unlink target with same name if it exists */
+static int cmr_rename_tgt(const struct lu_context *ctx,
+                          struct md_object *mo_p, struct md_object *mo_t,
+                          const struct lu_fid *lf, const char *name)
+{
+        int rc;
+        ENTRY;
+        /* target object is remote one */
+        rc = mo_ref_del(ctx, md_object_next(mo_t));
+        /* continue locally with name handling only */
+        if (rc == 0)
+                rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+                                    NULL, lf, name);
+        RETURN(rc);
+}
+
+static struct md_dir_operations cmr_dir_ops = {
+        .mdo_lookup      = cmr_lookup,
+        .mdo_create      = cmr_create,
+        .mdo_link        = cmr_link,
+        .mdo_unlink      = cmr_unlink,
+        .mdo_rename      = cmr_rename,
+        .mdo_rename_tgt  = cmr_rename_tgt,
+};
+
+