1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_object.c
5 * Lustre Cluster Metadata Manager (cmm)
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Mike Pershin <tappro@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
33 #define DEBUG_SUBSYSTEM S_MDS
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
/*
 * Resolve which MDS serves the sequence of @fid.
 *
 * Asserts that @fid is sane, then queries the FLD client attached to @cm
 * (cm->cmm_fld) with fid_seq(@fid); the answer is written through @mds.
 * The visible CERROR() calls report a lookup that could not find the MDS
 * and an MDS number greater than cm->cmm_tgt_count.  A result that passes
 * both checks is traced at D_INFO level.
 *
 * NOTE(review): declarations, braces and return statements are not visible
 * in this listing, so the exact return values are not described here.
 */
39 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
40 mdsno_t *mds, const struct lu_env *env)
45 LASSERT(fid_is_sane(fid));
/* ask FLD which MDS owns the sequence this fid belongs to */
47 rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
49 CERROR("Can't find mds by seq "LPX64", rc %d\n",
/* reject MDS numbers above the number of known targets */
54 if (*mds > cm->cmm_tgt_count) {
55 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
56 *mds, cm->cmm_tgt_count);
59 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
60 LPU64"\n", *mds, fid_seq(fid));
/*
 * Forward declarations of the operation tables.  cmm_object_alloc() takes
 * their addresses before the tables are defined further down in this file.
 * The cml_* tables are installed on local objects, the cmr_* tables on
 * remote ones.
 */
66 static struct md_object_operations cml_mo_ops;
67 static struct md_dir_operations cml_dir_ops;
68 static struct lu_object_operations cml_obj_ops;
70 static struct md_object_operations cmr_mo_ops;
71 static struct md_dir_operations cmr_dir_ops;
72 static struct lu_object_operations cmr_obj_ops;
/*
 * Set up the CMM layer object for the header @loh and pick its operation
 * tables according to where the object lives.
 *
 * When the device has CMM_INITIALIZED set, the owning MDS number is obtained
 * with cmm_fld_lookup() for the fid stored in @loh; otherwise the object is
 * taken to be local (mdsnum = cd->cmm_local_num).  An object whose MDS
 * number equals cd->cmm_local_num becomes a cml_object wired to the cml_*
 * tables; any other becomes a cmr_object wired to the cmr_* tables, which
 * also records the MDS number in cmo_num.
 *
 * NOTE(review): the third parameter (used below as "ld"), the memory
 * allocation, the error handling and the return statement are not visible
 * in this listing.
 */
