Whamcloud - gitweb
86741be6cae75ecb985cc14ce05c1486abdc2635
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52 /**
53  * \ingroup cmm
54  * Lookup MDS number \a mds by FID \a fid.
55  *
56  * \param fid FID of object to find MDS
57  * \param mds mds number to return.
58  */
59 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
60                    mdsno_t *mds, const struct lu_env *env)
61 {
62         int rc = 0;
63         ENTRY;
64
65         LASSERT(fid_is_sane(fid));
66
67         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
68                                LU_SEQ_RANGE_MDT, env);
69         if (rc) {
70                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
71                        fid_seq(fid), rc);
72                 RETURN(rc);
73         }
74
75         if (*mds > cm->cmm_tgt_count) {
76                 CERROR("Got invalid mdsno: %x (max: %x)\n",
77                        *mds, cm->cmm_tgt_count);
78                 rc = -EINVAL;
79         } else {
80                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
81                        LPX64"\n", *mds, fid_seq(fid));
82         }
83
84         RETURN (rc);
85 }
86
87 /**
88  * \addtogroup cml
89  * @{
90  */
91 static const struct md_object_operations cml_mo_ops;
92 static const struct md_dir_operations    cml_dir_ops;
93 static const struct lu_object_operations cml_obj_ops;
94
95 static const struct md_object_operations cmr_mo_ops;
96 static const struct md_dir_operations    cmr_dir_ops;
97 static const struct lu_object_operations cmr_obj_ops;
98
99 /**
100  * \ingroup cmm
101  * Allocate CMM object.
102  */
103 struct lu_object *cmm_object_alloc(const struct lu_env *env,
104                                    const struct lu_object_header *loh,
105                                    struct lu_device *ld)
106 {
107         const struct lu_fid *fid = &loh->loh_fid;
108         struct lu_object  *lo = NULL;
109         struct cmm_device *cd;
110         mdsno_t mds;
111         int rc = 0;
112
113         ENTRY;
114
115         cd = lu2cmm_dev(ld);
116         if (cd->cmm_flags & CMM_INITIALIZED) {
117                 /* get object location */
118                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
119                 if (rc)
120                         RETURN(NULL);
121         } else
122                 /*
123                  * Device is not yet initialized, cmm_object is being created
124                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
125                  * etc.). Such object *has* to be local.
126                  */
127                 mds = cd->cmm_local_num;
128
129         /* select the proper set of operations based on object location */
130         if (mds == cd->cmm_local_num) {
131                 struct cml_object *clo;
132
133                 OBD_ALLOC_PTR(clo);
134                 if (clo != NULL) {
135                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
136                         lu_object_init(lo, NULL, ld);
137                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
138                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
139                         lo->lo_ops = &cml_obj_ops;
140                 }
141         } else {
142                 struct cmr_object *cro;
143
144                 OBD_ALLOC_PTR(cro);
145                 if (cro != NULL) {
146                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
147                         lu_object_init(lo, NULL, ld);
148                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
149                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
150                         lo->lo_ops = &cmr_obj_ops;
151                         cro->cmo_num = mds;
152                 }
153         }
154         RETURN(lo);
155 }
156
157 /**
158  * Get local child device.
159  */
160 static struct lu_device *cml_child_dev(struct cmm_device *d)
161 {
162         return &d->cmm_child->md_lu_dev;
163 }
164
165 /**
166  * Free cml_object.
167  */
168 static void cml_object_free(const struct lu_env *env,
169                             struct lu_object *lo)
170 {
171         struct cml_object *clo = lu2cml_obj(lo);
172         lu_object_fini(lo);
173         OBD_FREE_PTR(clo);
174 }
175
176 /**
177  * Initialize cml_object.
178  */
179 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
180                            const struct lu_object_conf *unused)
181 {
182         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
183         struct lu_device  *c_dev;
184         struct lu_object  *c_obj;
185         int rc;
186
187         ENTRY;
188
189 #ifdef HAVE_SPLIT_SUPPORT
190         if (cd->cmm_tgt_count == 0)
191                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
192         else
193                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
194 #endif
195         c_dev = cml_child_dev(cd);
196         if (c_dev == NULL) {
197                 rc = -ENOENT;
198         } else {
199                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
200                                                         lo->lo_header, c_dev);
201                 if (c_obj != NULL) {
202                         lu_object_add(lo, c_obj);
203                         rc = 0;
204                 } else {
205                         rc = -ENOMEM;
206                 }
207         }
208
209         RETURN(rc);
210 }
211
212 static int cml_object_print(const struct lu_env *env, void *cookie,
213                             lu_printer_t p, const struct lu_object *lo)
214 {
215         return (*p)(env, cookie, "[local]");
216 }
217
218 static const struct lu_object_operations cml_obj_ops = {
219         .loo_object_init    = cml_object_init,
220         .loo_object_free    = cml_object_free,
221         .loo_object_print   = cml_object_print
222 };
223
224 /**
225  * \name CMM local md_object operations.
226  * All of them call just corresponding operations on next layer.
227  * @{
228  */
229 static int cml_object_create(const struct lu_env *env,
230                              struct md_object *mo,
231                              const struct md_op_spec *spec,
232                              struct md_attr *attr)
233 {
234         int rc;
235         ENTRY;
236         rc = mo_object_create(env, md_object_next(mo), spec, attr);
237         RETURN(rc);
238 }
239
240 static int cml_permission(const struct lu_env *env,
241                           struct md_object *p, struct md_object *c,
242                           struct md_attr *attr, int mask)
243 {
244         int rc;
245         ENTRY;
246         rc = mo_permission(env, md_object_next(p), md_object_next(c),
247                            attr, mask);
248         RETURN(rc);
249 }
250
251 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
252                         struct md_attr *attr)
253 {
254         int rc;
255         ENTRY;
256         rc = mo_attr_get(env, md_object_next(mo), attr);
257         RETURN(rc);
258 }
259
260 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
261                         const struct md_attr *attr)
262 {
263         int rc;
264         ENTRY;
265         rc = mo_attr_set(env, md_object_next(mo), attr);
266         RETURN(rc);
267 }
268
269 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
270                          struct lu_buf *buf, const char *name)
271 {
272         int rc;
273         ENTRY;
274         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
275         RETURN(rc);
276 }
277
278 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
279                         struct lu_buf *buf)
280 {
281         int rc;
282         ENTRY;
283         rc = mo_readlink(env, md_object_next(mo), buf);
284         RETURN(rc);
285 }
286
287 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
288                           struct lu_buf *buf)
289 {
290         int rc;
291         ENTRY;
292         rc = mo_xattr_list(env, md_object_next(mo), buf);
293         RETURN(rc);
294 }
295
296 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
297                          const struct lu_buf *buf, const char *name,
298                          int fl)
299 {
300         int rc;
301         ENTRY;
302         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
303         RETURN(rc);
304 }
305
306 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
307                          const char *name)
308 {
309         int rc;
310         ENTRY;
311         rc = mo_xattr_del(env, md_object_next(mo), name);
312         RETURN(rc);
313 }
314
315 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
316                        const struct md_attr *ma)
317 {
318         int rc;
319         ENTRY;
320         rc = mo_ref_add(env, md_object_next(mo), ma);
321         RETURN(rc);
322 }
323
324 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
325                        struct md_attr *ma)
326 {
327         int rc;
328         ENTRY;
329         rc = mo_ref_del(env, md_object_next(mo), ma);
330         RETURN(rc);
331 }
332
333 static int cml_open(const struct lu_env *env, struct md_object *mo,
334                     int flags)
335 {
336         int rc;
337         ENTRY;
338         rc = mo_open(env, md_object_next(mo), flags);
339         RETURN(rc);
340 }
341
342 static int cml_close(const struct lu_env *env, struct md_object *mo,
343                      struct md_attr *ma)
344 {
345         int rc;
346         ENTRY;
347         rc = mo_close(env, md_object_next(mo), ma);
348         RETURN(rc);
349 }
350
351 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
352                         const struct lu_rdpg *rdpg)
353 {
354         int rc;
355         ENTRY;
356         rc = mo_readpage(env, md_object_next(mo), rdpg);
357         RETURN(rc);
358 }
359
360 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
361                         struct lustre_capa *capa, int renewal)
362 {
363         int rc;
364         ENTRY;
365         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
366         RETURN(rc);
367 }
368
369 static int cml_path(const struct lu_env *env, struct md_object *mo,
370                     char *path, int pathlen, __u64 *recno, int *linkno)
371 {
372         int rc;
373         ENTRY;
374         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
375         RETURN(rc);
376 }
377
378 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
379 {
380         int rc;
381         ENTRY;
382         rc = mo_object_sync(env, md_object_next(mo));
383         RETURN(rc);
384 }
385
386 static dt_obj_version_t cml_version_get(const struct lu_env *env,
387                                         struct md_object *mo)
388 {
389         return mo_version_get(env, md_object_next(mo));
390 }
391
392 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
393                             dt_obj_version_t version)
394 {
395         return mo_version_set(env, md_object_next(mo), version);
396 }
397
398 static const struct md_object_operations cml_mo_ops = {
399         .moo_permission    = cml_permission,
400         .moo_attr_get      = cml_attr_get,
401         .moo_attr_set      = cml_attr_set,
402         .moo_xattr_get     = cml_xattr_get,
403         .moo_xattr_list    = cml_xattr_list,
404         .moo_xattr_set     = cml_xattr_set,
405         .moo_xattr_del     = cml_xattr_del,
406         .moo_object_create = cml_object_create,
407         .moo_ref_add       = cml_ref_add,
408         .moo_ref_del       = cml_ref_del,
409         .moo_open          = cml_open,
410         .moo_close         = cml_close,
411         .moo_readpage      = cml_readpage,
412         .moo_readlink      = cml_readlink,
413         .moo_capa_get      = cml_capa_get,
414         .moo_object_sync   = cml_object_sync,
415         .moo_version_get   = cml_version_get,
416         .moo_version_set   = cml_version_set,
417         .moo_path          = cml_path,
418 };
419 /** @} */
420
421 /**
422  * \name CMM local md_dir_operations.
423  * @{
424  */
425 /**
426  * cml lookup object fid by name.
427  * This returns only FID by name.
428  */
429 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
430                       const struct lu_name *lname, struct lu_fid *lf,
431                       struct md_op_spec *spec)
432 {
433         int rc;
434         ENTRY;
435
436 #ifdef HAVE_SPLIT_SUPPORT
437         if (spec != NULL && spec->sp_ck_split) {
438                 rc = cmm_split_check(env, mo_p, lname->ln_name);
439                 if (rc)
440                         RETURN(rc);
441         }
442 #endif
443         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
444         RETURN(rc);
445
446 }
447
448 /**
449  * Helper to return lock mode. Used in split cases only.
450  */
451 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
452                                 struct md_object *mo, mdl_mode_t lm)
453 {
454         int rc = MDL_MINMODE;
455         ENTRY;
456
457 #ifdef HAVE_SPLIT_SUPPORT
458         rc = cmm_split_access(env, mo, lm);
459 #endif
460
461         RETURN(rc);
462 }
463
464 /**
465  * Create operation for cml.
466  * Objects are local, but split can happen.
467  * If split is not needed this will call next layer mdo_create().
468  *
469  * \param mo_p Parent directory. Local object.
470  * \param lname name of file to create.
471  * \param mo_c Child object. It has no real inode yet.
472  * \param spec creation specification.
473  * \param ma child object attributes.
474  */
475 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
476                       const struct lu_name *lname, struct md_object *mo_c,
477                       struct md_op_spec *spec, struct md_attr *ma)
478 {
479         int rc;
480         ENTRY;
481
482 #ifdef HAVE_SPLIT_SUPPORT
483         /* Lock mode always should be sane. */
484         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
485
486         /*
487          * Sigh... This is long story. MDT may have race with detecting if split
488          * is possible in cmm. We know this race and let it live, because
489          * getting it rid (with some sem or spinlock) will also mean that
490          * PDIROPS for create will not work because we kill parallel work, what
491          * is really bad for performance and makes no sense having PDIROPS. So,
492          * we better allow the race to live, but split dir only if some of
493          * concurrent threads takes EX lock, not matter which one. So that, say,
494          * two concurrent threads may have different lock modes on directory (CW
495          * and EX) and not first one which comes here and see that split is
496          * possible should split the dir, but only that one which has EX
497          * lock. And we do not care that in this case, split may happen a bit
498          * later (when dir size will not be necessarily 64K, but may be a bit
499          * larger). So that, we allow concurrent creates and protect split by EX
500          * lock.
501          */
502         if (spec->sp_cr_mode == MDL_EX) {
503                 /**
504                  * Split cases:
505                  * - Try to split \a mo_p upon each create operation.
506                  *   If split is ok, -ERESTART is returned and current thread
507                  *   will not peoceed with create. Instead it sends -ERESTART
508                  *   to client to let it know that correct MDT must be chosen.
509                  * \see cmm_split_dir()
510                  */
511                 rc = cmm_split_dir(env, mo_p);
512                 if (rc)
513                         /*
514                          * -ERESTART or some split error is returned, we can't
515                          * proceed with create.
516                          */
517                         GOTO(out, rc);
518         }
519
520         if (spec != NULL && spec->sp_ck_split) {
521                 /**
522                  * - Directory is split already. Let the caller know that
523                  * it should tell client that directory is split and operation
524                  * should repeat to correct MDT.
525                  * \see cmm_split_check()
526                  */
527                 rc = cmm_split_check(env, mo_p, lname->ln_name);
528                 if (rc)
529                         GOTO(out, rc);
530         }
531 #endif
532
533         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
534                         spec, ma);
535
536         EXIT;
537 #ifdef HAVE_SPLIT_SUPPORT
538 out:
539 #endif
540         return rc;
541 }
542
543 /** Call mdo_create_data() on next layer. All objects are local. */
544 static int cml_create_data(const struct lu_env *env, struct md_object *p,
545                            struct md_object *o,
546                            const struct md_op_spec *spec,
547                            struct md_attr *ma)
548 {
549         int rc;
550         ENTRY;
551         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
552                              spec, ma);
553         RETURN(rc);
554 }
555
556 /** Call mdo_link() on next layer. All objects are local. */
557 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
558                     struct md_object *mo_s, const struct lu_name *lname,
559                     struct md_attr *ma)
560 {
561         int rc;
562         ENTRY;
563         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
564                       lname, ma);
565         RETURN(rc);
566 }
567
568 /** Call mdo_unlink() on next layer. All objects are local. */
569 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
570                       struct md_object *mo_c, const struct lu_name *lname,
571                       struct md_attr *ma)
572 {
573         int rc;
574         ENTRY;
575         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
576                         lname, ma);
577         RETURN(rc);
578 }
579
580 /**
581  * \ingroup cmm
582  * Get mode of object.
583  * Used in both cml and cmr hence can produce RPC to another server.
584  */
585 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
586                         const struct lu_fid *lf, struct md_attr *ma,
587                         int *remote)
588 {
589         struct md_object *mo_s = md_object_find_slice(env, md, lf);
590         struct cmm_thread_info *cmi;
591         struct md_attr *tmp_ma;
592         int rc;
593         ENTRY;
594
595         if (IS_ERR(mo_s))
596                 RETURN(PTR_ERR(mo_s));
597
598         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
599                 *remote = 1;
600
601         cmi = cmm_env_info(env);
602         tmp_ma = &cmi->cmi_ma;
603         tmp_ma->ma_need = MA_INODE;
604         tmp_ma->ma_valid = 0;
605         /* get type from src, can be remote req */
606         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
607         if (rc == 0) {
608                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
609                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
610                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
611                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
612                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
613         }
614         lu_object_put(env, &mo_s->mo_lu);
615         RETURN(rc);
616 }
617
618 /**
619  * \ingroup cmm
620  * Set ctime for object.
621  * Used in both cml and cmr hence can produce RPC to another server.
622  */
623 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
624                             const struct lu_fid *lf, struct md_attr *ma)
625 {
626         struct md_object *mo_s = md_object_find_slice(env, md, lf);
627         int rc;
628         ENTRY;
629
630         if (IS_ERR(mo_s))
631                 RETURN(PTR_ERR(mo_s));
632
633         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
634         /* set ctime to obj, can be remote req */
635         rc = mo_attr_set(env, md_object_next(mo_s), ma);
636         lu_object_put(env, &mo_s->mo_lu);
637         RETURN(rc);
638 }
639
640 /** Helper to output debug information about rename operation. */
641 static inline void cml_rename_warn(const char *fname,
642                                   struct md_object *mo_po,
643                                   struct md_object *mo_pn,
644                                   const struct lu_fid *lf,
645                                   const char *s_name,
646                                   struct md_object *mo_t,
647                                   const char *t_name,
648                                   int err)
649 {
650         if (mo_t)
651                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
652                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
653                       "[tname %s] [err %d]\n", fname,
654                       PFID(lu_object_fid(&mo_po->mo_lu)),
655                       PFID(lu_object_fid(&mo_pn->mo_lu)),
656                       PFID(lf), s_name,
657                       PFID(lu_object_fid(&mo_t->mo_lu)),
658                       t_name, err);
659         else
660                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
661                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
662                       "[tname %s] [err %d]\n", fname,
663                       PFID(lu_object_fid(&mo_po->mo_lu)),
664                       PFID(lu_object_fid(&mo_pn->mo_lu)),
665                       PFID(lf), s_name,
666                       t_name, err);
667 }
668
669 /**
670  * Rename operation for cml.
671  *
672  * This is the most complex cross-reference operation. It may consist of up to 4
673  * MDS server and require several RPCs to be sent.
674  *
675  * \param mo_po Old parent object.
676  * \param mo_pn New parent object.
677  * \param lf FID of object to rename.
678  * \param ls_name Source file name.
679  * \param mo_t target object. Should be NULL here.
680  * \param lt_name Name of target file.
681  * \param ma object attributes.
682  */
683 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
684                       struct md_object *mo_pn, const struct lu_fid *lf,
685                       const struct lu_name *ls_name, struct md_object *mo_t,
686                       const struct lu_name *lt_name, struct md_attr *ma)
687 {
688         struct cmm_thread_info *cmi;
689         struct md_attr *tmp_ma = NULL;
690         struct md_object *tmp_t = mo_t;
691         int remote = 0, rc;
692         ENTRY;
693
694         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
695         if (rc)
696                 RETURN(rc);
697
698         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
699                 /**
700                  * \note \a mo_t is remote object and there is RPC to unlink it.
701                  * Before that, do local sanity check for rename first.
702                  */
703                 if (!remote) {
704                         struct md_object *mo_s = md_object_find_slice(env,
705                                                         md_obj2dev(mo_po), lf);
706                         if (IS_ERR(mo_s))
707                                 RETURN(PTR_ERR(mo_s));
708
709                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
710                         rc = mo_permission(env, md_object_next(mo_po),
711                                            md_object_next(mo_s),
712                                            ma, MAY_RENAME_SRC);
713                         lu_object_put(env, &mo_s->mo_lu);
714                         if (rc)
715                                 RETURN(rc);
716                 } else {
717                         rc = mo_permission(env, NULL, md_object_next(mo_po),
718                                            ma, MAY_UNLINK | MAY_VTX_FULL);
719                         if (rc)
720                                 RETURN(rc);
721                 }
722
723                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
724                                    MAY_UNLINK | MAY_VTX_PART);
725                 if (rc)
726                         RETURN(rc);
727
728                 /*
729                  * /note \a ma will be changed after mo_ref_del(), but we will use
730                  * it for mdo_rename() later, so save it before mo_ref_del().
731                  */
732                 cmi = cmm_env_info(env);
733                 tmp_ma = &cmi->cmi_ma;
734                 *tmp_ma = *ma;
735                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
736                 if (rc)
737                         RETURN(rc);
738
739                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
740                 mo_t = NULL;
741         }
742
743         /**
744          * \note for src on remote MDS case, change its ctime before local
745          * rename. Firstly, do local sanity check for rename if necessary.
746          */
747         if (remote) {
748                 if (!tmp_ma) {
749                         rc = mo_permission(env, NULL, md_object_next(mo_po),
750                                            ma, MAY_UNLINK | MAY_VTX_FULL);
751                         if (rc)
752                                 RETURN(rc);
753
754                         if (mo_t) {
755                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
756                                 rc = mo_permission(env, md_object_next(mo_pn),
757                                                    md_object_next(mo_t),
758                                                    ma, MAY_RENAME_TAR);
759                                 if (rc)
760                                         RETURN(rc);
761                         } else {
762                                 int mask;
763
764                                 if (mo_po != mo_pn)
765                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
766                                                 MAY_LINK : MAY_CREATE);
767                                 else
768                                         mask = MAY_CREATE;
769                                 rc = mo_permission(env, NULL,
770                                                    md_object_next(mo_pn),
771                                                    NULL, mask);
772                                 if (rc)
773                                         RETURN(rc);
774                         }
775
776                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
777                 } else {
778                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
779                 }
780
781                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
782                                       tmp_ma ? tmp_ma : ma);
783                 if (rc) {
784                         /* TODO: revoke mo_t if necessary. */
785                         cml_rename_warn("cmm_rename_ctime", mo_po,
786                                         mo_pn, lf, ls_name->ln_name,
787                                         tmp_t, lt_name->ln_name, rc);
788                         RETURN(rc);
789                 }
790         }
791
792         /* local rename, mo_t can be NULL */
793         rc = mdo_rename(env, md_object_next(mo_po),
794                         md_object_next(mo_pn), lf, ls_name,
795                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
796         if (rc)
797                 /* TODO: revoke all cml_rename */
798                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
799                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
800
801         RETURN(rc);
802 }
803
804 /**
805  * Rename target partial operation.
806  * Used for cross-ref rename.
807  */
808 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
809                           struct md_object *mo_t, const struct lu_fid *lf,
810                           const struct lu_name *lname, struct md_attr *ma)
811 {
812         int rc;
813         ENTRY;
814
815         rc = mdo_rename_tgt(env, md_object_next(mo_p),
816                             md_object_next(mo_t), lf, lname, ma);
817         RETURN(rc);
818 }
819
820 /**
821  * Name insert only operation.
822  * used only in case of rename_tgt() when target doesn't exist.
823  */
824 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
825                            const struct lu_name *lname, const struct lu_fid *lf,
826                            const struct md_attr *ma)
827 {
828         int rc;
829         ENTRY;
830
831         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
832
833         RETURN(rc);
834 }
835
836 /**
837  * \ingroup cmm
838  * Check two fids are not subdirectories.
839  */
840 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
841                          const struct lu_fid *fid, struct lu_fid *sfid)
842 {
843         struct cmm_thread_info *cmi;
844         int rc;
845         ENTRY;
846
847         cmi = cmm_env_info(env);
848         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
849         if (rc)
850                 RETURN(rc);
851
852         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
853                 RETURN(0);
854
855         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
856         RETURN(rc);
857 }
858
859 static const struct md_dir_operations cml_dir_ops = {
860         .mdo_is_subdir   = cmm_is_subdir,
861         .mdo_lookup      = cml_lookup,
862         .mdo_lock_mode   = cml_lock_mode,
863         .mdo_create      = cml_create,
864         .mdo_link        = cml_link,
865         .mdo_unlink      = cml_unlink,
866         .mdo_name_insert = cml_name_insert,
867         .mdo_rename      = cml_rename,
868         .mdo_rename_tgt  = cml_rename_tgt,
869         .mdo_create_data = cml_create_data
870 };
871 /** @} */
872 /** @} */
873
874 /**
875  * \addtogroup cmr
876  * @{
877  */
878 /**
879  * \name cmr helpers
880  * @{
881  */
882 /** Get cmr_object from lu_object. */
883 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
884 {
885         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
886 }
887 /** Get cmr_object from md_object. */
888 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
889 {
890         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
891 }
892 /** Get cmr_object from cmm_object. */
893 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
894 {
895         return container_of0(co, struct cmr_object, cmm_obj);
896 }
897 /** @} */
898
899 /**
900  * Get proper child device from MDCs.
901  */
902 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
903 {
904         struct lu_device *next = NULL;
905         struct mdc_device *mdc;
906
907         cfs_spin_lock(&d->cmm_tgt_guard);
908         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
909                 if (mdc->mc_num == num) {
910                         next = mdc2lu_dev(mdc);
911                         break;
912                 }
913         }
914         cfs_spin_unlock(&d->cmm_tgt_guard);
915         return next;
916 }
917
918 /**
919  * Free cmr_object.
920  */
921 static void cmr_object_free(const struct lu_env *env,
922                             struct lu_object *lo)
923 {
924         struct cmr_object *cro = lu2cmr_obj(lo);
925         lu_object_fini(lo);
926         OBD_FREE_PTR(cro);
927 }
928
929 /**
930  * Initialize cmr object.
931  */
932 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
933                            const struct lu_object_conf *unused)
934 {
935         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
936         struct lu_device  *c_dev;
937         struct lu_object  *c_obj;
938         int rc;
939
940         ENTRY;
941
942         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
943         if (c_dev == NULL) {
944                 rc = -ENOENT;
945         } else {
946                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
947                                                         lo->lo_header, c_dev);
948                 if (c_obj != NULL) {
949                         lu_object_add(lo, c_obj);
950                         rc = 0;
951                 } else {
952                         rc = -ENOMEM;
953                 }
954         }
955
956         RETURN(rc);
957 }
958
959 /**
960  * Output lu_object data.
961  */
962 static int cmr_object_print(const struct lu_env *env, void *cookie,
963                             lu_printer_t p, const struct lu_object *lo)
964 {
965         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
966         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
967 }
968
969 /**
970  * Cmr instance of lu_object_operations.
971  */
972 static const struct lu_object_operations cmr_obj_ops = {
973         .loo_object_init    = cmr_object_init,
974         .loo_object_free    = cmr_object_free,
975         .loo_object_print   = cmr_object_print
976 };
977
978 /**
979  * \name cmr remote md_object operations.
980  * All operations here are invalid and return errors. There is no local object
981  * so these operations return two kinds of error:
982  * -# -EFAULT if operation is prohibited.
983  * -# -EREMOTE if operation can be done just to notify upper level about remote
984  *  object.
985  *
986  * @{
987  */
988 static int cmr_object_create(const struct lu_env *env,
989                              struct md_object *mo,
990                              const struct md_op_spec *spec,
991                              struct md_attr *ma)
992 {
993         return -EFAULT;
994 }
995
996 static int cmr_permission(const struct lu_env *env,
997                           struct md_object *p, struct md_object *c,
998                           struct md_attr *attr, int mask)
999 {
1000         return -EREMOTE;
1001 }
1002
1003 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1004                         struct md_attr *attr)
1005 {
1006         return -EREMOTE;
1007 }
1008
1009 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1010                         const struct md_attr *attr)
1011 {
1012         return -EFAULT;
1013 }
1014
1015 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1016                          struct lu_buf *buf, const char *name)
1017 {
1018         return -EFAULT;
1019 }
1020
1021 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1022                         struct lu_buf *buf)
1023 {
1024         return -EFAULT;
1025 }
1026
1027 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1028                           struct lu_buf *buf)
1029 {
1030         return -EFAULT;
1031 }
1032
1033 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1034                          const struct lu_buf *buf, const char *name,
1035                          int fl)
1036 {
1037         return -EFAULT;
1038 }
1039
1040 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1041                          const char *name)
1042 {
1043         return -EFAULT;
1044 }
1045
1046 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1047                        const struct md_attr *ma)
1048 {
1049         return -EFAULT;
1050 }
1051
1052 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1053                        struct md_attr *ma)
1054 {
1055         return -EFAULT;
1056 }
1057
1058 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1059                     int flags)
1060 {
1061         return -EREMOTE;
1062 }
1063
1064 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1065                      struct md_attr *ma)
1066 {
1067         return -EFAULT;
1068 }
1069
1070 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1071                         const struct lu_rdpg *rdpg)
1072 {
1073         return -EREMOTE;
1074 }
1075
1076 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1077                         struct lustre_capa *capa, int renewal)
1078 {
1079         return -EFAULT;
1080 }
1081
1082 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1083                     char *path, int pathlen, __u64 *recno, int *linkno)
1084 {
1085         return -EREMOTE;
1086 }
1087
1088 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1089 {
1090         return -EFAULT;
1091 }
1092
1093 /**
1094  * cmr moo_version_get().
1095  */
1096 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
1097                                         struct md_object *mo)
1098 {
1099         /** Don't check remote object version */
1100         return 0;
1101 }
1102
1103
1104 /**
1105  * cmr moo_version_set().
1106  * No need to update remote object version here, it is done as a part
1107  * of reintegration of partial operation on the remote server.
1108  */
1109 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
1110                             dt_obj_version_t version)
1111 {
1112         return;
1113 }
1114
1115 /** Set of md_object_operations for cmr. */
1116 static const struct md_object_operations cmr_mo_ops = {
1117         .moo_permission    = cmr_permission,
1118         .moo_attr_get      = cmr_attr_get,
1119         .moo_attr_set      = cmr_attr_set,
1120         .moo_xattr_get     = cmr_xattr_get,
1121         .moo_xattr_set     = cmr_xattr_set,
1122         .moo_xattr_list    = cmr_xattr_list,
1123         .moo_xattr_del     = cmr_xattr_del,
1124         .moo_object_create = cmr_object_create,
1125         .moo_ref_add       = cmr_ref_add,
1126         .moo_ref_del       = cmr_ref_del,
1127         .moo_open          = cmr_open,
1128         .moo_close         = cmr_close,
1129         .moo_readpage      = cmr_readpage,
1130         .moo_readlink      = cmr_readlink,
1131         .moo_capa_get      = cmr_capa_get,
1132         .moo_object_sync   = cmr_object_sync,
1133         .moo_version_get   = cmr_version_get,
1134         .moo_version_set   = cmr_version_set,
1135         .moo_path          = cmr_path,
1136 };
1137 /** @} */
1138
1139 /**
1140  * \name cmr md_dir operations.
1141  *
1142  * All methods below are cross-ref by nature. They consist of remote call and
1143  * local operation. Due to future rollback functionality there are several
1144  * limitations for such methods:
1145  * -# remote call should be done at first to do epoch negotiation between all
1146  * MDS involved and to avoid the RPC inside transaction.
1147  * -# only one RPC can be sent - also due to epoch negotiation.
1148  * For more details see rollback HLD/DLD.
1149  * @{
1150  */
1151 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1152                       const struct lu_name *lname, struct lu_fid *lf,
1153                       struct md_op_spec *spec)
1154 {
1155         /*
1156          * This can happens while rename() If new parent is remote dir, lookup
1157          * will happen here.
1158          */
1159
1160         return -EREMOTE;
1161 }
1162
1163 /** Return lock mode. */
1164 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1165                                 struct md_object *mo, mdl_mode_t lm)
1166 {
1167         return MDL_MINMODE;
1168 }
1169
1170 /**
1171  * Create operation for cmr.
1172  * Remote object creation and local name insert.
1173  *
1174  * \param mo_p Parent directory. Local object.
1175  * \param lchild_name name of file to create.
1176  * \param mo_c Child object. It has no real inode yet.
1177  * \param spec creation specification.
1178  * \param ma child object attributes.
1179  */
1180 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1181                       const struct lu_name *lchild_name, struct md_object *mo_c,
1182                       struct md_op_spec *spec,
1183                       struct md_attr *ma)
1184 {
1185         struct cmm_thread_info *cmi;
1186         struct md_attr *tmp_ma;
1187         int rc;
1188         ENTRY;
1189
1190         /* Make sure that name isn't exist before doing remote call. */
1191         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1192                         &cmm_env_info(env)->cmi_fid, NULL);
1193         if (rc == 0)
1194                 RETURN(-EEXIST);
1195         else if (rc != -ENOENT)
1196                 RETURN(rc);
1197
1198         /* check the SGID attr */
1199         cmi = cmm_env_info(env);
1200         LASSERT(cmi);
1201         tmp_ma = &cmi->cmi_ma;
1202         tmp_ma->ma_valid = 0;
1203         tmp_ma->ma_need = MA_INODE;
1204
1205 #ifdef CONFIG_FS_POSIX_ACL
1206         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1207                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1208                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1209                 tmp_ma->ma_need |= MA_ACL_DEF;
1210         }
1211 #endif
1212         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1213         if (rc)
1214                 RETURN(rc);
1215
1216         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1217                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1218                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1219                         ma->ma_attr.la_mode |= S_ISGID;
1220                         ma->ma_attr.la_valid |= LA_MODE;
1221                 }
1222         }
1223
1224 #ifdef CONFIG_FS_POSIX_ACL
1225         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1226                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1227                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1228                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1229                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1230         }
1231 #endif
1232
1233         /* Local permission check for name_insert before remote ops. */
1234         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1235                            (S_ISDIR(ma->ma_attr.la_mode) ?
1236                            MAY_LINK : MAY_CREATE));
1237         if (rc)
1238                 RETURN(rc);
1239
1240         /**
1241          * \note \a ma will be changed after mo_object_create(), but we will use
1242          * it for mdo_name_insert() later, so save it before mo_object_create().
1243          */
1244         *tmp_ma = *ma;
1245         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1246         if (rc == 0) {
1247                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1248                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1249                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1250                 if (unlikely(rc)) {
1251                         /* TODO: remove object mo_c on remote MDS */
1252                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1253                               " [name %s] [mo_c "DFID"] [err %d]\n",
1254                               PFID(lu_object_fid(&mo_p->mo_lu)),
1255                               lchild_name->ln_name,
1256                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1257                 }
1258         }
1259
1260         RETURN(rc);
1261 }
1262
1263 /**
1264  * Link operations for cmr.
1265  *
1266  * The link RPC is always issued to the server where source parent is living.
1267  * The first operation to do is object nlink increment on remote server.
1268  * Second one is local mdo_name_insert().
1269  *
1270  * \param mo_p parent directory. It is local.
1271  * \param mo_s source object to link. It is remote.
1272  * \param lname Name of link file.
1273  * \param ma object attributes.
1274  */
1275 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1276                     struct md_object *mo_s, const struct lu_name *lname,
1277                     struct md_attr *ma)
1278 {
1279         int rc;
1280         ENTRY;
1281
1282         /* Make sure that name isn't exist before doing remote call. */
1283         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1284                         &cmm_env_info(env)->cmi_fid, NULL);
1285         if (rc == 0) {
1286                 rc = -EEXIST;
1287         } else if (rc == -ENOENT) {
1288                 /* Local permission check for name_insert before remote ops. */
1289                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1290                                    MAY_CREATE);
1291                 if (rc)
1292                         RETURN(rc);
1293
1294                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1295                 if (rc == 0) {
1296                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1297                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1298                                              lu_object_fid(&mo_s->mo_lu), ma);
1299                         if (unlikely(rc)) {
1300                                 /* TODO: ref_del from mo_s on remote MDS */
1301                                 CWARN("cmr_link failed, should revoke: "
1302                                       "[mo_p "DFID"] [mo_s "DFID"] "
1303                                       "[name %s] [err %d]\n",
1304                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1305                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1306                                       lname->ln_name, rc);
1307                         }
1308                 }
1309         }
1310         RETURN(rc);
1311 }
1312
1313 /**
1314  * Unlink operations for cmr.
1315  *
1316  * The unlink RPC is always issued to the server where parent is living. Hence
1317  * the first operation to do is object unlink on remote server. Second one is
1318  * local mdo_name_remove().
1319  *
1320  * \param mo_p parent md_object. It is local.
1321  * \param mo_c child object to be unlinked. It is remote.
1322  * \param lname Name of file to unlink.
1323  * \param ma object attributes.
1324  */
1325 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1326                       struct md_object *mo_c, const struct lu_name *lname,
1327                       struct md_attr *ma)
1328 {
1329         struct cmm_thread_info *cmi;
1330         struct md_attr *tmp_ma;
1331         int rc;
1332         ENTRY;
1333
1334         /* Local permission check for name_remove before remote ops. */
1335         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1336                            MAY_UNLINK | MAY_VTX_PART);
1337         if (rc)
1338                 RETURN(rc);
1339
1340         /*
1341          * \note \a ma will be changed after mo_ref_del, but we will use
1342          * it for mdo_name_remove() later, so save it before mo_ref_del().
1343          */
1344         cmi = cmm_env_info(env);
1345         tmp_ma = &cmi->cmi_ma;
1346         *tmp_ma = *ma;
1347         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1348         if (rc == 0) {
1349                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1350                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1351                 if (unlikely(rc)) {
1352                         /* TODO: ref_add to mo_c on remote MDS */
1353                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1354                               " [mo_c "DFID"] [name %s] [err %d]\n",
1355                               PFID(lu_object_fid(&mo_p->mo_lu)),
1356                               PFID(lu_object_fid(&mo_c->mo_lu)),
1357                               lname->ln_name, rc);
1358                 }
1359         }
1360
1361         RETURN(rc);
1362 }
1363
1364 /** Helper which outputs error message during cmr_rename() */
1365 static inline void cmr_rename_warn(const char *fname,
1366                                   struct md_object *mo_po,
1367                                   struct md_object *mo_pn,
1368                                   const struct lu_fid *lf,
1369                                   const char *s_name,
1370                                   const char *t_name,
1371                                   int err)
1372 {
1373         CWARN("cmr_rename failed for %s, should revoke: "
1374               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1375               "[sname %s] [tname %s] [err %d]\n", fname,
1376               PFID(lu_object_fid(&mo_po->mo_lu)),
1377               PFID(lu_object_fid(&mo_pn->mo_lu)),
1378               PFID(lf), s_name, t_name, err);
1379 }
1380
1381 /**
1382  * Rename operation for cmr.
1383  *
1384  * This is the most complex cross-reference operation. It may consist of up to 4
1385  * MDS server and require several RPCs to be sent.
1386  *
1387  * \param mo_po Old parent object.
1388  * \param mo_pn New parent object.
1389  * \param lf FID of object to rename.
1390  * \param ls_name Source file name.
1391  * \param mo_t target object. Should be NULL here.
1392  * \param lt_name Name of target file.
1393  * \param ma object attributes.
1394  */
1395 static int cmr_rename(const struct lu_env *env,
1396                       struct md_object *mo_po, struct md_object *mo_pn,
1397                       const struct lu_fid *lf, const struct lu_name *ls_name,
1398                       struct md_object *mo_t, const struct lu_name *lt_name,
1399                       struct md_attr *ma)
1400 {
1401         struct cmm_thread_info *cmi;
1402         struct md_attr *tmp_ma;
1403         int rc;
1404         ENTRY;
1405
1406         LASSERT(mo_t == NULL);
1407
1408         /* get real type of src */
1409         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1410         if (rc)
1411                 RETURN(rc);
1412
1413         /* Local permission check for name_remove before remote ops. */
1414         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1415                            MAY_UNLINK | MAY_VTX_FULL);
1416         if (rc)
1417                 RETURN(rc);
1418
1419         /**
1420          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1421          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1422          */
1423         cmi = cmm_env_info(env);
1424         tmp_ma = &cmi->cmi_ma;
1425         *tmp_ma = *ma;
1426         /**
1427          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1428          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1429          * lookup and process this further.
1430          */
1431         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1432                             NULL/* mo_t */, lf, lt_name, ma);
1433         if (rc)
1434                 RETURN(rc);
1435
1436         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1437
1438         /* src object maybe on remote MDS, do remote ops first. */
1439         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1440         if (unlikely(rc)) {
1441                 /* TODO: revoke mdo_rename_tgt */
1442                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1443                                 ls_name->ln_name, lt_name->ln_name, rc);
1444                 RETURN(rc);
1445         }
1446
1447         /* only old name is removed localy */
1448         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1449         if (unlikely(rc))
1450                 /* TODO: revoke all cmr_rename */
1451                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1452                                 ls_name->ln_name, lt_name->ln_name, rc);
1453
1454         RETURN(rc);
1455 }
1456
1457 /**
1458  * Part of cross-ref rename().
1459  * Used to insert new name in new parent and unlink target.
1460  */
1461 static int cmr_rename_tgt(const struct lu_env *env,
1462                           struct md_object *mo_p, struct md_object *mo_t,
1463                           const struct lu_fid *lf, const struct lu_name *lname,
1464                           struct md_attr *ma)
1465 {
1466         struct cmm_thread_info *cmi;
1467         struct md_attr *tmp_ma;
1468         int rc;
1469         ENTRY;
1470
1471         /* target object is remote one */
1472         /* Local permission check for rename_tgt before remote ops. */
1473         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1474                            MAY_UNLINK | MAY_VTX_PART);
1475         if (rc)
1476                 RETURN(rc);
1477
1478         /*
1479          * XXX: @ma maybe changed after mo_ref_del, but we will use
1480          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1481          */
1482         cmi = cmm_env_info(env);
1483         tmp_ma = &cmi->cmi_ma;
1484         *tmp_ma = *ma;
1485         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1486         /* continue locally with name handling only */
1487         if (rc == 0) {
1488                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1489                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1490                                     NULL, lf, lname, tmp_ma);
1491                 if (unlikely(rc)) {
1492                         /* TODO: ref_add to mo_t on remote MDS */
1493                         CWARN("cmr_rename_tgt failed, should revoke: "
1494                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1495                               "[name %s] [err %d]\n",
1496                               PFID(lu_object_fid(&mo_p->mo_lu)),
1497                               PFID(lu_object_fid(&mo_t->mo_lu)),
1498                               PFID(lf),
1499                               lname->ln_name, rc);
1500                 }
1501         }
1502         RETURN(rc);
1503 }
1504 /** @} */
1505 /**
1506  * The md_dir_operations for cmr.
1507  */
1508 static const struct md_dir_operations cmr_dir_ops = {
1509         .mdo_is_subdir   = cmm_is_subdir,
1510         .mdo_lookup      = cmr_lookup,
1511         .mdo_lock_mode   = cmr_lock_mode,
1512         .mdo_create      = cmr_create,
1513         .mdo_link        = cmr_link,
1514         .mdo_unlink      = cmr_unlink,
1515         .mdo_rename      = cmr_rename,
1516         .mdo_rename_tgt  = cmr_rename_tgt
1517 };
1518 /** @} */