Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52
53 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
54                    mdsno_t *mds, const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         LASSERT(fid_is_sane(fid));
60
61         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
62         if (rc) {
63                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
64                        fid_seq(fid), rc);
65                 RETURN(rc);
66         }
67
68         if (*mds > cm->cmm_tgt_count) {
69                 CERROR("Got invalid mdsno: %x (max: %x)\n",
70                        *mds, cm->cmm_tgt_count);
71                 rc = -EINVAL;
72         } else {
73                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
74                        LPX64"\n", *mds, fid_seq(fid));
75         }
76
77         RETURN (rc);
78 }
79
80 static const struct md_object_operations cml_mo_ops;
81 static const struct md_dir_operations    cml_dir_ops;
82 static const struct lu_object_operations cml_obj_ops;
83
84 static const struct md_object_operations cmr_mo_ops;
85 static const struct md_dir_operations    cmr_dir_ops;
86 static const struct lu_object_operations cmr_obj_ops;
87
88 struct lu_object *cmm_object_alloc(const struct lu_env *env,
89                                    const struct lu_object_header *loh,
90                                    struct lu_device *ld)
91 {
92         const struct lu_fid *fid = &loh->loh_fid;
93         struct lu_object  *lo = NULL;
94         struct cmm_device *cd;
95         mdsno_t mds;
96         int rc = 0;
97
98         ENTRY;
99
100         cd = lu2cmm_dev(ld);
101         if (cd->cmm_flags & CMM_INITIALIZED) {
102                 /* get object location */
103                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
104                 if (rc)
105                         RETURN(NULL);
106         } else
107                 /*
108                  * Device is not yet initialized, cmm_object is being created
109                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
110                  * etc.). Such object *has* to be local.
111                  */
112                 mds = cd->cmm_local_num;
113
114         /* select the proper set of operations based on object location */
115         if (mds == cd->cmm_local_num) {
116                 struct cml_object *clo;
117
118                 OBD_ALLOC_PTR(clo);
119                 if (clo != NULL) {
120                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
121                         lu_object_init(lo, NULL, ld);
122                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
123                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
124                         lo->lo_ops = &cml_obj_ops;
125                 }
126         } else {
127                 struct cmr_object *cro;
128
129                 OBD_ALLOC_PTR(cro);
130                 if (cro != NULL) {
131                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
132                         lu_object_init(lo, NULL, ld);
133                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
134                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
135                         lo->lo_ops = &cmr_obj_ops;
136                         cro->cmo_num = mds;
137                 }
138         }
139         RETURN(lo);
140 }
141
142 /*
143  * CMM has two types of objects - local and remote. They have different set
144  * of operations so we are avoiding multiple checks in code.
145  */
146
147 /* get local child device */
148 static struct lu_device *cml_child_dev(struct cmm_device *d)
149 {
150         return &d->cmm_child->md_lu_dev;
151 }
152
153 /* lu_object operations */
154 static void cml_object_free(const struct lu_env *env,
155                             struct lu_object *lo)
156 {
157         struct cml_object *clo = lu2cml_obj(lo);
158         lu_object_fini(lo);
159         OBD_FREE_PTR(clo);
160 }
161
162 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
163                            const struct lu_object_conf *_)
164 {
165         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
166         struct lu_device  *c_dev;
167         struct lu_object  *c_obj;
168         int rc;
169
170         ENTRY;
171
172 #ifdef HAVE_SPLIT_SUPPORT
173         if (cd->cmm_tgt_count == 0)
174                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
175         else
176                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
177 #endif
178         c_dev = cml_child_dev(cd);
179         if (c_dev == NULL) {
180                 rc = -ENOENT;
181         } else {
182                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
183                                                         lo->lo_header, c_dev);
184                 if (c_obj != NULL) {
185                         lu_object_add(lo, c_obj);
186                         rc = 0;
187                 } else {
188                         rc = -ENOMEM;
189                 }
190         }
191
192         RETURN(rc);
193 }
194
195 static int cml_object_print(const struct lu_env *env, void *cookie,
196                             lu_printer_t p, const struct lu_object *lo)
197 {
198         return (*p)(env, cookie, "[local]");
199 }
200
201 static const struct lu_object_operations cml_obj_ops = {
202         .loo_object_init    = cml_object_init,
203         .loo_object_free    = cml_object_free,
204         .loo_object_print   = cml_object_print
205 };
206
207 /* CMM local md_object operations */
208 static int cml_object_create(const struct lu_env *env,
209                              struct md_object *mo,
210                              const struct md_op_spec *spec,
211                              struct md_attr *attr)
212 {
213         int rc;
214         ENTRY;
215         rc = mo_object_create(env, md_object_next(mo), spec, attr);
216         RETURN(rc);
217 }
218
219 static int cml_permission(const struct lu_env *env,
220                           struct md_object *p, struct md_object *c,
221                           struct md_attr *attr, int mask)
222 {
223         int rc;
224         ENTRY;
225         rc = mo_permission(env, md_object_next(p), md_object_next(c),
226                            attr, mask);
227         RETURN(rc);
228 }
229
230 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
231                         struct md_attr *attr)
232 {
233         int rc;
234         ENTRY;
235         rc = mo_attr_get(env, md_object_next(mo), attr);
236         RETURN(rc);
237 }
238
239 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
240                         const struct md_attr *attr)
241 {
242         int rc;
243         ENTRY;
244         rc = mo_attr_set(env, md_object_next(mo), attr);
245         RETURN(rc);
246 }
247
248 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
249                          struct lu_buf *buf, const char *name)
250 {
251         int rc;
252         ENTRY;
253         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
254         RETURN(rc);
255 }
256
257 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
258                         struct lu_buf *buf)
259 {
260         int rc;
261         ENTRY;
262         rc = mo_readlink(env, md_object_next(mo), buf);
263         RETURN(rc);
264 }
265
266 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
267                           struct lu_buf *buf)
268 {
269         int rc;
270         ENTRY;
271         rc = mo_xattr_list(env, md_object_next(mo), buf);
272         RETURN(rc);
273 }
274
275 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
276                          const struct lu_buf *buf, const char *name,
277                          int fl)
278 {
279         int rc;
280         ENTRY;
281         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
282         RETURN(rc);
283 }
284
285 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
286                          const char *name)
287 {
288         int rc;
289         ENTRY;
290         rc = mo_xattr_del(env, md_object_next(mo), name);
291         RETURN(rc);
292 }
293
294 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
295                        const struct md_attr *ma)
296 {
297         int rc;
298         ENTRY;
299         rc = mo_ref_add(env, md_object_next(mo), ma);
300         RETURN(rc);
301 }
302
303 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
304                        struct md_attr *ma)
305 {
306         int rc;
307         ENTRY;
308         rc = mo_ref_del(env, md_object_next(mo), ma);
309         RETURN(rc);
310 }
311
312 static int cml_open(const struct lu_env *env, struct md_object *mo,
313                     int flags)
314 {
315         int rc;
316         ENTRY;
317         rc = mo_open(env, md_object_next(mo), flags);
318         RETURN(rc);
319 }
320
321 static int cml_close(const struct lu_env *env, struct md_object *mo,
322                      struct md_attr *ma)
323 {
324         int rc;
325         ENTRY;
326         rc = mo_close(env, md_object_next(mo), ma);
327         RETURN(rc);
328 }
329
330 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
331                         const struct lu_rdpg *rdpg)
332 {
333         int rc;
334         ENTRY;
335         rc = mo_readpage(env, md_object_next(mo), rdpg);
336         RETURN(rc);
337 }
338
339 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
340                         struct lustre_capa *capa, int renewal)
341 {
342         int rc;
343         ENTRY;
344         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
345         RETURN(rc);
346 }
347
348 static int cml_path(const struct lu_env *env, struct md_object *mo,
349                     char *path, int pathlen, __u64 *recno, int *linkno)
350 {
351         int rc;
352         ENTRY;
353         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
354         RETURN(rc);
355 }
356
357 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
358 {
359         int rc;
360         ENTRY;
361         rc = mo_object_sync(env, md_object_next(mo));
362         RETURN(rc);
363 }
364
365 static dt_obj_version_t cml_version_get(const struct lu_env *env,
366                                         struct md_object *mo)
367 {
368         return mo_version_get(env, md_object_next(mo));
369 }
370
371 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
372                             dt_obj_version_t version)
373 {
374         return mo_version_set(env, md_object_next(mo), version);
375 }
376
377 static const struct md_object_operations cml_mo_ops = {
378         .moo_permission    = cml_permission,
379         .moo_attr_get      = cml_attr_get,
380         .moo_attr_set      = cml_attr_set,
381         .moo_xattr_get     = cml_xattr_get,
382         .moo_xattr_list    = cml_xattr_list,
383         .moo_xattr_set     = cml_xattr_set,
384         .moo_xattr_del     = cml_xattr_del,
385         .moo_object_create = cml_object_create,
386         .moo_ref_add       = cml_ref_add,
387         .moo_ref_del       = cml_ref_del,
388         .moo_open          = cml_open,
389         .moo_close         = cml_close,
390         .moo_readpage      = cml_readpage,
391         .moo_readlink      = cml_readlink,
392         .moo_capa_get      = cml_capa_get,
393         .moo_object_sync   = cml_object_sync,
394         .moo_version_get   = cml_version_get,
395         .moo_version_set   = cml_version_set,
396         .moo_path          = cml_path,
397 };
398
399 /* md_dir operations */
400 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
401                       const struct lu_name *lname, struct lu_fid *lf,
402                       struct md_op_spec *spec)
403 {
404         int rc;
405         ENTRY;
406
407 #ifdef HAVE_SPLIT_SUPPORT
408         if (spec != NULL && spec->sp_ck_split) {
409                 rc = cmm_split_check(env, mo_p, lname->ln_name);
410                 if (rc)
411                         RETURN(rc);
412         }
413 #endif
414         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
415         RETURN(rc);
416
417 }
418
419 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
420                                 struct md_object *mo, mdl_mode_t lm)
421 {
422         int rc = MDL_MINMODE;
423         ENTRY;
424
425 #ifdef HAVE_SPLIT_SUPPORT
426         rc = cmm_split_access(env, mo, lm);
427 #endif
428
429         RETURN(rc);
430 }
431
432 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
433                       const struct lu_name *lname, struct md_object *mo_c,
434                       struct md_op_spec *spec, struct md_attr *ma)
435 {
436         int rc;
437         ENTRY;
438
439 #ifdef HAVE_SPLIT_SUPPORT
440         /* Lock mode always should be sane. */
441         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
442
443         /*
444          * Sigh... This is long story. MDT may have race with detecting if split
445          * is possible in cmm. We know this race and let it live, because
446          * getting it rid (with some sem or spinlock) will also mean that
447          * PDIROPS for create will not work because we kill parallel work, what
448          * is really bad for performance and makes no sense having PDIROPS. So,
449          * we better allow the race to live, but split dir only if some of
450          * concurrent threads takes EX lock, not matter which one. So that, say,
451          * two concurrent threads may have different lock modes on directory (CW
452          * and EX) and not first one which comes here and see that split is
453          * possible should split the dir, but only that one which has EX
454          * lock. And we do not care that in this case, split may happen a bit
455          * later (when dir size will not be necessarily 64K, but may be a bit
456          * larger). So that, we allow concurrent creates and protect split by EX
457          * lock.
458          */
459         if (spec->sp_cr_mode == MDL_EX) {
460                 /*
461                  * Try to split @mo_p. If split is ok, -ERESTART is returned and
462                  * current thread will not peoceed with create. Instead it sends
463                  * -ERESTART to client to let it know that correct MDT should be
464                  * chosen.
465                  */
466                 rc = cmm_split_dir(env, mo_p);
467                 if (rc)
468                         /*
469                          * -ERESTART or some split error is returned, we can't
470                          * proceed with create.
471                          */
472                         GOTO(out, rc);
473         }
474
475         if (spec != NULL && spec->sp_ck_split) {
476                 /*
477                  * Check for possible split directory and let caller know that
478                  * it should tell client that directory is split and operation
479                  * should repeat to correct MDT.
480                  */
481                 rc = cmm_split_check(env, mo_p, lname->ln_name);
482                 if (rc)
483                         GOTO(out, rc);
484         }
485 #endif
486
487         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
488                         spec, ma);
489
490         EXIT;
491 #ifdef HAVE_SPLIT_SUPPORT
492 out:
493 #endif
494         return rc;
495 }
496
497 static int cml_create_data(const struct lu_env *env, struct md_object *p,
498                            struct md_object *o,
499                            const struct md_op_spec *spec,
500                            struct md_attr *ma)
501 {
502         int rc;
503         ENTRY;
504         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
505                              spec, ma);
506         RETURN(rc);
507 }
508
509 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
510                     struct md_object *mo_s, const struct lu_name *lname,
511                     struct md_attr *ma)
512 {
513         int rc;
514         ENTRY;
515         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
516                       lname, ma);
517         RETURN(rc);
518 }
519
520 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
521                       struct md_object *mo_c, const struct lu_name *lname,
522                       struct md_attr *ma)
523 {
524         int rc;
525         ENTRY;
526         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
527                         lname, ma);
528         RETURN(rc);
529 }
530
531 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
532                         const struct lu_fid *lf, struct md_attr *ma,
533                         int *remote)
534 {
535         struct md_object *mo_s = md_object_find_slice(env, md, lf);
536         struct cmm_thread_info *cmi;
537         struct md_attr *tmp_ma;
538         int rc;
539         ENTRY;
540
541         if (IS_ERR(mo_s))
542                 RETURN(PTR_ERR(mo_s));
543
544         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
545                 *remote = 1;
546
547         cmi = cmm_env_info(env);
548         tmp_ma = &cmi->cmi_ma;
549         tmp_ma->ma_need = MA_INODE;
550         tmp_ma->ma_valid = 0;
551         /* get type from src, can be remote req */
552         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
553         if (rc == 0) {
554                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
555                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
556                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
557                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
558                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
559         }
560         lu_object_put(env, &mo_s->mo_lu);
561         RETURN(rc);
562 }
563
564 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
565                             const struct lu_fid *lf, struct md_attr *ma)
566 {
567         struct md_object *mo_s = md_object_find_slice(env, md, lf);
568         int rc;
569         ENTRY;
570
571         if (IS_ERR(mo_s))
572                 RETURN(PTR_ERR(mo_s));
573
574         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
575         /* set ctime to obj, can be remote req */
576         rc = mo_attr_set(env, md_object_next(mo_s), ma);
577         lu_object_put(env, &mo_s->mo_lu);
578         RETURN(rc);
579 }
580
581 static inline void cml_rename_warn(const char *fname,
582                                   struct md_object *mo_po,
583                                   struct md_object *mo_pn,
584                                   const struct lu_fid *lf,
585                                   const char *s_name,
586                                   struct md_object *mo_t,
587                                   const char *t_name,
588                                   int err)
589 {
590         if (mo_t)
591                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
592                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
593                       "[tname %s] [err %d]\n", fname,
594                       PFID(lu_object_fid(&mo_po->mo_lu)),
595                       PFID(lu_object_fid(&mo_pn->mo_lu)),
596                       PFID(lf), s_name,
597                       PFID(lu_object_fid(&mo_t->mo_lu)),
598                       t_name, err);
599         else
600                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
601                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
602                       "[tname %s] [err %d]\n", fname,
603                       PFID(lu_object_fid(&mo_po->mo_lu)),
604                       PFID(lu_object_fid(&mo_pn->mo_lu)),
605                       PFID(lf), s_name,
606                       t_name, err);
607 }
608
609 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
610                       struct md_object *mo_pn, const struct lu_fid *lf,
611                       const struct lu_name *ls_name, struct md_object *mo_t,
612                       const struct lu_name *lt_name, struct md_attr *ma)
613 {
614         struct cmm_thread_info *cmi;
615         struct md_attr *tmp_ma = NULL;
616         struct md_object *tmp_t = mo_t;
617         int remote = 0, rc;
618         ENTRY;
619
620         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
621         if (rc)
622                 RETURN(rc);
623
624         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
625                 /* XXX: mo_t is remote object and there is RPC to unlink it.
626                  * before that, do local sanity check for rename first. */
627                 if (!remote) {
628                         struct md_object *mo_s = md_object_find_slice(env,
629                                                         md_obj2dev(mo_po), lf);
630                         if (IS_ERR(mo_s))
631                                 RETURN(PTR_ERR(mo_s));
632
633                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
634                         rc = mo_permission(env, md_object_next(mo_po),
635                                            md_object_next(mo_s),
636                                            ma, MAY_RENAME_SRC);
637                         lu_object_put(env, &mo_s->mo_lu);
638                         if (rc)
639                                 RETURN(rc);
640                 } else {
641                         rc = mo_permission(env, NULL, md_object_next(mo_po),
642                                            ma, MAY_UNLINK | MAY_VTX_FULL);
643                         if (rc)
644                                 RETURN(rc);
645                 }
646
647                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
648                                    MAY_UNLINK | MAY_VTX_PART);
649                 if (rc)
650                         RETURN(rc);
651
652                 /*
653                  * XXX: @ma will be changed after mo_ref_del, but we will use
654                  * it for mdo_rename later, so save it before mo_ref_del.
655                  */
656                 cmi = cmm_env_info(env);
657                 tmp_ma = &cmi->cmi_ma;
658                 *tmp_ma = *ma;
659                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
660                 if (rc)
661                         RETURN(rc);
662
663                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
664                 mo_t = NULL;
665         }
666
667         /* XXX: for src on remote MDS case, change its ctime before local
668          * rename. Firstly, do local sanity check for rename if necessary. */
669         if (remote) {
670                 if (!tmp_ma) {
671                         rc = mo_permission(env, NULL, md_object_next(mo_po),
672                                            ma, MAY_UNLINK | MAY_VTX_FULL);
673                         if (rc)
674                                 RETURN(rc);
675
676                         if (mo_t) {
677                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
678                                 rc = mo_permission(env, md_object_next(mo_pn),
679                                                    md_object_next(mo_t),
680                                                    ma, MAY_RENAME_TAR);
681                                 if (rc)
682                                         RETURN(rc);
683                         } else {
684                                 int mask;
685
686                                 if (mo_po != mo_pn)
687                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
688                                                 MAY_LINK : MAY_CREATE);
689                                 else
690                                         mask = MAY_CREATE;
691                                 rc = mo_permission(env, NULL,
692                                                    md_object_next(mo_pn),
693                                                    NULL, mask);
694                                 if (rc)
695                                         RETURN(rc);
696                         }
697
698                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
699                 } else {
700                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
701                 }
702
703                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
704                                       tmp_ma ? tmp_ma : ma);
705                 if (rc) {
706                         /* TODO: revoke mo_t if necessary. */
707                         cml_rename_warn("cmm_rename_ctime", mo_po,
708                                         mo_pn, lf, ls_name->ln_name,
709                                         tmp_t, lt_name->ln_name, rc);
710                         RETURN(rc);
711                 }
712         }
713
714         /* local rename, mo_t can be NULL */
715         rc = mdo_rename(env, md_object_next(mo_po),
716                         md_object_next(mo_pn), lf, ls_name,
717                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
718         if (rc)
719                 /* TODO: revoke all cml_rename */
720                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
721                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
722
723         RETURN(rc);
724 }
725
726 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
727                           struct md_object *mo_t, const struct lu_fid *lf,
728                           const struct lu_name *lname, struct md_attr *ma)
729 {
730         int rc;
731         ENTRY;
732
733         rc = mdo_rename_tgt(env, md_object_next(mo_p),
734                             md_object_next(mo_t), lf, lname, ma);
735         RETURN(rc);
736 }
737 /* used only in case of rename_tgt() when target is not exist */
738 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
739                            const struct lu_name *lname, const struct lu_fid *lf,
740                            const struct md_attr *ma)
741 {
742         int rc;
743         ENTRY;
744
745         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
746
747         RETURN(rc);
748 }
749
750 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
751                          const struct lu_fid *fid, struct lu_fid *sfid)
752 {
753         struct cmm_thread_info *cmi;
754         int rc;
755         ENTRY;
756
757         cmi = cmm_env_info(env);
758         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
759         if (rc)
760                 RETURN(rc);
761
762         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
763                 RETURN(0);
764
765         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
766         RETURN(rc);
767 }
768
769 static const struct md_dir_operations cml_dir_ops = {
770         .mdo_is_subdir   = cmm_is_subdir,
771         .mdo_lookup      = cml_lookup,
772         .mdo_lock_mode   = cml_lock_mode,
773         .mdo_create      = cml_create,
774         .mdo_link        = cml_link,
775         .mdo_unlink      = cml_unlink,
776         .mdo_name_insert = cml_name_insert,
777         .mdo_rename      = cml_rename,
778         .mdo_rename_tgt  = cml_rename_tgt,
779         .mdo_create_data = cml_create_data
780 };
781
782 /* -------------------------------------------------------------------
783  * remote CMM object operations. cmr_...
784  */
785 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
786 {
787         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
788 }
789 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
790 {
791         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
792 }
793 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
794 {
795         return container_of0(co, struct cmr_object, cmm_obj);
796 }
797
798 /* get proper child device from MDCs */
799 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
800 {
801         struct lu_device *next = NULL;
802         struct mdc_device *mdc;
803
804         spin_lock(&d->cmm_tgt_guard);
805         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
806                 if (mdc->mc_num == num) {
807                         next = mdc2lu_dev(mdc);
808                         break;
809                 }
810         }
811         spin_unlock(&d->cmm_tgt_guard);
812         return next;
813 }
814
815 /* lu_object operations */
816 static void cmr_object_free(const struct lu_env *env,
817                             struct lu_object *lo)
818 {
819         struct cmr_object *cro = lu2cmr_obj(lo);
820         lu_object_fini(lo);
821         OBD_FREE_PTR(cro);
822 }
823
824 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
825                            const struct lu_object_conf *_)
826 {
827         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
828         struct lu_device  *c_dev;
829         struct lu_object  *c_obj;
830         int rc;
831
832         ENTRY;
833
834         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
835         if (c_dev == NULL) {
836                 rc = -ENOENT;
837         } else {
838                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
839                                                         lo->lo_header, c_dev);
840                 if (c_obj != NULL) {
841                         lu_object_add(lo, c_obj);
842                         rc = 0;
843                 } else {
844                         rc = -ENOMEM;
845                 }
846         }
847
848         RETURN(rc);
849 }
850
851 static int cmr_object_print(const struct lu_env *env, void *cookie,
852                             lu_printer_t p, const struct lu_object *lo)
853 {
854         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
855         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
856 }
857
858 static const struct lu_object_operations cmr_obj_ops = {
859         .loo_object_init    = cmr_object_init,
860         .loo_object_free    = cmr_object_free,
861         .loo_object_print   = cmr_object_print
862 };
863
864 /* CMM remote md_object operations. All are invalid */
865 static int cmr_object_create(const struct lu_env *env,
866                              struct md_object *mo,
867                              const struct md_op_spec *spec,
868                              struct md_attr *ma)
869 {
870         return -EFAULT;
871 }
872
873 static int cmr_permission(const struct lu_env *env,
874                           struct md_object *p, struct md_object *c,
875                           struct md_attr *attr, int mask)
876 {
877         return -EREMOTE;
878 }
879
880 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
881                         struct md_attr *attr)
882 {
883         return -EREMOTE;
884 }
885
886 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
887                         const struct md_attr *attr)
888 {
889         return -EFAULT;
890 }
891
892 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
893                          struct lu_buf *buf, const char *name)
894 {
895         return -EFAULT;
896 }
897
898 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
899                         struct lu_buf *buf)
900 {
901         return -EFAULT;
902 }
903
904 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
905                           struct lu_buf *buf)
906 {
907         return -EFAULT;
908 }
909
910 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
911                          const struct lu_buf *buf, const char *name,
912                          int fl)
913 {
914         return -EFAULT;
915 }
916
917 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
918                          const char *name)
919 {
920         return -EFAULT;
921 }
922
923 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
924                        const struct md_attr *ma)
925 {
926         return -EFAULT;
927 }
928
929 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
930                        struct md_attr *ma)
931 {
932         return -EFAULT;
933 }
934
935 static int cmr_open(const struct lu_env *env, struct md_object *mo,
936                     int flags)
937 {
938         return -EREMOTE;
939 }
940
941 static int cmr_close(const struct lu_env *env, struct md_object *mo,
942                      struct md_attr *ma)
943 {
944         return -EFAULT;
945 }
946
947 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
948                         const struct lu_rdpg *rdpg)
949 {
950         return -EREMOTE;
951 }
952
953 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
954                         struct lustre_capa *capa, int renewal)
955 {
956         return -EFAULT;
957 }
958
959 static int cmr_path(const struct lu_env *env, struct md_object *obj,
960                     char *path, int pathlen, __u64 *recno, int *linkno)
961 {
962         return -EREMOTE;
963 }
964
965 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
966 {
967         return -EFAULT;
968 }
969
970 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
971                                         struct md_object *mo)
972 {
973         /* Don't check remote object version */
974         return 0;
975 }
976
977 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
978                             dt_obj_version_t version)
979 {
980         /* No need to update remote object version here, it is done as a part
981          * of reintegration of partial operation on the remote server */
982 }
983
984 static const struct md_object_operations cmr_mo_ops = {
985         .moo_permission    = cmr_permission,
986         .moo_attr_get      = cmr_attr_get,
987         .moo_attr_set      = cmr_attr_set,
988         .moo_xattr_get     = cmr_xattr_get,
989         .moo_xattr_set     = cmr_xattr_set,
990         .moo_xattr_list    = cmr_xattr_list,
991         .moo_xattr_del     = cmr_xattr_del,
992         .moo_object_create = cmr_object_create,
993         .moo_ref_add       = cmr_ref_add,
994         .moo_ref_del       = cmr_ref_del,
995         .moo_open          = cmr_open,
996         .moo_close         = cmr_close,
997         .moo_readpage      = cmr_readpage,
998         .moo_readlink      = cmr_readlink,
999         .moo_capa_get      = cmr_capa_get,
1000         .moo_object_sync   = cmr_object_sync,
1001         .moo_version_get   = cmr_version_get,
1002         .moo_version_set   = cmr_version_set,
1003         .moo_path          = cmr_path,
1004 };
1005
1006 /* remote part of md_dir operations */
1007 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1008                       const struct lu_name *lname, struct lu_fid *lf,
1009                       struct md_op_spec *spec)
1010 {
1011         /*
1012          * This can happens while rename() If new parent is remote dir, lookup
1013          * will happen here.
1014          */
1015
1016         return -EREMOTE;
1017 }
1018
1019 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1020                                 struct md_object *mo, mdl_mode_t lm)
1021 {
1022         return MDL_MINMODE;
1023 }
1024
1025 /*
1026  * All methods below are cross-ref by nature. They consist of remote call and
1027  * local operation. Due to future rollback functionality there are several
1028  * limitations for such methods:
1029  * 1) remote call should be done at first to do epoch negotiation between all
1030  * MDS involved and to avoid the RPC inside transaction.
1031  * 2) only one RPC can be sent - also due to epoch negotiation.
1032  * For more details see rollback HLD/DLD.
1033  */
1034 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1035                       const struct lu_name *lchild_name, struct md_object *mo_c,
1036                       struct md_op_spec *spec,
1037                       struct md_attr *ma)
1038 {
1039         struct cmm_thread_info *cmi;
1040         struct md_attr *tmp_ma;
1041         int rc;
1042         ENTRY;
1043
1044         /* Make sure that name isn't exist before doing remote call. */
1045         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1046                         &cmm_env_info(env)->cmi_fid, NULL);
1047         if (rc == 0)
1048                 RETURN(-EEXIST);
1049         else if (rc != -ENOENT)
1050                 RETURN(rc);
1051
1052         /* check the SGID attr */
1053         cmi = cmm_env_info(env);
1054         LASSERT(cmi);
1055         tmp_ma = &cmi->cmi_ma;
1056         tmp_ma->ma_valid = 0;
1057         tmp_ma->ma_need = MA_INODE;
1058
1059 #ifdef CONFIG_FS_POSIX_ACL
1060         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1061                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1062                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1063                 tmp_ma->ma_need |= MA_ACL_DEF;
1064         }
1065 #endif
1066         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1067         if (rc)
1068                 RETURN(rc);
1069
1070         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1071                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1072                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1073                         ma->ma_attr.la_mode |= S_ISGID;
1074                         ma->ma_attr.la_valid |= LA_MODE;
1075                 }
1076         }
1077
1078 #ifdef CONFIG_FS_POSIX_ACL
1079         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1080                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1081                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1082                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1083                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1084         }
1085 #endif
1086
1087         /* Local permission check for name_insert before remote ops. */
1088         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1089                            (S_ISDIR(ma->ma_attr.la_mode) ?
1090                            MAY_LINK : MAY_CREATE));
1091         if (rc)
1092                 RETURN(rc);
1093
1094         /* Remote object creation and local name insert. */
1095         /*
1096          * XXX: @ma will be changed after mo_object_create, but we will use
1097          * it for mdo_name_insert later, so save it before mo_object_create.
1098          */
1099         *tmp_ma = *ma;
1100         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1101         if (rc == 0) {
1102                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1103                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1104                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1105                 if (unlikely(rc)) {
1106                         /* TODO: remove object mo_c on remote MDS */
1107                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1108                               " [name %s] [mo_c "DFID"] [err %d]\n",
1109                               PFID(lu_object_fid(&mo_p->mo_lu)),
1110                               lchild_name->ln_name,
1111                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1112                 }
1113         }
1114
1115         RETURN(rc);
1116 }
1117
1118 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1119                     struct md_object *mo_s, const struct lu_name *lname,
1120                     struct md_attr *ma)
1121 {
1122         int rc;
1123         ENTRY;
1124
1125         /* Make sure that name isn't exist before doing remote call. */
1126         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1127                         &cmm_env_info(env)->cmi_fid, NULL);
1128         if (rc == 0) {
1129                 rc = -EEXIST;
1130         } else if (rc == -ENOENT) {
1131                 /* Local permission check for name_insert before remote ops. */
1132                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1133                                    MAY_CREATE);
1134                 if (rc)
1135                         RETURN(rc);
1136
1137                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1138                 if (rc == 0) {
1139                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1140                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1141                                              lu_object_fid(&mo_s->mo_lu), ma);
1142                         if (unlikely(rc)) {
1143                                 /* TODO: ref_del from mo_s on remote MDS */
1144                                 CWARN("cmr_link failed, should revoke: "
1145                                       "[mo_p "DFID"] [mo_s "DFID"] "
1146                                       "[name %s] [err %d]\n",
1147                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1148                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1149                                       lname->ln_name, rc);
1150                         }
1151                 }
1152         }
1153         RETURN(rc);
1154 }
1155
1156 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1157                       struct md_object *mo_c, const struct lu_name *lname,
1158                       struct md_attr *ma)
1159 {
1160         struct cmm_thread_info *cmi;
1161         struct md_attr *tmp_ma;
1162         int rc;
1163         ENTRY;
1164
1165         /* Local permission check for name_remove before remote ops. */
1166         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1167                            MAY_UNLINK | MAY_VTX_PART);
1168         if (rc)
1169                 RETURN(rc);
1170
1171         /*
1172          * XXX: @ma will be changed after mo_ref_del, but we will use
1173          * it for mdo_name_remove later, so save it before mo_ref_del.
1174          */
1175         cmi = cmm_env_info(env);
1176         tmp_ma = &cmi->cmi_ma;
1177         *tmp_ma = *ma;
1178         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1179         if (rc == 0) {
1180                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1181                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1182                 if (unlikely(rc)) {
1183                         /* TODO: ref_add to mo_c on remote MDS */
1184                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1185                               " [mo_c "DFID"] [name %s] [err %d]\n",
1186                               PFID(lu_object_fid(&mo_p->mo_lu)),
1187                               PFID(lu_object_fid(&mo_c->mo_lu)),
1188                               lname->ln_name, rc);
1189                 }
1190         }
1191
1192         RETURN(rc);
1193 }
1194
1195 static inline void cmr_rename_warn(const char *fname,
1196                                   struct md_object *mo_po,
1197                                   struct md_object *mo_pn,
1198                                   const struct lu_fid *lf,
1199                                   const char *s_name,
1200                                   const char *t_name,
1201                                   int err)
1202 {
1203         CWARN("cmr_rename failed for %s, should revoke: "
1204               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1205               "[sname %s] [tname %s] [err %d]\n", fname,
1206               PFID(lu_object_fid(&mo_po->mo_lu)),
1207               PFID(lu_object_fid(&mo_pn->mo_lu)),
1208               PFID(lf), s_name, t_name, err);
1209 }
1210
1211 static int cmr_rename(const struct lu_env *env,
1212                       struct md_object *mo_po, struct md_object *mo_pn,
1213                       const struct lu_fid *lf, const struct lu_name *ls_name,
1214                       struct md_object *mo_t, const struct lu_name *lt_name,
1215                       struct md_attr *ma)
1216 {
1217         struct cmm_thread_info *cmi;
1218         struct md_attr *tmp_ma;
1219         int rc;
1220         ENTRY;
1221
1222         LASSERT(mo_t == NULL);
1223
1224         /* get real type of src */
1225         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1226         if (rc)
1227                 RETURN(rc);
1228
1229         /* Local permission check for name_remove before remote ops. */
1230         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1231                            MAY_UNLINK | MAY_VTX_FULL);
1232         if (rc)
1233                 RETURN(rc);
1234
1235         /*
1236          * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
1237          * for mdo_name_remove later, so save it before mdo_rename_tgt.
1238          */
1239         cmi = cmm_env_info(env);
1240         tmp_ma = &cmi->cmi_ma;
1241         *tmp_ma = *ma;
1242         /* the mo_pn is remote directory, so we cannot even know if there is
1243          * mo_t or not. Therefore mo_t is NULL here but remote server should do
1244          * lookup and process this further */
1245         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1246                             NULL/* mo_t */, lf, lt_name, ma);
1247         if (rc)
1248                 RETURN(rc);
1249
1250         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1251
1252         /* src object maybe on remote MDS, do remote ops first. */
1253         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1254         if (unlikely(rc)) {
1255                 /* TODO: revoke mdo_rename_tgt */
1256                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1257                                 ls_name->ln_name, lt_name->ln_name, rc);
1258                 RETURN(rc);
1259         }
1260
1261         /* only old name is removed localy */
1262         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1263         if (unlikely(rc))
1264                 /* TODO: revoke all cmr_rename */
1265                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1266                                 ls_name->ln_name, lt_name->ln_name, rc);
1267
1268         RETURN(rc);
1269 }
1270
1271 /* part of cross-ref rename(). Used to insert new name in new parent
1272  * and unlink target */
1273 static int cmr_rename_tgt(const struct lu_env *env,
1274                           struct md_object *mo_p, struct md_object *mo_t,
1275                           const struct lu_fid *lf, const struct lu_name *lname,
1276                           struct md_attr *ma)
1277 {
1278         struct cmm_thread_info *cmi;
1279         struct md_attr *tmp_ma;
1280         int rc;
1281         ENTRY;
1282
1283         /* target object is remote one */
1284         /* Local permission check for rename_tgt before remote ops. */
1285         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1286                            MAY_UNLINK | MAY_VTX_PART);
1287         if (rc)
1288                 RETURN(rc);
1289
1290         /*
1291          * XXX: @ma maybe changed after mo_ref_del, but we will use
1292          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1293          */
1294         cmi = cmm_env_info(env);
1295         tmp_ma = &cmi->cmi_ma;
1296         *tmp_ma = *ma;
1297         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1298         /* continue locally with name handling only */
1299         if (rc == 0) {
1300                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1301                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1302                                     NULL, lf, lname, tmp_ma);
1303                 if (unlikely(rc)) {
1304                         /* TODO: ref_add to mo_t on remote MDS */
1305                         CWARN("cmr_rename_tgt failed, should revoke: "
1306                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1307                               "[name %s] [err %d]\n",
1308                               PFID(lu_object_fid(&mo_p->mo_lu)),
1309                               PFID(lu_object_fid(&mo_t->mo_lu)),
1310                               PFID(lf),
1311                               lname->ln_name, rc);
1312                 }
1313         }
1314         RETURN(rc);
1315 }
1316
1317 static const struct md_dir_operations cmr_dir_ops = {
1318         .mdo_is_subdir   = cmm_is_subdir,
1319         .mdo_lookup      = cmr_lookup,
1320         .mdo_lock_mode   = cmr_lock_mode,
1321         .mdo_create      = cmr_create,
1322         .mdo_link        = cmr_link,
1323         .mdo_unlink      = cmr_unlink,
1324         .mdo_rename      = cmr_rename,
1325         .mdo_rename_tgt  = cmr_rename_tgt
1326 };