74 struct lu_object *cmm_object_alloc(const struct lu_env *env,
75 const struct lu_object_header *loh,
78 struct lu_object *lo = NULL;
79 const struct lu_fid *fid = &loh->loh_fid;
80 struct cmm_device *cd;
87 if (cd->cmm_flags & CMM_INITIALIZED) {
88 /* get object location */
89 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum, env);
94 * Device is not yet initialized, cmm_object is being created
95 * as part of early bootstrap procedure (it is /ROOT, or /fld,
96 * etc.). Such object *has* to be local.
98 mdsnum = cd->cmm_local_num;
100 /* select the proper set of operations based on object location */
101 if (mdsnum == cd->cmm_local_num) {
102 struct cml_object *clo;
/* local object: wire the cml_* operation tables */
106 lo = &clo->cmm_obj.cmo_obj.mo_lu;
107 lu_object_init(lo, NULL, ld);
108 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
109 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
110 lo->lo_ops = &cml_obj_ops;
113 struct cmr_object *cro;
/* remote object: wire the cmr_* operation tables */
117 lo = &cro->cmm_obj.cmo_obj.mo_lu;
118 lu_object_init(lo, NULL, ld);
119 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
120 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
121 lo->lo_ops = &cmr_obj_ops;
/* remember the owning MDS; cmr_object_init() uses it to pick the child */
122 cro->cmo_num = mdsnum;
129 * CMM has two types of objects - local and remote. They have different sets
130 * of operations, so we avoid multiple checks in the code.
133 /* Return the lu_device embedded in the local child device of @d (d->cmm_child). */
134 static struct lu_device *cml_child_dev(struct cmm_device *d)
136 return &d->cmm_child->md_lu_dev;
139 /* lu_object operations */
140 static void cml_object_free(const struct lu_env *env,
141 struct lu_object *lo)
143 struct cml_object *clo = lu2cml_obj(lo);
/*
 * Initialize a local CMM object (installed as .loo_object_init in
 * cml_obj_ops).
 *
 * With HAVE_SPLIT_SUPPORT the split state of the object is preset:
 * CMM_SPLIT_DENIED when the device has no targets (cmm_tgt_count == 0),
 * CMM_SPLIT_UNKNOWN in the other visible assignment.  Then the local child
 * device is asked to allocate its own object for the same header, and that
 * object is added to @lo with lu_object_add().
 *
 * NOTE(review): the NULL checks on c_dev / c_obj and the return value are
 * not visible in this listing.
 */
148 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
150 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
151 struct lu_device *c_dev;
152 struct lu_object *c_obj;
157 #ifdef HAVE_SPLIT_SUPPORT
158 if(cd->cmm_tgt_count == 0)
159 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
161 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
163 c_dev = cml_child_dev(cd);
/* let the child device allocate its object for the same header */
167 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
168 lo->lo_header, c_dev);
170 lu_object_add(lo, c_obj);
/*
 * Print a one-line description of a local CMM object through the printer
 * callback @p: LUSTRE_CMM_NAME followed by "-local@" and the address of @lo.
 * Returns whatever @p returns.
 */
180 static int cml_object_print(const struct lu_env *env, void *cookie,
181 lu_printer_t p, const struct lu_object *lo)
183 return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
/* lu_object method table for local CMM objects; installed by cmm_object_alloc(). */
186 static struct lu_object_operations cml_obj_ops = {
187 .loo_object_init = cml_object_init,
188 .loo_object_free = cml_object_free,
189 .loo_object_print = cml_object_print
192 /* CMM local md_object operations */
/* Local create: hand @spec and @attr to mo_object_create() on the object returned by md_object_next(@mo). */
193 static int cml_object_create(const struct lu_env *env,
194 struct md_object *mo,
195 const struct md_create_spec *spec,
196 struct md_attr *attr)
200 rc = mo_object_create(env, md_object_next(mo), spec, attr);
/* Local permission check: pass @mask to mo_permission() on the object returned by md_object_next(@mo). */
204 static int cml_permission(const struct lu_env *env,
205 struct md_object *mo, int mask)
209 rc = mo_permission(env, md_object_next(mo), mask);
/* Local attribute read: call mo_attr_get() with @attr on the object returned by md_object_next(@mo). */
213 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
214 struct md_attr *attr)
218 rc = mo_attr_get(env, md_object_next(mo), attr);
/* Local attribute update: call mo_attr_set() with @attr on the object returned by md_object_next(@mo). */
222 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
223 const struct md_attr *attr)
227 rc = mo_attr_set(env, md_object_next(mo), attr);
/* Local xattr read: call mo_xattr_get() for @name into @buf on the object returned by md_object_next(@mo). */
231 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
232 struct lu_buf *buf, const char *name)
236 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
/*
 * Local readlink: call mo_readlink() with buf on the object returned by
 * md_object_next(@mo).  The line declaring buf is not visible in this listing.
 */
240 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
245 rc = mo_readlink(env, md_object_next(mo), buf);
/*
 * Local xattr listing: call mo_xattr_list() with buf on the object returned
 * by md_object_next(@mo).  The line declaring buf is not visible in this
 * listing.
 */
