Whamcloud - gitweb
9c115de582f744a8133efdc836620fb82077ce42
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52 /**
53  * \ingroup cmm
54  * Lookup MDS number \a mds by FID \a fid.
55  *
56  * \param fid FID of object to find MDS
57  * \param mds mds number to return.
58  */
59 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
60                    mdsno_t *mds, const struct lu_env *env)
61 {
62         int rc = 0;
63         ENTRY;
64
65         LASSERT(fid_is_sane(fid));
66
67         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
68                                LU_SEQ_RANGE_MDT, env);
69         if (rc) {
70                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
71                        fid_seq(fid), rc);
72                 RETURN(rc);
73         }
74
75         if (*mds > cm->cmm_tgt_count) {
76                 CERROR("Got invalid mdsno: %x (max: %x)\n",
77                        *mds, cm->cmm_tgt_count);
78                 rc = -EINVAL;
79         } else {
80                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
81                        LPX64"\n", *mds, fid_seq(fid));
82         }
83
84         RETURN (rc);
85 }
86
87 /**
88  * \addtogroup cml
89  * @{
90  */
91 static const struct md_object_operations cml_mo_ops;
92 static const struct md_dir_operations    cml_dir_ops;
93 static const struct lu_object_operations cml_obj_ops;
94
95 static const struct md_object_operations cmr_mo_ops;
96 static const struct md_dir_operations    cmr_dir_ops;
97 static const struct lu_object_operations cmr_obj_ops;
98
99 /**
100  * \ingroup cmm
101  * Allocate CMM object.
102  */
103 struct lu_object *cmm_object_alloc(const struct lu_env *env,
104                                    const struct lu_object_header *loh,
105                                    struct lu_device *ld)
106 {
107         const struct lu_fid *fid = &loh->loh_fid;
108         struct lu_object  *lo = NULL;
109         struct cmm_device *cd;
110         mdsno_t mds;
111         int rc = 0;
112
113         ENTRY;
114
115         cd = lu2cmm_dev(ld);
116         if (cd->cmm_flags & CMM_INITIALIZED) {
117                 /* get object location */
118                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
119                 if (rc)
120                         RETURN(NULL);
121         } else
122                 /*
123                  * Device is not yet initialized, cmm_object is being created
124                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
125                  * etc.). Such object *has* to be local.
126                  */
127                 mds = cd->cmm_local_num;
128
129         /* select the proper set of operations based on object location */
130         if (mds == cd->cmm_local_num) {
131                 struct cml_object *clo;
132
133                 OBD_ALLOC_PTR(clo);
134                 if (clo != NULL) {
135                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
136                         lu_object_init(lo, NULL, ld);
137                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
138                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
139                         lo->lo_ops = &cml_obj_ops;
140                 }
141         } else {
142                 struct cmr_object *cro;
143
144                 OBD_ALLOC_PTR(cro);
145                 if (cro != NULL) {
146                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
147                         lu_object_init(lo, NULL, ld);
148                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
149                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
150                         lo->lo_ops = &cmr_obj_ops;
151                         cro->cmo_num = mds;
152                 }
153         }
154         RETURN(lo);
155 }
156
157 /**
158  * Get local child device.
159  */
160 static struct lu_device *cml_child_dev(struct cmm_device *d)
161 {
162         return &d->cmm_child->md_lu_dev;
163 }
164
165 /**
166  * Free cml_object.
167  */
168 static void cml_object_free(const struct lu_env *env,
169                             struct lu_object *lo)
170 {
171         struct cml_object *clo = lu2cml_obj(lo);
172         lu_object_fini(lo);
173         OBD_FREE_PTR(clo);
174 }
175
176 /**
177  * Initialize cml_object.
178  */
179 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
180                            const struct lu_object_conf *unused)
181 {
182         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
183         struct lu_device  *c_dev;
184         struct lu_object  *c_obj;
185         int rc;
186
187         ENTRY;
188
189 #ifdef HAVE_SPLIT_SUPPORT
190         if (cd->cmm_tgt_count == 0)
191                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
192         else
193                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
194 #endif
195         c_dev = cml_child_dev(cd);
196         if (c_dev == NULL) {
197                 rc = -ENOENT;
198         } else {
199                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
200                                                         lo->lo_header, c_dev);
201                 if (c_obj != NULL) {
202                         lu_object_add(lo, c_obj);
203                         rc = 0;
204                 } else {
205                         rc = -ENOMEM;
206                 }
207         }
208
209         RETURN(rc);
210 }
211
212 static int cml_object_print(const struct lu_env *env, void *cookie,
213                             lu_printer_t p, const struct lu_object *lo)
214 {
215         return (*p)(env, cookie, "[local]");
216 }
217
218 static const struct lu_object_operations cml_obj_ops = {
219         .loo_object_init    = cml_object_init,
220         .loo_object_free    = cml_object_free,
221         .loo_object_print   = cml_object_print
222 };
223
224 /**
225  * \name CMM local md_object operations.
226  * All of them call just corresponding operations on next layer.
227  * @{
228  */
229 static int cml_object_create(const struct lu_env *env,
230                              struct md_object *mo,
231                              const struct md_op_spec *spec,
232                              struct md_attr *attr)
233 {
234         int rc;
235         ENTRY;
236         rc = mo_object_create(env, md_object_next(mo), spec, attr);
237         RETURN(rc);
238 }
239
240 static int cml_permission(const struct lu_env *env,
241                           struct md_object *p, struct md_object *c,
242                           struct md_attr *attr, int mask)
243 {
244         int rc;
245         ENTRY;
246         rc = mo_permission(env, md_object_next(p), md_object_next(c),
247                            attr, mask);
248         RETURN(rc);
249 }
250
251 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
252                         struct md_attr *attr)
253 {
254         int rc;
255         ENTRY;
256         rc = mo_attr_get(env, md_object_next(mo), attr);
257         RETURN(rc);
258 }
259
260 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
261                         const struct md_attr *attr)
262 {
263         int rc;
264         ENTRY;
265         rc = mo_attr_set(env, md_object_next(mo), attr);
266         RETURN(rc);
267 }
268
269 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
270                          struct lu_buf *buf, const char *name)
271 {
272         int rc;
273         ENTRY;
274         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
275         RETURN(rc);
276 }
277
278 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
279                         struct lu_buf *buf)
280 {
281         int rc;
282         ENTRY;
283         rc = mo_readlink(env, md_object_next(mo), buf);
284         RETURN(rc);
285 }
286
287 static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
288                          int flags, struct md_object *mo)
289 {
290         int rc;
291         ENTRY;
292         rc = mo_changelog(env, type, flags, md_object_next(mo));
293         RETURN(rc);
294 }
295
296 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
297                           struct lu_buf *buf)
298 {
299         int rc;
300         ENTRY;
301         rc = mo_xattr_list(env, md_object_next(mo), buf);
302         RETURN(rc);
303 }
304
305 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
306                          const struct lu_buf *buf, const char *name,
307                          int fl)
308 {
309         int rc;
310         ENTRY;
311         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
312         RETURN(rc);
313 }
314
315 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
316                          const char *name)
317 {
318         int rc;
319         ENTRY;
320         rc = mo_xattr_del(env, md_object_next(mo), name);
321         RETURN(rc);
322 }
323
324 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
325                        const struct md_attr *ma)
326 {
327         int rc;
328         ENTRY;
329         rc = mo_ref_add(env, md_object_next(mo), ma);
330         RETURN(rc);
331 }
332
333 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
334                        struct md_attr *ma)
335 {
336         int rc;
337         ENTRY;
338         rc = mo_ref_del(env, md_object_next(mo), ma);
339         RETURN(rc);
340 }
341
342 static int cml_open(const struct lu_env *env, struct md_object *mo,
343                     int flags)
344 {
345         int rc;
346         ENTRY;
347         rc = mo_open(env, md_object_next(mo), flags);
348         RETURN(rc);
349 }
350
351 static int cml_close(const struct lu_env *env, struct md_object *mo,
352                      struct md_attr *ma)
353 {
354         int rc;
355         ENTRY;
356         rc = mo_close(env, md_object_next(mo), ma);
357         RETURN(rc);
358 }
359
360 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
361                         const struct lu_rdpg *rdpg)
362 {
363         int rc;
364         ENTRY;
365         rc = mo_readpage(env, md_object_next(mo), rdpg);
366         RETURN(rc);
367 }
368
369 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
370                         struct lustre_capa *capa, int renewal)
371 {
372         int rc;
373         ENTRY;
374         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
375         RETURN(rc);
376 }
377
378 static int cml_path(const struct lu_env *env, struct md_object *mo,
379                     char *path, int pathlen, __u64 *recno, int *linkno)
380 {
381         int rc;
382         ENTRY;
383         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
384         RETURN(rc);
385 }
386
387 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
388 {
389         int rc;
390         ENTRY;
391         rc = mo_object_sync(env, md_object_next(mo));
392         RETURN(rc);
393 }
394
395 static dt_obj_version_t cml_version_get(const struct lu_env *env,
396                                         struct md_object *mo)
397 {
398         return mo_version_get(env, md_object_next(mo));
399 }
400
401 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
402                             dt_obj_version_t version)
403 {
404         return mo_version_set(env, md_object_next(mo), version);
405 }
406
407 static const struct md_object_operations cml_mo_ops = {
408         .moo_permission    = cml_permission,
409         .moo_attr_get      = cml_attr_get,
410         .moo_attr_set      = cml_attr_set,
411         .moo_xattr_get     = cml_xattr_get,
412         .moo_xattr_list    = cml_xattr_list,
413         .moo_xattr_set     = cml_xattr_set,
414         .moo_xattr_del     = cml_xattr_del,
415         .moo_object_create = cml_object_create,
416         .moo_ref_add       = cml_ref_add,
417         .moo_ref_del       = cml_ref_del,
418         .moo_open          = cml_open,
419         .moo_close         = cml_close,
420         .moo_readpage      = cml_readpage,
421         .moo_readlink      = cml_readlink,
422         .moo_changelog     = cml_changelog,
423         .moo_capa_get      = cml_capa_get,
424         .moo_object_sync   = cml_object_sync,
425         .moo_version_get   = cml_version_get,
426         .moo_version_set   = cml_version_set,
427         .moo_path          = cml_path,
428 };
429 /** @} */
430
431 /**
432  * \name CMM local md_dir_operations.
433  * @{
434  */
435 /**
436  * cml lookup object fid by name.
437  * This returns only FID by name.
438  */
439 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
440                       const struct lu_name *lname, struct lu_fid *lf,
441                       struct md_op_spec *spec)
442 {
443         int rc;
444         ENTRY;
445
446 #ifdef HAVE_SPLIT_SUPPORT
447         if (spec != NULL && spec->sp_ck_split) {
448                 rc = cmm_split_check(env, mo_p, lname->ln_name);
449                 if (rc)
450                         RETURN(rc);
451         }
452 #endif
453         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
454         RETURN(rc);
455
456 }
457
458 /**
459  * Helper to return lock mode. Used in split cases only.
460  */
461 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
462                                 struct md_object *mo, mdl_mode_t lm)
463 {
464         int rc = MDL_MINMODE;
465         ENTRY;
466
467 #ifdef HAVE_SPLIT_SUPPORT
468         rc = cmm_split_access(env, mo, lm);
469 #endif
470
471         RETURN(rc);
472 }
473
474 /**
475  * Create operation for cml.
476  * Objects are local, but split can happen.
477  * If split is not needed this will call next layer mdo_create().
478  *
479  * \param mo_p Parent directory. Local object.
480  * \param lname name of file to create.
481  * \param mo_c Child object. It has no real inode yet.
482  * \param spec creation specification.
483  * \param ma child object attributes.
484  */
485 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
486                       const struct lu_name *lname, struct md_object *mo_c,
487                       struct md_op_spec *spec, struct md_attr *ma)
488 {
489         int rc;
490         ENTRY;
491
492 #ifdef HAVE_SPLIT_SUPPORT
493         /* Lock mode always should be sane. */
494         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
495
496         /*
497          * Sigh... This is long story. MDT may have race with detecting if split
498          * is possible in cmm. We know this race and let it live, because
499          * getting it rid (with some sem or spinlock) will also mean that
500          * PDIROPS for create will not work because we kill parallel work, what
501          * is really bad for performance and makes no sense having PDIROPS. So,
502          * we better allow the race to live, but split dir only if some of
503          * concurrent threads takes EX lock, not matter which one. So that, say,
504          * two concurrent threads may have different lock modes on directory (CW
505          * and EX) and not first one which comes here and see that split is
506          * possible should split the dir, but only that one which has EX
507          * lock. And we do not care that in this case, split may happen a bit
508          * later (when dir size will not be necessarily 64K, but may be a bit
509          * larger). So that, we allow concurrent creates and protect split by EX
510          * lock.
511          */
512         if (spec->sp_cr_mode == MDL_EX) {
513                 /**
514                  * Split cases:
515                  * - Try to split \a mo_p upon each create operation.
516                  *   If split is ok, -ERESTART is returned and current thread
517                  *   will not peoceed with create. Instead it sends -ERESTART
518                  *   to client to let it know that correct MDT must be chosen.
519                  * \see cmm_split_dir()
520                  */
521                 rc = cmm_split_dir(env, mo_p);
522                 if (rc)
523                         /*
524                          * -ERESTART or some split error is returned, we can't
525                          * proceed with create.
526                          */
527                         GOTO(out, rc);
528         }
529
530         if (spec != NULL && spec->sp_ck_split) {
531                 /**
532                  * - Directory is split already. Let the caller know that
533                  * it should tell client that directory is split and operation
534                  * should repeat to correct MDT.
535                  * \see cmm_split_check()
536                  */
537                 rc = cmm_split_check(env, mo_p, lname->ln_name);
538                 if (rc)
539                         GOTO(out, rc);
540         }
541 #endif
542
543         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
544                         spec, ma);
545
546         EXIT;
547 #ifdef HAVE_SPLIT_SUPPORT
548 out:
549 #endif
550         return rc;
551 }
552
553 /** Call mdo_create_data() on next layer. All objects are local. */
554 static int cml_create_data(const struct lu_env *env, struct md_object *p,
555                            struct md_object *o,
556                            const struct md_op_spec *spec,
557                            struct md_attr *ma)
558 {
559         int rc;
560         ENTRY;
561         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
562                              spec, ma);
563         RETURN(rc);
564 }
565
566 /** Call mdo_link() on next layer. All objects are local. */
567 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
568                     struct md_object *mo_s, const struct lu_name *lname,
569                     struct md_attr *ma)
570 {
571         int rc;
572         ENTRY;
573         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
574                       lname, ma);
575         RETURN(rc);
576 }
577
578 /** Call mdo_unlink() on next layer. All objects are local. */
579 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
580                       struct md_object *mo_c, const struct lu_name *lname,
581                       struct md_attr *ma)
582 {
583         int rc;
584         ENTRY;
585         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
586                         lname, ma);
587         RETURN(rc);
588 }
589
590 /**
591  * \ingroup cmm
592  * Get mode of object.
593  * Used in both cml and cmr hence can produce RPC to another server.
594  */
595 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
596                         const struct lu_fid *lf, struct md_attr *ma,
597                         int *remote)
598 {
599         struct md_object *mo_s = md_object_find_slice(env, md, lf);
600         struct cmm_thread_info *cmi;
601         struct md_attr *tmp_ma;
602         int rc;
603         ENTRY;
604
605         if (IS_ERR(mo_s))
606                 RETURN(PTR_ERR(mo_s));
607
608         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
609                 *remote = 1;
610
611         cmi = cmm_env_info(env);
612         tmp_ma = &cmi->cmi_ma;
613         tmp_ma->ma_need = MA_INODE;
614         tmp_ma->ma_valid = 0;
615         /* get type from src, can be remote req */
616         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
617         if (rc == 0) {
618                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
619                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
620                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
621                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
622                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
623         }
624         lu_object_put(env, &mo_s->mo_lu);
625         RETURN(rc);
626 }
627
628 /**
629  * \ingroup cmm
630  * Set ctime for object.
631  * Used in both cml and cmr hence can produce RPC to another server.
632  */
633 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
634                             const struct lu_fid *lf, struct md_attr *ma)
635 {
636         struct md_object *mo_s = md_object_find_slice(env, md, lf);
637         int rc;
638         ENTRY;
639
640         if (IS_ERR(mo_s))
641                 RETURN(PTR_ERR(mo_s));
642
643         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
644         /* set ctime to obj, can be remote req */
645         rc = mo_attr_set(env, md_object_next(mo_s), ma);
646         lu_object_put(env, &mo_s->mo_lu);
647         RETURN(rc);
648 }
649
650 /** Helper to output debug information about rename operation. */
651 static inline void cml_rename_warn(const char *fname,
652                                   struct md_object *mo_po,
653                                   struct md_object *mo_pn,
654                                   const struct lu_fid *lf,
655                                   const char *s_name,
656                                   struct md_object *mo_t,
657                                   const char *t_name,
658                                   int err)
659 {
660         if (mo_t)
661                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
662                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
663                       "[tname %s] [err %d]\n", fname,
664                       PFID(lu_object_fid(&mo_po->mo_lu)),
665                       PFID(lu_object_fid(&mo_pn->mo_lu)),
666                       PFID(lf), s_name,
667                       PFID(lu_object_fid(&mo_t->mo_lu)),
668                       t_name, err);
669         else
670                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
671                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
672                       "[tname %s] [err %d]\n", fname,
673                       PFID(lu_object_fid(&mo_po->mo_lu)),
674                       PFID(lu_object_fid(&mo_pn->mo_lu)),
675                       PFID(lf), s_name,
676                       t_name, err);
677 }
678
679 /**
680  * Rename operation for cml.
681  *
682  * This is the most complex cross-reference operation. It may consist of up to 4
683  * MDS server and require several RPCs to be sent.
684  *
685  * \param mo_po Old parent object.
686  * \param mo_pn New parent object.
687  * \param lf FID of object to rename.
688  * \param ls_name Source file name.
689  * \param mo_t target object. Should be NULL here.
690  * \param lt_name Name of target file.
691  * \param ma object attributes.
692  */
693 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
694                       struct md_object *mo_pn, const struct lu_fid *lf,
695                       const struct lu_name *ls_name, struct md_object *mo_t,
696                       const struct lu_name *lt_name, struct md_attr *ma)
697 {
698         struct cmm_thread_info *cmi;
699         struct md_attr *tmp_ma = NULL;
700         struct md_object *tmp_t = mo_t;
701         int remote = 0, rc;
702         ENTRY;
703
704         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
705         if (rc)
706                 RETURN(rc);
707
708         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
709                 /**
710                  * \note \a mo_t is remote object and there is RPC to unlink it.
711                  * Before that, do local sanity check for rename first.
712                  */
713                 if (!remote) {
714                         struct md_object *mo_s = md_object_find_slice(env,
715                                                         md_obj2dev(mo_po), lf);
716                         if (IS_ERR(mo_s))
717                                 RETURN(PTR_ERR(mo_s));
718
719                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
720                         rc = mo_permission(env, md_object_next(mo_po),
721                                            md_object_next(mo_s),
722                                            ma, MAY_RENAME_SRC);
723                         lu_object_put(env, &mo_s->mo_lu);
724                         if (rc)
725                                 RETURN(rc);
726                 } else {
727                         rc = mo_permission(env, NULL, md_object_next(mo_po),
728                                            ma, MAY_UNLINK | MAY_VTX_FULL);
729                         if (rc)
730                                 RETURN(rc);
731                 }
732
733                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
734                                    MAY_UNLINK | MAY_VTX_PART);
735                 if (rc)
736                         RETURN(rc);
737
738                 /*
739                  * /note \a ma will be changed after mo_ref_del(), but we will use
740                  * it for mdo_rename() later, so save it before mo_ref_del().
741                  */
742                 cmi = cmm_env_info(env);
743                 tmp_ma = &cmi->cmi_ma;
744                 *tmp_ma = *ma;
745                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
746                 if (rc)
747                         RETURN(rc);
748
749                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
750                 mo_t = NULL;
751         }
752
753         /**
754          * \note for src on remote MDS case, change its ctime before local
755          * rename. Firstly, do local sanity check for rename if necessary.
756          */
757         if (remote) {
758                 if (!tmp_ma) {
759                         rc = mo_permission(env, NULL, md_object_next(mo_po),
760                                            ma, MAY_UNLINK | MAY_VTX_FULL);
761                         if (rc)
762                                 RETURN(rc);
763
764                         if (mo_t) {
765                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
766                                 rc = mo_permission(env, md_object_next(mo_pn),
767                                                    md_object_next(mo_t),
768                                                    ma, MAY_RENAME_TAR);
769                                 if (rc)
770                                         RETURN(rc);
771                         } else {
772                                 int mask;
773
774                                 if (mo_po != mo_pn)
775                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
776                                                 MAY_LINK : MAY_CREATE);
777                                 else
778                                         mask = MAY_CREATE;
779                                 rc = mo_permission(env, NULL,
780                                                    md_object_next(mo_pn),
781                                                    NULL, mask);
782                                 if (rc)
783                                         RETURN(rc);
784                         }
785
786                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
787                 } else {
788                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
789                 }
790
791                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
792                                       tmp_ma ? tmp_ma : ma);
793                 if (rc) {
794                         /* TODO: revoke mo_t if necessary. */
795                         cml_rename_warn("cmm_rename_ctime", mo_po,
796                                         mo_pn, lf, ls_name->ln_name,
797                                         tmp_t, lt_name->ln_name, rc);
798                         RETURN(rc);
799                 }
800         }
801
802         /* local rename, mo_t can be NULL */
803         rc = mdo_rename(env, md_object_next(mo_po),
804                         md_object_next(mo_pn), lf, ls_name,
805                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
806         if (rc)
807                 /* TODO: revoke all cml_rename */
808                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
809                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
810
811         RETURN(rc);
812 }
813
814 /**
815  * Rename target partial operation.
816  * Used for cross-ref rename.
817  */
818 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
819                           struct md_object *mo_t, const struct lu_fid *lf,
820                           const struct lu_name *lname, struct md_attr *ma)
821 {
822         int rc;
823         ENTRY;
824
825         rc = mdo_rename_tgt(env, md_object_next(mo_p),
826                             md_object_next(mo_t), lf, lname, ma);
827         RETURN(rc);
828 }
829
830 /**
831  * Name insert only operation.
832  * used only in case of rename_tgt() when target doesn't exist.
833  */
834 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
835                            const struct lu_name *lname, const struct lu_fid *lf,
836                            const struct md_attr *ma)
837 {
838         int rc;
839         ENTRY;
840
841         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
842
843         RETURN(rc);
844 }
845
846 /**
847  * \ingroup cmm
848  * Check two fids are not subdirectories.
849  */
850 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
851                          const struct lu_fid *fid, struct lu_fid *sfid)
852 {
853         struct cmm_thread_info *cmi;
854         int rc;
855         ENTRY;
856
857         cmi = cmm_env_info(env);
858         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
859         if (rc)
860                 RETURN(rc);
861
862         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
863                 RETURN(0);
864
865         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
866         RETURN(rc);
867 }
868
869 static const struct md_dir_operations cml_dir_ops = {
870         .mdo_is_subdir   = cmm_is_subdir,
871         .mdo_lookup      = cml_lookup,
872         .mdo_lock_mode   = cml_lock_mode,
873         .mdo_create      = cml_create,
874         .mdo_link        = cml_link,
875         .mdo_unlink      = cml_unlink,
876         .mdo_name_insert = cml_name_insert,
877         .mdo_rename      = cml_rename,
878         .mdo_rename_tgt  = cml_rename_tgt,
879         .mdo_create_data = cml_create_data,
880 };
881 /** @} */
882 /** @} */
883
884 /**
885  * \addtogroup cmr
886  * @{
887  */
888 /**
889  * \name cmr helpers
890  * @{
891  */
892 /** Get cmr_object from lu_object. */
893 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
894 {
895         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
896 }
897 /** Get cmr_object from md_object. */
898 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
899 {
900         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
901 }
902 /** Get cmr_object from cmm_object. */
903 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
904 {
905         return container_of0(co, struct cmr_object, cmm_obj);
906 }
907 /** @} */
908
909 /**
910  * Get proper child device from MDCs.
911  */
912 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
913 {
914         struct lu_device *next = NULL;
915         struct mdc_device *mdc;
916
917         cfs_spin_lock(&d->cmm_tgt_guard);
918         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
919                 if (mdc->mc_num == num) {
920                         next = mdc2lu_dev(mdc);
921                         break;
922                 }
923         }
924         cfs_spin_unlock(&d->cmm_tgt_guard);
925         return next;
926 }
927
928 /**
929  * Free cmr_object.
930  */
931 static void cmr_object_free(const struct lu_env *env,
932                             struct lu_object *lo)
933 {
934         struct cmr_object *cro = lu2cmr_obj(lo);
935         lu_object_fini(lo);
936         OBD_FREE_PTR(cro);
937 }
938
939 /**
940  * Initialize cmr object.
941  */
942 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
943                            const struct lu_object_conf *unused)
944 {
945         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
946         struct lu_device  *c_dev;
947         struct lu_object  *c_obj;
948         int rc;
949
950         ENTRY;
951
952         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
953         if (c_dev == NULL) {
954                 rc = -ENOENT;
955         } else {
956                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
957                                                         lo->lo_header, c_dev);
958                 if (c_obj != NULL) {
959                         lu_object_add(lo, c_obj);
960                         rc = 0;
961                 } else {
962                         rc = -ENOMEM;
963                 }
964         }
965
966         RETURN(rc);
967 }
968
969 /**
970  * Output lu_object data.
971  */
972 static int cmr_object_print(const struct lu_env *env, void *cookie,
973                             lu_printer_t p, const struct lu_object *lo)
974 {
975         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
976         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
977 }
978
979 /**
980  * Cmr instance of lu_object_operations.
981  */
982 static const struct lu_object_operations cmr_obj_ops = {
983         .loo_object_init    = cmr_object_init,
984         .loo_object_free    = cmr_object_free,
985         .loo_object_print   = cmr_object_print
986 };
987
988 /**
989  * \name cmr remote md_object operations.
990  * All operations here are invalid and return errors. There is no local object
991  * so these operations return two kinds of error:
992  * -# -EFAULT if operation is prohibited.
993  * -# -EREMOTE if operation can be done just to notify upper level about remote
994  *  object.
995  *
996  * @{
997  */
998 static int cmr_object_create(const struct lu_env *env,
999                              struct md_object *mo,
1000                              const struct md_op_spec *spec,
1001                              struct md_attr *ma)
1002 {
1003         return -EFAULT;
1004 }
1005
1006 static int cmr_permission(const struct lu_env *env,
1007                           struct md_object *p, struct md_object *c,
1008                           struct md_attr *attr, int mask)
1009 {
1010         return -EREMOTE;
1011 }
1012
1013 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1014                         struct md_attr *attr)
1015 {
1016         return -EREMOTE;
1017 }
1018
1019 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1020                         const struct md_attr *attr)
1021 {
1022         return -EFAULT;
1023 }
1024
1025 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1026                          struct lu_buf *buf, const char *name)
1027 {
1028         return -EFAULT;
1029 }
1030
1031 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1032                         struct lu_buf *buf)
1033 {
1034         return -EFAULT;
1035 }
1036
1037 static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
1038                          int flags, struct md_object *mo)
1039 {
1040         return -EFAULT;
1041 }
1042
1043 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1044                           struct lu_buf *buf)
1045 {
1046         return -EFAULT;
1047 }
1048
1049 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1050                          const struct lu_buf *buf, const char *name,
1051                          int fl)
1052 {
1053         return -EFAULT;
1054 }
1055
1056 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1057                          const char *name)
1058 {
1059         return -EFAULT;
1060 }
1061
1062 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1063                        const struct md_attr *ma)
1064 {
1065         return -EFAULT;
1066 }
1067
1068 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1069                        struct md_attr *ma)
1070 {
1071         return -EFAULT;
1072 }
1073
1074 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1075                     int flags)
1076 {
1077         return -EREMOTE;
1078 }
1079
1080 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1081                      struct md_attr *ma)
1082 {
1083         return -EFAULT;
1084 }
1085
1086 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1087                         const struct lu_rdpg *rdpg)
1088 {
1089         return -EREMOTE;
1090 }
1091
1092 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1093                         struct lustre_capa *capa, int renewal)
1094 {
1095         return -EFAULT;
1096 }
1097
1098 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1099                     char *path, int pathlen, __u64 *recno, int *linkno)
1100 {
1101         return -EREMOTE;
1102 }
1103
1104 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1105 {
1106         return -EFAULT;
1107 }
1108
1109 /**
1110  * cmr moo_version_get().
1111  */
1112 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
1113                                         struct md_object *mo)
1114 {
1115         /** Don't check remote object version */
1116         return 0;
1117 }
1118
1119
1120 /**
1121  * cmr moo_version_set().
1122  * No need to update remote object version here, it is done as a part
1123  * of reintegration of partial operation on the remote server.
1124  */
1125 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
1126                             dt_obj_version_t version)
1127 {
1128         return;
1129 }
1130
1131 /** Set of md_object_operations for cmr. */
1132 static const struct md_object_operations cmr_mo_ops = {
1133         .moo_permission    = cmr_permission,
1134         .moo_attr_get      = cmr_attr_get,
1135         .moo_attr_set      = cmr_attr_set,
1136         .moo_xattr_get     = cmr_xattr_get,
1137         .moo_xattr_set     = cmr_xattr_set,
1138         .moo_xattr_list    = cmr_xattr_list,
1139         .moo_xattr_del     = cmr_xattr_del,
1140         .moo_object_create = cmr_object_create,
1141         .moo_ref_add       = cmr_ref_add,
1142         .moo_ref_del       = cmr_ref_del,
1143         .moo_open          = cmr_open,
1144         .moo_close         = cmr_close,
1145         .moo_readpage      = cmr_readpage,
1146         .moo_readlink      = cmr_readlink,
1147         .moo_changelog     = cmr_changelog,
1148         .moo_capa_get      = cmr_capa_get,
1149         .moo_object_sync   = cmr_object_sync,
1150         .moo_version_get   = cmr_version_get,
1151         .moo_version_set   = cmr_version_set,
1152         .moo_path          = cmr_path,
1153 };
1154 /** @} */
1155
1156 /**
1157  * \name cmr md_dir operations.
1158  *
1159  * All methods below are cross-ref by nature. They consist of remote call and
1160  * local operation. Due to future rollback functionality there are several
1161  * limitations for such methods:
1162  * -# remote call should be done at first to do epoch negotiation between all
1163  * MDS involved and to avoid the RPC inside transaction.
1164  * -# only one RPC can be sent - also due to epoch negotiation.
1165  * For more details see rollback HLD/DLD.
1166  * @{
1167  */
1168 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1169                       const struct lu_name *lname, struct lu_fid *lf,
1170                       struct md_op_spec *spec)
1171 {
1172         /*
1173          * This can happens while rename() If new parent is remote dir, lookup
1174          * will happen here.
1175          */
1176
1177         return -EREMOTE;
1178 }
1179
1180 /** Return lock mode. */
1181 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1182                                 struct md_object *mo, mdl_mode_t lm)
1183 {
1184         return MDL_MINMODE;
1185 }
1186
1187 /**
1188  * Create operation for cmr.
1189  * Remote object creation and local name insert.
1190  *
1191  * \param mo_p Parent directory. Local object.
1192  * \param lchild_name name of file to create.
1193  * \param mo_c Child object. It has no real inode yet.
1194  * \param spec creation specification.
1195  * \param ma child object attributes.
1196  */
1197 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1198                       const struct lu_name *lchild_name, struct md_object *mo_c,
1199                       struct md_op_spec *spec,
1200                       struct md_attr *ma)
1201 {
1202         struct cmm_thread_info *cmi;
1203         struct md_attr *tmp_ma;
1204         int rc;
1205         ENTRY;
1206
1207         /* Make sure that name isn't exist before doing remote call. */
1208         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1209                         &cmm_env_info(env)->cmi_fid, NULL);
1210         if (rc == 0)
1211                 RETURN(-EEXIST);
1212         else if (rc != -ENOENT)
1213                 RETURN(rc);
1214
1215         /* check the SGID attr */
1216         cmi = cmm_env_info(env);
1217         LASSERT(cmi);
1218         tmp_ma = &cmi->cmi_ma;
1219         tmp_ma->ma_valid = 0;
1220         tmp_ma->ma_need = MA_INODE;
1221
1222 #ifdef CONFIG_FS_POSIX_ACL
1223         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1224                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1225                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1226                 tmp_ma->ma_need |= MA_ACL_DEF;
1227         }
1228 #endif
1229         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1230         if (rc)
1231                 RETURN(rc);
1232
1233         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1234                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1235                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1236                         ma->ma_attr.la_mode |= S_ISGID;
1237                         ma->ma_attr.la_valid |= LA_MODE;
1238                 }
1239         }
1240
1241 #ifdef CONFIG_FS_POSIX_ACL
1242         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1243                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1244                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1245                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1246                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1247         }
1248 #endif
1249
1250         /* Local permission check for name_insert before remote ops. */
1251         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1252                            (S_ISDIR(ma->ma_attr.la_mode) ?
1253                            MAY_LINK : MAY_CREATE));
1254         if (rc)
1255                 RETURN(rc);
1256
1257         /**
1258          * \note \a ma will be changed after mo_object_create(), but we will use
1259          * it for mdo_name_insert() later, so save it before mo_object_create().
1260          */
1261         *tmp_ma = *ma;
1262         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1263         if (rc == 0) {
1264                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1265                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1266                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1267                 if (unlikely(rc)) {
1268                         /* TODO: remove object mo_c on remote MDS */
1269                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1270                               " [name %s] [mo_c "DFID"] [err %d]\n",
1271                               PFID(lu_object_fid(&mo_p->mo_lu)),
1272                               lchild_name->ln_name,
1273                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1274                 }
1275         }
1276
1277         RETURN(rc);
1278 }
1279
1280 /**
1281  * Link operations for cmr.
1282  *
1283  * The link RPC is always issued to the server where source parent is living.
1284  * The first operation to do is object nlink increment on remote server.
1285  * Second one is local mdo_name_insert().
1286  *
1287  * \param mo_p parent directory. It is local.
1288  * \param mo_s source object to link. It is remote.
1289  * \param lname Name of link file.
1290  * \param ma object attributes.
1291  */
1292 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1293                     struct md_object *mo_s, const struct lu_name *lname,
1294                     struct md_attr *ma)
1295 {
1296         int rc;
1297         ENTRY;
1298
1299         /* Make sure that name isn't exist before doing remote call. */
1300         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1301                         &cmm_env_info(env)->cmi_fid, NULL);
1302         if (rc == 0) {
1303                 rc = -EEXIST;
1304         } else if (rc == -ENOENT) {
1305                 /* Local permission check for name_insert before remote ops. */
1306                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1307                                    MAY_CREATE);
1308                 if (rc)
1309                         RETURN(rc);
1310
1311                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1312                 if (rc == 0) {
1313                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1314                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1315                                              lu_object_fid(&mo_s->mo_lu), ma);
1316                         if (unlikely(rc)) {
1317                                 /* TODO: ref_del from mo_s on remote MDS */
1318                                 CWARN("cmr_link failed, should revoke: "
1319                                       "[mo_p "DFID"] [mo_s "DFID"] "
1320                                       "[name %s] [err %d]\n",
1321                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1322                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1323                                       lname->ln_name, rc);
1324                         }
1325                 }
1326         }
1327         RETURN(rc);
1328 }
1329
1330 /**
1331  * Unlink operations for cmr.
1332  *
1333  * The unlink RPC is always issued to the server where parent is living. Hence
1334  * the first operation to do is object unlink on remote server. Second one is
1335  * local mdo_name_remove().
1336  *
1337  * \param mo_p parent md_object. It is local.
1338  * \param mo_c child object to be unlinked. It is remote.
1339  * \param lname Name of file to unlink.
1340  * \param ma object attributes.
1341  */
1342 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1343                       struct md_object *mo_c, const struct lu_name *lname,
1344                       struct md_attr *ma)
1345 {
1346         struct cmm_thread_info *cmi;
1347         struct md_attr *tmp_ma;
1348         int rc;
1349         ENTRY;
1350
1351         /* Local permission check for name_remove before remote ops. */
1352         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1353                            MAY_UNLINK | MAY_VTX_PART);
1354         if (rc)
1355                 RETURN(rc);
1356
1357         /*
1358          * \note \a ma will be changed after mo_ref_del, but we will use
1359          * it for mdo_name_remove() later, so save it before mo_ref_del().
1360          */
1361         cmi = cmm_env_info(env);
1362         tmp_ma = &cmi->cmi_ma;
1363         *tmp_ma = *ma;
1364         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1365         if (rc == 0) {
1366                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1367                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1368                 if (unlikely(rc)) {
1369                         /* TODO: ref_add to mo_c on remote MDS */
1370                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1371                               " [mo_c "DFID"] [name %s] [err %d]\n",
1372                               PFID(lu_object_fid(&mo_p->mo_lu)),
1373                               PFID(lu_object_fid(&mo_c->mo_lu)),
1374                               lname->ln_name, rc);
1375                 }
1376         }
1377
1378         RETURN(rc);
1379 }
1380
1381 /** Helper which outputs error message during cmr_rename() */
1382 static inline void cmr_rename_warn(const char *fname,
1383                                   struct md_object *mo_po,
1384                                   struct md_object *mo_pn,
1385                                   const struct lu_fid *lf,
1386                                   const char *s_name,
1387                                   const char *t_name,
1388                                   int err)
1389 {
1390         CWARN("cmr_rename failed for %s, should revoke: "
1391               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1392               "[sname %s] [tname %s] [err %d]\n", fname,
1393               PFID(lu_object_fid(&mo_po->mo_lu)),
1394               PFID(lu_object_fid(&mo_pn->mo_lu)),
1395               PFID(lf), s_name, t_name, err);
1396 }
1397
1398 /**
1399  * Rename operation for cmr.
1400  *
1401  * This is the most complex cross-reference operation. It may consist of up to 4
1402  * MDS server and require several RPCs to be sent.
1403  *
1404  * \param mo_po Old parent object.
1405  * \param mo_pn New parent object.
1406  * \param lf FID of object to rename.
1407  * \param ls_name Source file name.
1408  * \param mo_t target object. Should be NULL here.
1409  * \param lt_name Name of target file.
1410  * \param ma object attributes.
1411  */
1412 static int cmr_rename(const struct lu_env *env,
1413                       struct md_object *mo_po, struct md_object *mo_pn,
1414                       const struct lu_fid *lf, const struct lu_name *ls_name,
1415                       struct md_object *mo_t, const struct lu_name *lt_name,
1416                       struct md_attr *ma)
1417 {
1418         struct cmm_thread_info *cmi;
1419         struct md_attr *tmp_ma;
1420         int rc;
1421         ENTRY;
1422
1423         LASSERT(mo_t == NULL);
1424
1425         /* get real type of src */
1426         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1427         if (rc)
1428                 RETURN(rc);
1429
1430         /* Local permission check for name_remove before remote ops. */
1431         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1432                            MAY_UNLINK | MAY_VTX_FULL);
1433         if (rc)
1434                 RETURN(rc);
1435
1436         /**
1437          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1438          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1439          */
1440         cmi = cmm_env_info(env);
1441         tmp_ma = &cmi->cmi_ma;
1442         *tmp_ma = *ma;
1443         /**
1444          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1445          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1446          * lookup and process this further.
1447          */
1448         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1449                             NULL/* mo_t */, lf, lt_name, ma);
1450         if (rc)
1451                 RETURN(rc);
1452
1453         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1454
1455         /* src object maybe on remote MDS, do remote ops first. */
1456         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1457         if (unlikely(rc)) {
1458                 /* TODO: revoke mdo_rename_tgt */
1459                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1460                                 ls_name->ln_name, lt_name->ln_name, rc);
1461                 RETURN(rc);
1462         }
1463
1464         /* only old name is removed localy */
1465         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1466         if (unlikely(rc))
1467                 /* TODO: revoke all cmr_rename */
1468                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1469                                 ls_name->ln_name, lt_name->ln_name, rc);
1470
1471         RETURN(rc);
1472 }
1473
1474 /**
1475  * Part of cross-ref rename().
1476  * Used to insert new name in new parent and unlink target.
1477  */
1478 static int cmr_rename_tgt(const struct lu_env *env,
1479                           struct md_object *mo_p, struct md_object *mo_t,
1480                           const struct lu_fid *lf, const struct lu_name *lname,
1481                           struct md_attr *ma)
1482 {
1483         struct cmm_thread_info *cmi;
1484         struct md_attr *tmp_ma;
1485         int rc;
1486         ENTRY;
1487
1488         /* target object is remote one */
1489         /* Local permission check for rename_tgt before remote ops. */
1490         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1491                            MAY_UNLINK | MAY_VTX_PART);
1492         if (rc)
1493                 RETURN(rc);
1494
1495         /*
1496          * XXX: @ma maybe changed after mo_ref_del, but we will use
1497          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1498          */
1499         cmi = cmm_env_info(env);
1500         tmp_ma = &cmi->cmi_ma;
1501         *tmp_ma = *ma;
1502         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1503         /* continue locally with name handling only */
1504         if (rc == 0) {
1505                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1506                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1507                                     NULL, lf, lname, tmp_ma);
1508                 if (unlikely(rc)) {
1509                         /* TODO: ref_add to mo_t on remote MDS */
1510                         CWARN("cmr_rename_tgt failed, should revoke: "
1511                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1512                               "[name %s] [err %d]\n",
1513                               PFID(lu_object_fid(&mo_p->mo_lu)),
1514                               PFID(lu_object_fid(&mo_t->mo_lu)),
1515                               PFID(lf),
1516                               lname->ln_name, rc);
1517                 }
1518         }
1519         RETURN(rc);
1520 }
1521 /** @} */
1522 /**
1523  * The md_dir_operations for cmr.
1524  */
1525 static const struct md_dir_operations cmr_dir_ops = {
1526         .mdo_is_subdir   = cmm_is_subdir,
1527         .mdo_lookup      = cmr_lookup,
1528         .mdo_lock_mode   = cmr_lock_mode,
1529         .mdo_create      = cmr_create,
1530         .mdo_link        = cmr_link,
1531         .mdo_unlink      = cmr_unlink,
1532         .mdo_rename      = cmr_rename,
1533         .mdo_rename_tgt  = cmr_rename_tgt
1534 };
1535 /** @} */