#define DEBUG_SUBSYSTEM S_MDS
+#include <lustre_fid.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
-static struct md_object_operations cmm_mo_ops;
-static struct md_dir_operations cmm_dir_ops;
-static struct lu_object_operations cmm_obj_ops;
+static int cmm_fld_lookup(struct cmm_device *cm,
+ const struct lu_fid *fid, mdsno_t *mds)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(fid_is_sane(fid));
+
+ rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
+ if (rc) {
+ CERROR("can't find mds by seq "LPU64", rc %d\n",
+ fid_seq(fid), rc);
+ RETURN(rc);
+ }
+
+ if (*mds >= cm->cmm_tgt_count) {
+ CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+ *mds, cm->cmm_tgt_count);
+ rc = -EINVAL;
+ } else {
+ CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
+ *mds, fid_seq(fid));
+ }
+
+ RETURN (rc);
+}
+
+static struct md_object_operations cml_mo_ops;
+static struct md_dir_operations cml_dir_ops;
+static struct lu_object_operations cml_obj_ops;
-static int cmm_fld_lookup(const struct lu_fid *fid)
+static struct md_object_operations cmr_mo_ops;
+static struct md_dir_operations cmr_dir_ops;
+static struct lu_object_operations cmr_obj_ops;
+
+struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
+ const struct lu_object_header *loh,
+ struct lu_device *ld)
{
- int rc;
- /* temporary hack for proto mkdir */
- rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
- CWARN("Get MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
- RETURN(rc);
+ struct lu_object *lo = NULL;
+ const struct lu_fid *fid = &loh->loh_fid;
+ struct cmm_device *cd;
+ mdsno_t mdsnum;
+ int rc = 0;
+
+ ENTRY;
+
+ cd = lu2cmm_dev(ld);
+ if (cd->cmm_flags & CMM_INITIALIZED) {
+ /* get object location */
+ rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum);
+ if (rc)
+ RETURN(ERR_PTR(rc));
+ } else
+ /*
+ * Device is not yet initialized, cmm_object is being created
+ * as part of early bootstrap procedure (it is /ROOT, or /fld,
+ * etc.). Such object *has* to be local.
+ */
+ mdsnum = cd->cmm_local_num;
+
+ /* select the proper set of operations based on object location */
+ if (mdsnum == cd->cmm_local_num) {
+ struct cml_object *clo;
+
+ OBD_ALLOC_PTR(clo);
+ if (clo != NULL) {
+ lo = &clo->cmm_obj.cmo_obj.mo_lu;
+ lu_object_init(lo, NULL, ld);
+ clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
+ clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
+ lo->lo_ops = &cml_obj_ops;
+ clo->cmm_obj.cmo_local = 1;
+ }
+ } else {
+ struct cmr_object *cro;
+
+ OBD_ALLOC_PTR(cro);
+ if (cro != NULL) {
+ lo = &cro->cmm_obj.cmo_obj.mo_lu;
+ lu_object_init(lo, NULL, ld);
+ cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
+ cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
+ lo->lo_ops = &cmr_obj_ops;
+ cro->cmo_num = mdsnum;
+ cro->cmm_obj.cmo_local = 0;
+ }
+ }
+ RETURN(lo);
}
-/* get child device by mdsnum*/
-static struct lu_device *cmm_get_child(struct cmm_device *d, __u32 num)
+/*
+ * CMM has two types of objects - local and remote. They have different set
+ * of operations so we are avoiding multiple checks in code.
+ */
+
+/*
+ * local CMM object operations. cml_...
+ */
+static inline struct cml_object *lu2cml_obj(struct lu_object *o)
{
- struct lu_device *next = NULL;
+ return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cml_object *md2cml_obj(struct md_object *mo)
+{
+ return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
+}
+static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
+{
+ return container_of0(co, struct cml_object, cmm_obj);
+}
+/* get local child device */
+static struct lu_device *cml_child_dev(struct cmm_device *d)
+{
+ return &d->cmm_child->md_lu_dev;
+}
+
+/* lu_object operations */
+static void cml_object_free(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ struct cml_object *clo = lu2cml_obj(lo);
+ lu_object_fini(lo);
+ OBD_FREE_PTR(clo);
+}
+
+static int cml_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+ struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+ struct lu_device *c_dev;
+ struct lu_object *c_obj;
+ int rc;
+
ENTRY;
- if (likely(num == d->cmm_local_num)) {
- next = &d->cmm_child->md_lu_dev;
+
+ c_dev = cml_child_dev(cd);
+ if (c_dev == NULL) {
+ rc = -ENOENT;
} else {
- struct mdc_device *mdc;
- list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
- if (mdc->mc_num == num) {
- next = mdc2lu_dev(mdc);
- break;
- }
+ c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+ lo->lo_header, c_dev);
+ if (c_obj != NULL) {
+ lu_object_add(lo, c_obj);
+ rc = 0;
+ } else {
+ rc = -ENOMEM;
}
}
- RETURN(next);
+
+ RETURN(rc);
+}
+
+static int cml_object_exists(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ return lu_object_exists(ctx, lu_object_next(lo));
}
-struct lu_object *cmm_object_alloc(struct lu_context *ctx,
- struct lu_device *d)
+static int cml_object_print(const struct lu_context *ctx,
+ struct seq_file *f, const struct lu_object *lo)
{
- struct cmm_object *mo;
+ return seq_printf(f, LUSTRE_CMM0_NAME"-local@%p", lo);
+}
+
+static struct lu_object_operations cml_obj_ops = {
+ .loo_object_init = cml_object_init,
+ .loo_object_free = cml_object_free,
+ .loo_object_print = cml_object_print,
+ .loo_object_exists = cml_object_exists
+};
+
+/* CMM local md_object operations */
+static int cml_object_create(const struct lu_context *ctx,
+ struct md_object *mo,
+ struct lu_attr *attr)
+{
+ int rc;
ENTRY;
+ rc = mo_object_create(ctx, md_object_next(mo), attr);
+ RETURN(rc);
+}
- OBD_ALLOC_PTR(mo);
- if (mo != NULL) {
- struct lu_object *o;
+static int cml_attr_get(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mo_attr_get(ctx, md_object_next(mo), attr);
+ RETURN(rc);
+}
- o = &mo->cmo_obj.mo_lu;
- lu_object_init(o, NULL, d);
- mo->cmo_obj.mo_ops = &cmm_mo_ops;
- mo->cmo_obj.mo_dir_ops = &cmm_dir_ops;
- o->lo_ops = &cmm_obj_ops;
- RETURN(o);
- } else
- RETURN(NULL);
+static int cml_attr_set(const struct lu_context *ctx, struct md_object *mo,
+ const struct lu_attr *attr)
+{
+ int rc;
+ ENTRY;
+ rc = mo_attr_set(ctx, md_object_next(mo), attr);
+ RETURN(rc);
}
-int cmm_object_init(struct lu_context *ctx, struct lu_object *o)
+static int cml_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
{
- struct cmm_device *d = lu2cmm_dev(o->lo_dev);
- struct lu_device *under;
- struct lu_object *below;
- const struct lu_fid *fid = lu_object_fid(o);
- int mdsnum;
+ int rc;
ENTRY;
+ rc = mo_xattr_get(ctx, md_object_next(mo),
+ buf, buflen, name);
+ RETURN(rc);
+}
- /* under device can be MDD or MDC */
- mdsnum = cmm_fld_lookup(fid);
- under = cmm_get_child(d, mdsnum);
- if (under == NULL)
- RETURN(-ENOENT);
+static int cml_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+ const void *buf, int buflen, const char *name)
+{
+ int rc;
+ ENTRY;
+ rc = mo_xattr_set(ctx, md_object_next(mo),
+ buf, buflen, name);
+ RETURN(rc);
+}
- below = under->ld_ops->ldo_object_alloc(ctx, under);
- if (below != NULL) {
- struct cmm_object *co = lu2cmm_obj(o);
+static int cml_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_ref_add(ctx, md_object_next(mo));
+ RETURN(rc);
+}
- lu_object_add(o, below);
- co->cmo_num = mdsnum;
- RETURN(0);
- } else
- RETURN(-ENOMEM);
+static int cml_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+ int rc;
+ ENTRY;
+ rc = mo_ref_del(ctx, md_object_next(mo));
+ RETURN(rc);
}
-void cmm_object_free(struct lu_context *ctx, struct lu_object *o)
+static int cml_open(const struct lu_context *ctx, struct md_object *mo)
{
- struct cmm_object *mo = lu2cmm_obj(o);
- lu_object_fini(o);
- OBD_FREE_PTR(mo);
+ int rc;
+ ENTRY;
+ rc = mo_open(ctx, md_object_next(mo));
+ RETURN(rc);
}
-void cmm_object_release(struct lu_context *ctx, struct lu_object *o)
+static int cml_close(const struct lu_context *ctx, struct md_object *mo)
{
- return;
+ int rc;
+ ENTRY;
+ rc = mo_close(ctx, md_object_next(mo));
+ RETURN(rc);
}
-static int cmm_object_exists(struct lu_context *ctx, struct lu_object *o)
+static struct md_object_operations cml_mo_ops = {
+ .moo_attr_get = cml_attr_get,
+ .moo_attr_set = cml_attr_set,
+ .moo_xattr_get = cml_xattr_get,
+ .moo_xattr_set = cml_xattr_set,
+ .moo_object_create = cml_object_create,
+ .moo_ref_add = cml_ref_add,
+ .moo_ref_del = cml_ref_del,
+ .moo_open = cml_open,
+ .moo_close = cml_close
+};
+
+/* md_dir operations */
+static int cml_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+ const char *name, struct lu_fid *lf)
{
- return lu_object_exists(ctx, lu_object_next(o));
+ int rc;
+ ENTRY;
+ rc = mdo_lookup(ctx, md_object_next(mo_p), name, lf);
+ RETURN(rc);
+
}
-static int cmm_object_print(struct lu_context *ctx,
- struct seq_file *f, const struct lu_object *o)
+static int cml_create(const struct lu_context *ctx,
+ struct md_object *mo_p, const char *name,
+ struct md_object *mo_c, struct lu_attr *attr)
{
- return seq_printf(f, LUSTRE_CMM0_NAME"-object@%p", o);
+ int rc;
+ ENTRY;
+ rc = mdo_create(ctx, md_object_next(mo_p), name,
+ md_object_next(mo_c), attr);
+ RETURN(rc);
}
-/* Metadata API */
-static int cmm_object_create(struct lu_context *ctx,
- struct md_object *mo, struct lu_attr *attr)
+static int cml_link(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_s, const char *name)
{
- struct cmm_object *cmo = md2cmm_obj(mo);
- struct md_object *nxo = cmm2child_obj(cmo);
int rc;
+ ENTRY;
+ rc = mdo_link(ctx, md_object_next(mo_p),
+ md_object_next(mo_s), name);
+ RETURN(rc);
+}
+static int cml_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_c, const char *name)
+{
+ int rc;
ENTRY;
+ rc = mdo_unlink(ctx, md_object_next(mo_p),
+ md_object_next(mo_c), name);
+ RETURN(rc);
+}
- LASSERT (cmm_is_local_obj(cmo));
+/* rename is split to local/remote by location of new parent dir */
+static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po,
+ struct md_object *mo_pn, const struct lu_fid *lf,
+ const char *s_name, struct md_object *mo_t,
+ const char *t_name)
+{
+ int rc;
+ ENTRY;
- rc = nxo->mo_ops->moo_object_create(ctx, nxo, attr);
+ if (mo_t && !cmm_is_local_obj(md2cmm_obj(mo_t))) {
+ /* mo_t is remote object and there is RPC to unlink it */
+ rc = mo_ref_del(ctx, md_object_next(mo_t));
+ if (rc)
+ RETURN(rc);
+ mo_t = NULL;
+ }
+ /* local rename, mo_t can be NULL */
+ rc = mdo_rename(ctx, md_object_next(mo_po),
+ md_object_next(mo_pn), lf, s_name,
+ md_object_next(mo_t), t_name);
RETURN(rc);
}
-int cmm_mkdir(struct lu_context *ctx, struct lu_attr *attr,
- struct md_object *p, const char *name, struct md_object *c)
+
+static int cml_rename_tgt(const struct lu_context *ctx,
+ struct md_object *mo_p, struct md_object *mo_t,
+ const struct lu_fid *lf, const char *name)
{
- struct cmm_object *cmm_p = md2cmm_obj(p);
- struct cmm_object *cmm_c = md2cmm_obj(c);
- struct md_object *local = cmm2child_obj(cmm_p);
+ int rc;
+ ENTRY;
+
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+ md_object_next(mo_t), lf, name);
+ RETURN(rc);
+}
+
+static struct md_dir_operations cml_dir_ops = {
+ .mdo_lookup = cml_lookup,
+ .mdo_create = cml_create,
+ .mdo_link = cml_link,
+ .mdo_unlink = cml_unlink,
+ .mdo_rename = cml_rename,
+ .mdo_rename_tgt = cml_rename_tgt,
+};
+
+/* -------------------------------------------------------------------
+ * remote CMM object operations. cmr_...
+ */
+static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
+{
+ return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
+}
+static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
+{
+ return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
+}
+static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
+{
+ return container_of0(co, struct cmr_object, cmm_obj);
+}
+
+/* get proper child device from MDCs */
+static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
+{
+ struct lu_device *next = NULL;
+ struct mdc_device *mdc;
+
+ spin_lock(&d->cmm_tgt_guard);
+ list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
+ if (mdc->mc_num == num) {
+ next = mdc2lu_dev(mdc);
+ break;
+ }
+ }
+ spin_unlock(&d->cmm_tgt_guard);
+ return next;
+}
+
+/* lu_object operations */
+static void cmr_object_free(const struct lu_context *ctx,
+ struct lu_object *lo)
+{
+ struct cmr_object *cro = lu2cmr_obj(lo);
+ lu_object_fini(lo);
+ OBD_FREE_PTR(cro);
+}
+
+static int cmr_object_init(const struct lu_context *ctx, struct lu_object *lo)
+{
+ struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
+ struct lu_device *c_dev;
+ struct lu_object *c_obj;
int rc;
ENTRY;
- if (cmm_is_local_obj(cmm_c)) {
- /* fully local mkdir */
- rc = local->mo_dir_ops->mdo_mkdir(ctx, attr, local, name,
- cmm2child_obj(cmm_c));
+ c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
+ if (c_dev == NULL) {
+ rc = -ENOENT;
} else {
- const struct lu_fid *fid = lu_object_fid(&c->mo_lu);
- struct md_object *remote = cmm2child_obj(cmm_c);
-
- /* remote object creation and local name insert */
- rc = remote->mo_ops->moo_object_create(ctx, remote, attr);
- if (rc == 0) {
- rc = local->mo_dir_ops->mdo_name_insert(ctx, local,
- name, fid,
- attr);
+ c_obj = c_dev->ld_ops->ldo_object_alloc(ctx,
+ lo->lo_header, c_dev);
+ if (c_obj != NULL) {
+ lu_object_add(lo, c_obj);
+ rc = 0;
+ } else {
+ rc = -ENOMEM;
}
}
RETURN(rc);
}
-int cmm_attr_get(struct lu_context *ctx, struct md_object *obj,
- struct lu_attr *attr)
+static int cmr_object_exists(const struct lu_context *ctx,
+ struct lu_object *lo)
{
- struct md_object *next = cmm2child_obj(md2cmm_obj(obj));
+ return lu_object_exists(ctx, lu_object_next(lo));
+}
- return next->mo_ops->moo_attr_get(ctx, next, attr);
+static int cmr_object_print(const struct lu_context *ctx,
+ struct seq_file *f, const struct lu_object *lo)
+{
+ return seq_printf(f, LUSTRE_CMM0_NAME"-remote@%p", lo);
}
-static struct md_dir_operations cmm_dir_ops = {
- .mdo_mkdir = cmm_mkdir,
+static struct lu_object_operations cmr_obj_ops = {
+ .loo_object_init = cmr_object_init,
+ .loo_object_free = cmr_object_free,
+ .loo_object_print = cmr_object_print,
+ .loo_object_exists = cmr_object_exists
};
-static struct md_object_operations cmm_mo_ops = {
- .moo_attr_get = cmm_attr_get,
- .moo_object_create = cmm_object_create,
+/* CMM remote md_object operations. All are invalid */
+static int cmr_object_create(const struct lu_context *ctx,
+ struct md_object *mo,
+ struct lu_attr *attr)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_attr_get(const struct lu_context *ctx, struct md_object *mo,
+ struct lu_attr *attr)
+{
+ RETURN(-EREMOTE);
+}
+
+static int cmr_attr_set(const struct lu_context *ctx, struct md_object *mo,
+ const struct lu_attr *attr)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_xattr_get(const struct lu_context *ctx, struct md_object *mo,
+ void *buf, int buflen, const char *name)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_xattr_set(const struct lu_context *ctx, struct md_object *mo,
+ const void *buf, int buflen, const char *name)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_ref_add(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_ref_del(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+
+static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EREMOTE);
+}
+
+static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
+{
+ RETURN(-EFAULT);
+}
+static struct md_object_operations cmr_mo_ops = {
+ .moo_attr_get = cmr_attr_get,
+ .moo_attr_set = cmr_attr_set,
+ .moo_xattr_get = cmr_xattr_get,
+ .moo_xattr_set = cmr_xattr_set,
+ .moo_object_create = cmr_object_create,
+ .moo_ref_add = cmr_ref_add,
+ .moo_ref_del = cmr_ref_del,
+ .moo_open = cmr_open,
+ .moo_close = cmr_close
};
-static struct lu_object_operations cmm_obj_ops = {
- .loo_object_init = cmm_object_init,
- .loo_object_release = cmm_object_release,
- .loo_object_print = cmm_object_print,
- .loo_object_exists = cmm_object_exists
+/* remote part of md_dir operations */
+static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p,
+ const char *name, struct lu_fid *lf)
+{
+ /*this can happens while rename()
+ * If new parent is remote dir, lookup will happens here */
+
+ RETURN(-EREMOTE);
+}
+
+/*
+ * All methods below are cross-ref by nature. They consist of remote call and
+ * local operation. Due to future rollback functionality there are several
+ * limitations for such methods:
+ * 1) remote call should be done at first to do epoch negotiation between all
+ * MDS involved and to avoid the RPC inside transaction.
+ * 2) only one RPC can be sent - also due to epoch negotiation.
+ * For more details see rollback HLD/DLD.
+ *
+ */
+static int cmr_create(const struct lu_context *ctx,
+ struct md_object *mo_p, const char *name,
+ struct md_object *mo_c, struct lu_attr *attr)
+{
+ int rc;
+
+ ENTRY;
+
+ //XXX: make sure that MDT checks name isn't exist
+
+ /* remote object creation and local name insert */
+ rc = mo_object_create(ctx, md_object_next(mo_c), attr);
+ if (rc == 0) {
+ rc = mdo_name_insert(ctx, md_object_next(mo_p),
+ name, lu_object_fid(&mo_c->mo_lu));
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_link(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_s, const char *name)
+{
+ int rc;
+ ENTRY;
+
+ //XXX: make sure that MDT checks name isn't exist
+
+ rc = mo_ref_add(ctx, md_object_next(mo_s));
+ if (rc == 0) {
+ rc = mdo_name_insert(ctx, md_object_next(mo_p),
+ name, lu_object_fid(&mo_s->mo_lu));
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p,
+ struct md_object *mo_c, const char *name)
+{
+ int rc;
+ ENTRY;
+
+ rc = mo_ref_del(ctx, md_object_next(mo_c));
+ if (rc == 0) {
+ rc = mdo_name_remove(ctx, md_object_next(mo_p),
+ name);
+ }
+
+ RETURN(rc);
+}
+
+static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po,
+ struct md_object *mo_pn, const struct lu_fid *lf,
+ const char *s_name, struct md_object *mo_t,
+ const char *t_name)
+{
+ int rc;
+ ENTRY;
+
+ /* the mo_pn is remote directory, so we cannot even know if there is
+ * mo_t or not. Therefore mo_t is NULL here but remote server should do
+ * lookup and process this further */
+
+ LASSERT(mo_t == NULL);
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_pn),
+ NULL/* mo_t */, lf, t_name);
+ /* only old name is removed localy */
+ if (rc == 0)
+ rc = mdo_name_remove(ctx, md_object_next(mo_po),
+ s_name);
+
+ RETURN(rc);
+}
+
+/* part of cross-ref rename(). Used to insert new name in new parent
+ * and unlink target with same name if it exists */
+static int cmr_rename_tgt(const struct lu_context *ctx,
+ struct md_object *mo_p, struct md_object *mo_t,
+ const struct lu_fid *lf, const char *name)
+{
+ int rc;
+ ENTRY;
+ /* target object is remote one */
+ rc = mo_ref_del(ctx, md_object_next(mo_t));
+ /* continue locally with name handling only */
+ if (rc == 0)
+ rc = mdo_rename_tgt(ctx, md_object_next(mo_p),
+ NULL, lf, name);
+ RETURN(rc);
+}
+
+static struct md_dir_operations cmr_dir_ops = {
+ .mdo_lookup = cmr_lookup,
+ .mdo_create = cmr_create,
+ .mdo_link = cmr_link,
+ .mdo_unlink = cmr_unlink,
+ .mdo_rename = cmr_rename,
+ .mdo_rename_tgt = cmr_rename_tgt,
};
+