249 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
254 rc = mo_xattr_list(env, md_object_next(mo), buf);
/* Local xattr update: call mo_xattr_set() with @buf, @name and @fl on the object returned by md_object_next(@mo). */
258 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
259 const struct lu_buf *buf,
260 const char *name, int fl)
264 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
/*
 * Local xattr removal: call mo_xattr_del() for name on the object returned by
 * md_object_next(@mo).  The line declaring name is not visible in this
 * listing.
 */
268 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
273 rc = mo_xattr_del(env, md_object_next(mo), name);
/* Local reference add: call mo_ref_add() on the object returned by md_object_next(@mo). */
277 static int cml_ref_add(const struct lu_env *env, struct md_object *mo)
281 rc = mo_ref_add(env, md_object_next(mo));
/*
 * Local reference drop: call mo_ref_del() with ma on the object returned by
 * md_object_next(@mo).  The line declaring ma is not visible in this listing.
 */
285 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
290 rc = mo_ref_del(env, md_object_next(mo), ma);
/*
 * Local open: call mo_open() with flags on the object returned by
 * md_object_next(@mo).  The line declaring flags is not visible in this
 * listing.
 */
294 static int cml_open(const struct lu_env *env, struct md_object *mo,
299 rc = mo_open(env, md_object_next(mo), flags);
/*
 * Local close: call mo_close() with ma on the object returned by
 * md_object_next(@mo).  The line declaring ma is not visible in this listing.
 */
303 static int cml_close(const struct lu_env *env, struct md_object *mo,
308 rc = mo_close(env, md_object_next(mo), ma);
/* Local readpage: call mo_readpage() with @rdpg on the object returned by md_object_next(@mo). */
312 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
313 const struct lu_rdpg *rdpg)
317 rc = mo_readpage(env, md_object_next(mo), rdpg);
/* Local capability fetch: call mo_capa_get() with @capa and @renewal on the object returned by md_object_next(@mo). */
321 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
322 struct lustre_capa *capa, int renewal)
326 rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
/*
 * md_object method table for local CMM objects.  Every slot points at the
 * matching cml_* handler defined above; cmm_object_alloc() installs the
 * table as mo_ops on local objects.
 */
330 static struct md_object_operations cml_mo_ops = {
331 .moo_permission = cml_permission,
332 .moo_attr_get = cml_attr_get,
333 .moo_attr_set = cml_attr_set,
334 .moo_xattr_get = cml_xattr_get,
335 .moo_xattr_list = cml_xattr_list,
336 .moo_xattr_set = cml_xattr_set,
337 .moo_xattr_del = cml_xattr_del,
338 .moo_object_create = cml_object_create,
339 .moo_ref_add = cml_ref_add,
340 .moo_ref_del = cml_ref_del,
341 .moo_open = cml_open,
342 .moo_close = cml_close,
343 .moo_readpage = cml_readpage,
344 .moo_readlink = cml_readlink,
345 .moo_capa_get = cml_capa_get
348 /* md_dir operations */
/*
 * Look up @name in the local directory @mo_p and store the fid found in @lf.
 *
 * With HAVE_SPLIT_SUPPORT, cmm_split_check() is consulted first for the
 * (directory, name) pair; the handling of its result is not visible in this
 * listing.  The lookup itself is done by mdo_lookup() on the object returned
 * by md_object_next(@mo_p).
 */
349 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
350 const char *name, struct lu_fid *lf)
355 #ifdef HAVE_SPLIT_SUPPORT
356 rc = cmm_split_check(env, mo_p, name);
360 rc = mdo_lookup(env, md_object_next(mo_p), name, lf);
/*
 * Report the lock mode for @mo given the requested mode @lm.
 *
 * The result starts as MDL_MINMODE; with HAVE_SPLIT_SUPPORT it is replaced
 * by the value of cmm_split_access().  The return statement is not visible
 * in this listing.
 */
365 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
366 struct md_object *mo, mdl_mode_t lm)
368 int rc = MDL_MINMODE;
371 #ifdef HAVE_SPLIT_SUPPORT
372 rc = cmm_split_access(env, mo, lm);
378 static int cml_create(const struct lu_env *env,
379 struct md_object *mo_p, const char *child_name,
380 struct md_object *mo_c, struct md_create_spec *spec,
386 #ifdef HAVE_SPLIT_SUPPORT
388 /* Lock mode always should be sane. */
389 LASSERT(spec->sp_cr_mode != MDL_MINMODE);
392 * Sigh... This is a long story. MDT may have a race when detecting whether
393 * a split is possible in cmm. We know about this race and let it live,
394 * because getting rid of it (with some sem or spinlock) would also mean
395 * that PDIROPS for create will not work, which is really bad for
396 * performance and makes no sense. So we allow the race, but split the dir
397 * only if one of the concurrent threads takes the EX lock. Say, two
398 * concurrent threads may hold different lock modes on the directory (CW
399 * and EX); it is not the first one to get here that should split the dir,
400 * but only the one holding the EX lock. We do not care that in this case
401 * the split may happen a bit later (when the dir size is no longer exactly
402 * 64K, but may be larger). So we allow concurrent creates and protect the
403 * split with the EX lock.
405 if (spec->sp_cr_mode == MDL_EX) {
407 * Try to split @mo_p. If split is ok, -ERESTART is returned and
408 current thread will not proceed with create. Instead it sends
409 * -ERESTART to client to let it know that correct MDT should be
412 rc = cmm_split_try(env, mo_p);
415 * -ERESTART or some split error is returned, we can't
416 * proceed with create.
422 * Proceed with mdo_create() as nothing happened, split is not
427 * Check for possible split directory and let caller know that
428 * it should tell client that directory is split and operation
429 * should repeat to correct MDT.
431 rc = cmm_split_check(env, mo_p, child_name);
437 rc = mdo_create(env, md_object_next(mo_p), child_name,
438 md_object_next(mo_c), spec, ma);
/*
 * Local create_data: call mdo_create_data() on the objects returned by
 * md_object_next() for the parent @p and for o.  Part of the signature and
 * the remaining call arguments are not visible in this listing.
 */
443 static int cml_create_data(const struct lu_env *env, struct md_object *p,
445 const struct md_create_spec *spec,
450 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
/*
 * Local link: call mdo_link() on the objects returned by md_object_next()
 * for the parent @mo_p and the source @mo_s.  The remaining arguments are
 * not visible in this listing.
 */
455 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
456 struct md_object *mo_s, const char *name,
461 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
/*
 * Local unlink: call mdo_unlink() on the objects returned by
 * md_object_next() for the parent @mo_p and the child @mo_c.  The remaining
 * arguments are not visible in this listing.
 */
466 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
467 struct md_object *mo_c, const char *name,
472 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
477 /* rename is split to local/remote by location of new parent dir */
478 struct md_object *md_object_find(const struct lu_env *env,
479 struct md_device *md,
480 const struct lu_fid *f)
486 o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
488 m = (struct md_object *)o;
490 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
491 m = o ? lu2md(o) : NULL;
/*
 * Fetch the mode and flags of the object with fid @lf and copy them to @ma.
 *
 * The object is looked up with md_object_find(); an error pointer is turned
 * into an error code via PTR_ERR().  Attributes are read with mo_attr_get()
 * into the per-thread scratch md_attr (cmm_env_info(env)->cmi_ma) with
 * ma_need = MA_INODE.  la_mode and la_flags are then copied into
 * @ma->ma_attr and LA_MODE | LA_FLAGS is or-ed into its la_valid.  The
 * reference taken by the lookup is dropped with lu_object_put().
 *
 * NOTE(review): the conditions guarding the copy and the final return are
 * not visible in this listing.
 */
496 static int __cmm_mode_get(const struct lu_env *env, struct md_device *md,
497 const struct lu_fid *lf, struct md_attr *ma)
499 struct cmm_thread_info *cmi;
500 struct md_object *mo_s = md_object_find(env, md, lf);
501 struct md_attr *tmp_ma;
506 RETURN(PTR_ERR(mo_s));
508 cmi = cmm_env_info(env);
/* use the per-thread md_attr as scratch space for the query */
510 tmp_ma = &cmi->cmi_ma;
511 tmp_ma->ma_need = MA_INODE;
512 tmp_ma->ma_valid = 0;
513 /* get type from src, can be remote req */
514 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
516 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
517 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
518 ma->ma_attr.la_valid |= LA_MODE | LA_FLAGS;
/* drop the reference obtained by md_object_find() */
520 lu_object_put(env, &mo_s->mo_lu);
/*
 * Rename entry point for a local object.
 *
 * The type of the source object @lf is first fetched into @ma with
 * __cmm_mode_get().  If a target @mo_t is given and lu_object_exists()
 * reports a negative value for it, it is unlinked with mo_ref_del() (per the
 * original comment, it is remote and this goes out as an RPC).  The rename
 * itself is then performed by mdo_rename() on the md_object_next() objects.
 *
 * NOTE(review): md_object_next(mo_t) is called although the comment says
 * mo_t can be NULL - presumably md_object_next() tolerates NULL; confirm.
 * Error checks between the steps are not visible in this listing.
 */
524 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
525 struct md_object *mo_pn, const struct lu_fid *lf,
526 const char *s_name, struct md_object *mo_t,
527 const char *t_name, struct md_attr *ma)
532 rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
536 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
537 /* mo_t is remote object and there is RPC to unlink it */
538 rc = mo_ref_del(env, md_object_next(mo_t), ma);
544 /* local rename, mo_t can be NULL */
545 rc = mdo_rename(env, md_object_next(mo_po),
546 md_object_next(mo_pn), lf, s_name,
547 md_object_next(mo_t), t_name, ma);
/* Local rename_tgt: call mdo_rename_tgt() on the md_object_next() objects of @mo_p and @mo_t with @lf, @name and @ma. */
551 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
552 struct md_object *mo_t, const struct lu_fid *lf,
553 const char *name, struct md_attr *ma)
558 rc = mdo_rename_tgt(env, md_object_next(mo_p),
559 md_object_next(mo_t), lf, name, ma);
562 /* Used only by rename_tgt() when the target does not exist: calls mdo_name_insert() with @name, @lf and @isdir on the object returned by md_object_next(@p). */
563 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
564 const char *name, const struct lu_fid *lf, int isdir)
569 rc = mdo_name_insert(env, md_object_next(p), name, lf, isdir);
/*
 * is_subdir handler shared by the local and remote directory tables.
 *
 * The mode of the object @fid is fetched with __cmm_mode_get() into the
 * per-thread md_attr.  The S_ISDIR() test singles out objects that are not
 * directories (the action taken for them is not visible in this listing).
 * The check itself is done by mdo_is_subdir() on the object returned by
 * md_object_next(@mo).
 *
 * NOTE(review): the md_attr passed here is cmi->cmi_ma, the same field
 * __cmm_mode_get() uses as its scratch buffer.  Assuming cmm_env_info()
 * returns the same structure for the same env, its field copies become
 * self-assignments - confirm this is intended.
 */
574 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
575 const struct lu_fid *fid, struct lu_fid *sfid)
577 struct cmm_thread_info *cmi;
581 cmi = cmm_env_info(env);
582 rc = __cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma);
586 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
589 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
/*
 * md_dir method table for local CMM objects; cmm_object_alloc() installs it
 * as mo_dir_ops.  Note that .mdo_is_subdir uses cmm_is_subdir, the same
 * handler as the remote table.
 */
593 static struct md_dir_operations cml_dir_ops = {
594 .mdo_is_subdir = cmm_is_subdir,
595 .mdo_lookup = cml_lookup,
596 .mdo_lock_mode = cml_lock_mode,
597 .mdo_create = cml_create,
598 .mdo_link = cml_link,
599 .mdo_unlink = cml_unlink,
600 .mdo_name_insert = cml_name_insert,
601 .mdo_rename = cml_rename,
602 .mdo_rename_tgt = cml_rename_tgt,
603 .mdo_create_data = cml_create_data
606 /* -------------------------------------------------------------------
607 * remote CMM object operations. cmr_...
/* Map a lu_object to the cmr_object that embeds it as cmm_obj.cmo_obj.mo_lu. */
609 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
611 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
/* Map a md_object to the cmr_object that embeds it as cmm_obj.cmo_obj. */
613 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
615 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
/* Map a cmm_object to the cmr_object that embeds it as cmm_obj. */
617 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
619 return container_of0(co, struct cmr_object, cmm_obj);
622 /* get proper child device from MDCs */
/*
 * Walks d->cmm_targets under the d->cmm_tgt_guard spinlock looking for the
 * mdc_device whose mc_num equals @num, and records its lu_device
 * (mdc2lu_dev()).  The result is initialised to NULL, so NULL stands for
 * "no such target".  The loop exit and the return statement are not visible
 * in this listing.
 */
623 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
625 struct lu_device *next = NULL;
626 struct mdc_device *mdc;
628 spin_lock(&d->cmm_tgt_guard);
629 list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
630 if (mdc->mc_num == num) {
631 next = mdc2lu_dev(mdc);
635 spin_unlock(&d->cmm_tgt_guard);
639 /* lu_object operations */
640 static void cmr_object_free(const struct lu_env *env,
641 struct lu_object *lo)
643 struct cmr_object *cro = lu2cmr_obj(lo);
/*
 * Initialize a remote CMM object (installed as .loo_object_init in
 * cmr_obj_ops).
 *
 * The child device is chosen with cmr_child_dev() from the MDS number stored
 * in the object's cmo_num.  That device is asked to allocate its own object
 * for the same header, and the new object is added to @lo with
 * lu_object_add().
 *
 * NOTE(review): cmr_child_dev() starts from a NULL result; the NULL checks
 * on c_dev / c_obj and the return value are not visible in this listing.
 */
648 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
650 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
651 struct lu_device *c_dev;
652 struct lu_object *c_obj;
657 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
661 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
662 lo->lo_header, c_dev);
664 lu_object_add(lo, c_obj);
/*
 * Print a one-line description of a remote CMM object through the printer
 * callback @p: LUSTRE_CMM_NAME followed by "-remote@" and the address of
 * @lo.  Returns whatever @p returns.
 */
674 static int cmr_object_print(const struct lu_env *env, void *cookie,
675 lu_printer_t p, const struct lu_object *lo)
677 return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
/* lu_object method table for remote CMM objects; installed by cmm_object_alloc(). */
680 static struct lu_object_operations cmr_obj_ops = {
681 .loo_object_init = cmr_object_init,
682 .loo_object_free = cmr_object_free,
683 .loo_object_print = cmr_object_print
686 /* CMM remote md_object operations. All are invalid */
687 static int cmr_object_create(const struct lu_env *env,
688 struct md_object *mo,
689 const struct md_create_spec *spec,
695 static int cmr_permission(const struct lu_env *env, struct md_object *mo,
701 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
702 struct md_attr *attr)
707 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
708 const struct md_attr *attr)
713 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
714 struct lu_buf *buf, const char *name)
719 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
725 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
731 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
732 const struct lu_buf *buf, const char *name, int fl)
737 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
743 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo)
748 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
754 static int cmr_open(const struct lu_env *env, struct md_object *mo,
760 static int cmr_close(const struct lu_env *env, struct md_object *mo,
766 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
767 const struct lu_rdpg *rdpg)
772 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
773 struct lustre_capa *capa, int renewal)
/*
 * md_object method table for remote CMM objects.  Every slot points at the
 * matching cmr_* handler defined above; cmm_object_alloc() installs the
 * table as mo_ops on remote objects.
 */
778 static struct md_object_operations cmr_mo_ops = {
779 .moo_permission = cmr_permission,
780 .moo_attr_get = cmr_attr_get,
781 .moo_attr_set = cmr_attr_set,
782 .moo_xattr_get = cmr_xattr_get,
783 .moo_xattr_set = cmr_xattr_set,
784 .moo_xattr_list = cmr_xattr_list,
785 .moo_xattr_del = cmr_xattr_del,
786 .moo_object_create = cmr_object_create,
787 .moo_ref_add = cmr_ref_add,
788 .moo_ref_del = cmr_ref_del,
789 .moo_open = cmr_open,
790 .moo_close = cmr_close,
791 .moo_readpage = cmr_readpage,
792 .moo_readlink = cmr_readlink,
793 .moo_capa_get = cmr_capa_get
796 /* remote part of md_dir operations */
797 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
798 const char *name, struct lu_fid *lf)
801 * This can happen during rename(). If the new parent is a remote dir, lookup
808 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
809 struct md_object *mo, mdl_mode_t lm)
815 * All methods below are cross-ref by nature. They consist of remote call and
816 * local operation. Due to future rollback functionality there are several
817 * limitations for such methods:
818 * 1) remote call should be done at first to do epoch negotiation between all
819 * MDS involved and to avoid the RPC inside transaction.
820 * 2) only one RPC can be sent - also due to epoch negotiation.
821 * For more details see rollback HLD/DLD.
/*
 * Create a child whose object is remote (@mo_c) under the parent @mo_p.
 *
 * Visible steps:
 *  1. Read the parent's attributes into the per-thread md_attr with
 *     mo_attr_get(), requesting MA_INODE.  With CONFIG_FS_POSIX_ACL, unless
 *     the new object is a symlink, MA_ACL_DEF is requested as well and
 *     cmi_xattr_buf is supplied as the ACL buffer.
 *  2. If the parent has S_ISGID set, the child takes the parent's gid, and a
 *     child directory also gets S_ISGID (LA_MODE is marked valid).
 *  3. With CONFIG_FS_POSIX_ACL, if MA_ACL_DEF came back valid, the ACL is
 *     passed on through spec->u.sp_ea and MDS_CREATE_RMT_ACL is set in
 *     spec->sp_cr_flags.
 *  4. The object is created with mo_object_create() on md_object_next(@mo_c)
 *     and the name is inserted into the parent with mdo_name_insert().
 *
 * NOTE(review): the last parameter (used below as "ma"), the error checks
 * between the steps and the return statement are not visible in this
 * listing.
 */
823 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
824 const char *child_name, struct md_object *mo_c,
825 struct md_create_spec *spec,
828 struct cmm_thread_info *cmi;
829 struct md_attr *tmp_ma;
833 /* check the SGID attr */
834 cmi = cmm_env_info(env);
836 tmp_ma = &cmi->cmi_ma;
837 tmp_ma->ma_valid = 0;
838 tmp_ma->ma_need = MA_INODE;
840 #ifdef CONFIG_FS_POSIX_ACL
841 if (!S_ISLNK(ma->ma_attr.la_mode)) {
/* also ask for the parent's default ACL, using the per-thread buffer */
842 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
843 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
844 tmp_ma->ma_need |= MA_ACL_DEF;
848 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
852 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
/* the child takes the parent's gid; a child directory also keeps S_ISGID */
853 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
854 if (S_ISDIR(ma->ma_attr.la_mode)) {
855 ma->ma_attr.la_mode |= S_ISGID;
856 ma->ma_attr.la_valid |= LA_MODE;
860 #ifdef CONFIG_FS_POSIX_ACL
861 if (tmp_ma->ma_valid & MA_ACL_DEF) {
/* pass the parent's default ACL along with the create request */
862 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
863 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
864 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
868 /* remote object creation and local name insert */
869 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
871 rc = mdo_name_insert(env, md_object_next(mo_p),
872 child_name, lu_object_fid(&mo_c->mo_lu),
873 S_ISDIR(ma->ma_attr.la_mode));
/*
 * Link @name in the parent @mo_p to the source object @mo_s.
 *
 * A reference is added to the source with mo_ref_add(), then the name is
 * inserted into the parent with mdo_name_insert() using the source's fid and
 * 0 as the last argument.  The check between the two calls and the return
 * statement are not visible in this listing.
 */
879 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
880 struct md_object *mo_s, const char *name,
886 /* XXX: make sure that MDT checks that the name does not already exist */
888 rc = mo_ref_add(env, md_object_next(mo_s));
890 rc = mdo_name_insert(env, md_object_next(mo_p),
891 name, lu_object_fid(&mo_s->mo_lu), 0);
/*
 * Unlink @name from the parent @mo_p, where the child @mo_c is the remote
 * object.
 *
 * The child's reference is dropped with mo_ref_del(), then the name is
 * removed from the parent with mdo_name_remove(); the directory flag comes
 * from S_ISDIR(ma->ma_attr.la_mode).  The check between the two calls and
 * the return statement are not visible in this listing.
 */
897 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
898 struct md_object *mo_c, const char *name,
904 rc = mo_ref_del(env, md_object_next(mo_c), ma);
906 rc = mdo_name_remove(env, md_object_next(mo_p),
907 name, S_ISDIR(ma->ma_attr.la_mode));
/*
 * Rename where the new parent @mo_pn is a remote directory.
 *
 * The type of the source @lf is fetched into ma with __cmm_mode_get().
 * @mo_t must be NULL (asserted).  mdo_rename_tgt() is called on
 * md_object_next(@mo_pn) with a NULL target, then the old name @s_name is
 * removed from the old parent @mo_po with mdo_name_remove().
 *
 * NOTE(review): the last parameter (used below as "ma"), the error checks
 * between the steps and the return statement are not visible in this
 * listing.
 */
913 static int cmr_rename(const struct lu_env *env,
914 struct md_object *mo_po, struct md_object *mo_pn,
915 const struct lu_fid *lf, const char *s_name,
916 struct md_object *mo_t, const char *t_name,
922 /* get real type of src */
923 rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
927 LASSERT(mo_t == NULL);
928 /* the mo_pn is remote directory, so we cannot even know if there is
929 * mo_t or not. Therefore mo_t is NULL here but remote server should do
930 * lookup and process this further */
931 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
932 NULL/* mo_t */, lf, t_name, ma);
933 /* only old name is removed locally */
935 rc = mdo_name_remove(env, md_object_next(mo_po),
936 s_name, S_ISDIR(ma->ma_attr.la_mode));
941 /* part of cross-ref rename(). Used to insert new name in new parent
942 * and unlink target */
/*
 * Visible steps: the target @mo_t has its reference dropped with
 * mo_ref_del(), then mdo_rename_tgt() is called on md_object_next(@mo_p).
 * The last parameter (used below as "ma"), the remaining arguments of the
 * second call and the return statement are not visible in this listing.
 */
943 static int cmr_rename_tgt(const struct lu_env *env,
944 struct md_object *mo_p, struct md_object *mo_t,
945 const struct lu_fid *lf, const char *name,
950 /* target object is remote one */
951 rc = mo_ref_del(env, md_object_next(mo_t), ma);
952 /* continue locally with name handling only */
954 rc = mdo_rename_tgt(env, md_object_next(mo_p),
959 static struct md_dir_operations cmr_dir_ops = {
960 .mdo_is_subdir = cmm_is_subdir,
961 .mdo_lookup = cmr_lookup,
962 .mdo_lock_mode = cmr_lock_mode,
963 .mdo_create = cmr_create,
964 .mdo_link = cmr_link,
965 .mdo_unlink = cmr_unlink,
966 .mdo_rename = cmr_rename,
967 .mdo_rename_tgt = cmr_rename_tgt,