Whamcloud - gitweb
feb7b89cf956d6ce9d8f3e6374362619aa25cd4d
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52 /**
53  * \ingroup cmm
54  * Lookup MDS number \a mds by FID \a fid.
55  *
56  * \param fid FID of object to find MDS
57  * \param mds mds number to return.
58  */
59 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
60                    mdsno_t *mds, const struct lu_env *env)
61 {
62         int rc = 0;
63         ENTRY;
64
65         LASSERT(fid_is_sane(fid));
66
67         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
68         if (rc) {
69                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
70                        fid_seq(fid), rc);
71                 RETURN(rc);
72         }
73
74         if (*mds > cm->cmm_tgt_count) {
75                 CERROR("Got invalid mdsno: %x (max: %x)\n",
76                        *mds, cm->cmm_tgt_count);
77                 rc = -EINVAL;
78         } else {
79                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
80                        LPX64"\n", *mds, fid_seq(fid));
81         }
82
83         RETURN (rc);
84 }
85
86 /**
87  * \addtogroup cml
88  * @{
89  */
90 static const struct md_object_operations cml_mo_ops;
91 static const struct md_dir_operations    cml_dir_ops;
92 static const struct lu_object_operations cml_obj_ops;
93
94 static const struct md_object_operations cmr_mo_ops;
95 static const struct md_dir_operations    cmr_dir_ops;
96 static const struct lu_object_operations cmr_obj_ops;
97
98 /**
99  * \ingroup cmm
100  * Allocate CMM object.
101  */
102 struct lu_object *cmm_object_alloc(const struct lu_env *env,
103                                    const struct lu_object_header *loh,
104                                    struct lu_device *ld)
105 {
106         const struct lu_fid *fid = &loh->loh_fid;
107         struct lu_object  *lo = NULL;
108         struct cmm_device *cd;
109         mdsno_t mds;
110         int rc = 0;
111
112         ENTRY;
113
114         cd = lu2cmm_dev(ld);
115         if (cd->cmm_flags & CMM_INITIALIZED) {
116                 /* get object location */
117                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
118                 if (rc)
119                         RETURN(NULL);
120         } else
121                 /*
122                  * Device is not yet initialized, cmm_object is being created
123                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
124                  * etc.). Such object *has* to be local.
125                  */
126                 mds = cd->cmm_local_num;
127
128         /* select the proper set of operations based on object location */
129         if (mds == cd->cmm_local_num) {
130                 struct cml_object *clo;
131
132                 OBD_ALLOC_PTR(clo);
133                 if (clo != NULL) {
134                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
135                         lu_object_init(lo, NULL, ld);
136                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
137                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
138                         lo->lo_ops = &cml_obj_ops;
139                 }
140         } else {
141                 struct cmr_object *cro;
142
143                 OBD_ALLOC_PTR(cro);
144                 if (cro != NULL) {
145                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
146                         lu_object_init(lo, NULL, ld);
147                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
148                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
149                         lo->lo_ops = &cmr_obj_ops;
150                         cro->cmo_num = mds;
151                 }
152         }
153         RETURN(lo);
154 }
155
156 /**
157  * Get local child device.
158  */
159 static struct lu_device *cml_child_dev(struct cmm_device *d)
160 {
161         return &d->cmm_child->md_lu_dev;
162 }
163
164 /**
165  * Free cml_object.
166  */
167 static void cml_object_free(const struct lu_env *env,
168                             struct lu_object *lo)
169 {
170         struct cml_object *clo = lu2cml_obj(lo);
171         lu_object_fini(lo);
172         OBD_FREE_PTR(clo);
173 }
174
175 /**
176  * Initialize cml_object.
177  */
178 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
179                            const struct lu_object_conf *unused)
180 {
181         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
182         struct lu_device  *c_dev;
183         struct lu_object  *c_obj;
184         int rc;
185
186         ENTRY;
187
188 #ifdef HAVE_SPLIT_SUPPORT
189         if (cd->cmm_tgt_count == 0)
190                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
191         else
192                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
193 #endif
194         c_dev = cml_child_dev(cd);
195         if (c_dev == NULL) {
196                 rc = -ENOENT;
197         } else {
198                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
199                                                         lo->lo_header, c_dev);
200                 if (c_obj != NULL) {
201                         lu_object_add(lo, c_obj);
202                         rc = 0;
203                 } else {
204                         rc = -ENOMEM;
205                 }
206         }
207
208         RETURN(rc);
209 }
210
211 static int cml_object_print(const struct lu_env *env, void *cookie,
212                             lu_printer_t p, const struct lu_object *lo)
213 {
214         return (*p)(env, cookie, "[local]");
215 }
216
217 static const struct lu_object_operations cml_obj_ops = {
218         .loo_object_init    = cml_object_init,
219         .loo_object_free    = cml_object_free,
220         .loo_object_print   = cml_object_print
221 };
222
223 /**
224  * \name CMM local md_object operations.
225  * All of them call just corresponding operations on next layer.
226  * @{
227  */
228 static int cml_object_create(const struct lu_env *env,
229                              struct md_object *mo,
230                              const struct md_op_spec *spec,
231                              struct md_attr *attr)
232 {
233         int rc;
234         ENTRY;
235         rc = mo_object_create(env, md_object_next(mo), spec, attr);
236         RETURN(rc);
237 }
238
239 static int cml_permission(const struct lu_env *env,
240                           struct md_object *p, struct md_object *c,
241                           struct md_attr *attr, int mask)
242 {
243         int rc;
244         ENTRY;
245         rc = mo_permission(env, md_object_next(p), md_object_next(c),
246                            attr, mask);
247         RETURN(rc);
248 }
249
250 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
251                         struct md_attr *attr)
252 {
253         int rc;
254         ENTRY;
255         rc = mo_attr_get(env, md_object_next(mo), attr);
256         RETURN(rc);
257 }
258
259 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
260                         const struct md_attr *attr)
261 {
262         int rc;
263         ENTRY;
264         rc = mo_attr_set(env, md_object_next(mo), attr);
265         RETURN(rc);
266 }
267
268 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
269                          struct lu_buf *buf, const char *name)
270 {
271         int rc;
272         ENTRY;
273         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
274         RETURN(rc);
275 }
276
277 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
278                         struct lu_buf *buf)
279 {
280         int rc;
281         ENTRY;
282         rc = mo_readlink(env, md_object_next(mo), buf);
283         RETURN(rc);
284 }
285
286 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
287                           struct lu_buf *buf)
288 {
289         int rc;
290         ENTRY;
291         rc = mo_xattr_list(env, md_object_next(mo), buf);
292         RETURN(rc);
293 }
294
295 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
296                          const struct lu_buf *buf, const char *name,
297                          int fl)
298 {
299         int rc;
300         ENTRY;
301         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
302         RETURN(rc);
303 }
304
305 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
306                          const char *name)
307 {
308         int rc;
309         ENTRY;
310         rc = mo_xattr_del(env, md_object_next(mo), name);
311         RETURN(rc);
312 }
313
314 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
315                        const struct md_attr *ma)
316 {
317         int rc;
318         ENTRY;
319         rc = mo_ref_add(env, md_object_next(mo), ma);
320         RETURN(rc);
321 }
322
323 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
324                        struct md_attr *ma)
325 {
326         int rc;
327         ENTRY;
328         rc = mo_ref_del(env, md_object_next(mo), ma);
329         RETURN(rc);
330 }
331
332 static int cml_open(const struct lu_env *env, struct md_object *mo,
333                     int flags)
334 {
335         int rc;
336         ENTRY;
337         rc = mo_open(env, md_object_next(mo), flags);
338         RETURN(rc);
339 }
340
341 static int cml_close(const struct lu_env *env, struct md_object *mo,
342                      struct md_attr *ma)
343 {
344         int rc;
345         ENTRY;
346         rc = mo_close(env, md_object_next(mo), ma);
347         RETURN(rc);
348 }
349
350 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
351                         const struct lu_rdpg *rdpg)
352 {
353         int rc;
354         ENTRY;
355         rc = mo_readpage(env, md_object_next(mo), rdpg);
356         RETURN(rc);
357 }
358
359 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
360                         struct lustre_capa *capa, int renewal)
361 {
362         int rc;
363         ENTRY;
364         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
365         RETURN(rc);
366 }
367
368 static int cml_path(const struct lu_env *env, struct md_object *mo,
369                     char *path, int pathlen, __u64 *recno, int *linkno)
370 {
371         int rc;
372         ENTRY;
373         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
374         RETURN(rc);
375 }
376
377 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
378 {
379         int rc;
380         ENTRY;
381         rc = mo_object_sync(env, md_object_next(mo));
382         RETURN(rc);
383 }
384
385 static dt_obj_version_t cml_version_get(const struct lu_env *env,
386                                         struct md_object *mo)
387 {
388         return mo_version_get(env, md_object_next(mo));
389 }
390
391 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
392                             dt_obj_version_t version)
393 {
394         return mo_version_set(env, md_object_next(mo), version);
395 }
396
397 static const struct md_object_operations cml_mo_ops = {
398         .moo_permission    = cml_permission,
399         .moo_attr_get      = cml_attr_get,
400         .moo_attr_set      = cml_attr_set,
401         .moo_xattr_get     = cml_xattr_get,
402         .moo_xattr_list    = cml_xattr_list,
403         .moo_xattr_set     = cml_xattr_set,
404         .moo_xattr_del     = cml_xattr_del,
405         .moo_object_create = cml_object_create,
406         .moo_ref_add       = cml_ref_add,
407         .moo_ref_del       = cml_ref_del,
408         .moo_open          = cml_open,
409         .moo_close         = cml_close,
410         .moo_readpage      = cml_readpage,
411         .moo_readlink      = cml_readlink,
412         .moo_capa_get      = cml_capa_get,
413         .moo_object_sync   = cml_object_sync,
414         .moo_version_get   = cml_version_get,
415         .moo_version_set   = cml_version_set,
416         .moo_path          = cml_path,
417 };
418 /** @} */
419
420 /**
421  * \name CMM local md_dir_operations.
422  * @{
423  */
424 /**
425  * cml lookup object fid by name.
426  * This returns only FID by name.
427  */
428 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
429                       const struct lu_name *lname, struct lu_fid *lf,
430                       struct md_op_spec *spec)
431 {
432         int rc;
433         ENTRY;
434
435 #ifdef HAVE_SPLIT_SUPPORT
436         if (spec != NULL && spec->sp_ck_split) {
437                 rc = cmm_split_check(env, mo_p, lname->ln_name);
438                 if (rc)
439                         RETURN(rc);
440         }
441 #endif
442         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
443         RETURN(rc);
444
445 }
446
447 /**
448  * Helper to return lock mode. Used in split cases only.
449  */
450 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
451                                 struct md_object *mo, mdl_mode_t lm)
452 {
453         int rc = MDL_MINMODE;
454         ENTRY;
455
456 #ifdef HAVE_SPLIT_SUPPORT
457         rc = cmm_split_access(env, mo, lm);
458 #endif
459
460         RETURN(rc);
461 }
462
463 /**
464  * Create operation for cml.
465  * Objects are local, but split can happen.
466  * If split is not needed this will call next layer mdo_create().
467  *
468  * \param mo_p Parent directory. Local object.
469  * \param lname name of file to create.
470  * \param mo_c Child object. It has no real inode yet.
471  * \param spec creation specification.
472  * \param ma child object attributes.
473  */
474 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
475                       const struct lu_name *lname, struct md_object *mo_c,
476                       struct md_op_spec *spec, struct md_attr *ma)
477 {
478         int rc;
479         ENTRY;
480
481 #ifdef HAVE_SPLIT_SUPPORT
482         /* Lock mode always should be sane. */
483         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
484
485         /*
486          * Sigh... This is long story. MDT may have race with detecting if split
487          * is possible in cmm. We know this race and let it live, because
488          * getting it rid (with some sem or spinlock) will also mean that
489          * PDIROPS for create will not work because we kill parallel work, what
490          * is really bad for performance and makes no sense having PDIROPS. So,
491          * we better allow the race to live, but split dir only if some of
492          * concurrent threads takes EX lock, not matter which one. So that, say,
493          * two concurrent threads may have different lock modes on directory (CW
494          * and EX) and not first one which comes here and see that split is
495          * possible should split the dir, but only that one which has EX
496          * lock. And we do not care that in this case, split may happen a bit
497          * later (when dir size will not be necessarily 64K, but may be a bit
498          * larger). So that, we allow concurrent creates and protect split by EX
499          * lock.
500          */
501         if (spec->sp_cr_mode == MDL_EX) {
502                 /**
503                  * Split cases:
504                  * - Try to split \a mo_p upon each create operation.
505                  *   If split is ok, -ERESTART is returned and current thread
506                  *   will not peoceed with create. Instead it sends -ERESTART
507                  *   to client to let it know that correct MDT must be chosen.
508                  * \see cmm_split_dir()
509                  */
510                 rc = cmm_split_dir(env, mo_p);
511                 if (rc)
512                         /*
513                          * -ERESTART or some split error is returned, we can't
514                          * proceed with create.
515                          */
516                         GOTO(out, rc);
517         }
518
519         if (spec != NULL && spec->sp_ck_split) {
520                 /**
521                  * - Directory is split already. Let the caller know that
522                  * it should tell client that directory is split and operation
523                  * should repeat to correct MDT.
524                  * \see cmm_split_check()
525                  */
526                 rc = cmm_split_check(env, mo_p, lname->ln_name);
527                 if (rc)
528                         GOTO(out, rc);
529         }
530 #endif
531
532         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
533                         spec, ma);
534
535         EXIT;
536 #ifdef HAVE_SPLIT_SUPPORT
537 out:
538 #endif
539         return rc;
540 }
541
542 /** Call mdo_create_data() on next layer. All objects are local. */
543 static int cml_create_data(const struct lu_env *env, struct md_object *p,
544                            struct md_object *o,
545                            const struct md_op_spec *spec,
546                            struct md_attr *ma)
547 {
548         int rc;
549         ENTRY;
550         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
551                              spec, ma);
552         RETURN(rc);
553 }
554
555 /** Call mdo_link() on next layer. All objects are local. */
556 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
557                     struct md_object *mo_s, const struct lu_name *lname,
558                     struct md_attr *ma)
559 {
560         int rc;
561         ENTRY;
562         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
563                       lname, ma);
564         RETURN(rc);
565 }
566
567 /** Call mdo_unlink() on next layer. All objects are local. */
568 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
569                       struct md_object *mo_c, const struct lu_name *lname,
570                       struct md_attr *ma)
571 {
572         int rc;
573         ENTRY;
574         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
575                         lname, ma);
576         RETURN(rc);
577 }
578
579 /**
580  * \ingroup cmm
581  * Get mode of object.
582  * Used in both cml and cmr hence can produce RPC to another server.
583  */
584 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
585                         const struct lu_fid *lf, struct md_attr *ma,
586                         int *remote)
587 {
588         struct md_object *mo_s = md_object_find_slice(env, md, lf);
589         struct cmm_thread_info *cmi;
590         struct md_attr *tmp_ma;
591         int rc;
592         ENTRY;
593
594         if (IS_ERR(mo_s))
595                 RETURN(PTR_ERR(mo_s));
596
597         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
598                 *remote = 1;
599
600         cmi = cmm_env_info(env);
601         tmp_ma = &cmi->cmi_ma;
602         tmp_ma->ma_need = MA_INODE;
603         tmp_ma->ma_valid = 0;
604         /* get type from src, can be remote req */
605         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
606         if (rc == 0) {
607                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
608                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
609                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
610                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
611                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
612         }
613         lu_object_put(env, &mo_s->mo_lu);
614         RETURN(rc);
615 }
616
617 /**
618  * \ingroup cmm
619  * Set ctime for object.
620  * Used in both cml and cmr hence can produce RPC to another server.
621  */
622 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
623                             const struct lu_fid *lf, struct md_attr *ma)
624 {
625         struct md_object *mo_s = md_object_find_slice(env, md, lf);
626         int rc;
627         ENTRY;
628
629         if (IS_ERR(mo_s))
630                 RETURN(PTR_ERR(mo_s));
631
632         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
633         /* set ctime to obj, can be remote req */
634         rc = mo_attr_set(env, md_object_next(mo_s), ma);
635         lu_object_put(env, &mo_s->mo_lu);
636         RETURN(rc);
637 }
638
639 /** Helper to output debug information about rename operation. */
640 static inline void cml_rename_warn(const char *fname,
641                                   struct md_object *mo_po,
642                                   struct md_object *mo_pn,
643                                   const struct lu_fid *lf,
644                                   const char *s_name,
645                                   struct md_object *mo_t,
646                                   const char *t_name,
647                                   int err)
648 {
649         if (mo_t)
650                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
651                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
652                       "[tname %s] [err %d]\n", fname,
653                       PFID(lu_object_fid(&mo_po->mo_lu)),
654                       PFID(lu_object_fid(&mo_pn->mo_lu)),
655                       PFID(lf), s_name,
656                       PFID(lu_object_fid(&mo_t->mo_lu)),
657                       t_name, err);
658         else
659                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
660                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
661                       "[tname %s] [err %d]\n", fname,
662                       PFID(lu_object_fid(&mo_po->mo_lu)),
663                       PFID(lu_object_fid(&mo_pn->mo_lu)),
664                       PFID(lf), s_name,
665                       t_name, err);
666 }
667
668 /**
669  * Rename operation for cml.
670  *
671  * This is the most complex cross-reference operation. It may consist of up to 4
672  * MDS server and require several RPCs to be sent.
673  *
674  * \param mo_po Old parent object.
675  * \param mo_pn New parent object.
676  * \param lf FID of object to rename.
677  * \param ls_name Source file name.
678  * \param mo_t target object. Should be NULL here.
679  * \param lt_name Name of target file.
680  * \param ma object attributes.
681  */
682 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
683                       struct md_object *mo_pn, const struct lu_fid *lf,
684                       const struct lu_name *ls_name, struct md_object *mo_t,
685                       const struct lu_name *lt_name, struct md_attr *ma)
686 {
687         struct cmm_thread_info *cmi;
688         struct md_attr *tmp_ma = NULL;
689         struct md_object *tmp_t = mo_t;
690         int remote = 0, rc;
691         ENTRY;
692
693         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
694         if (rc)
695                 RETURN(rc);
696
697         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
698                 /**
699                  * \note \a mo_t is remote object and there is RPC to unlink it.
700                  * Before that, do local sanity check for rename first.
701                  */
702                 if (!remote) {
703                         struct md_object *mo_s = md_object_find_slice(env,
704                                                         md_obj2dev(mo_po), lf);
705                         if (IS_ERR(mo_s))
706                                 RETURN(PTR_ERR(mo_s));
707
708                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
709                         rc = mo_permission(env, md_object_next(mo_po),
710                                            md_object_next(mo_s),
711                                            ma, MAY_RENAME_SRC);
712                         lu_object_put(env, &mo_s->mo_lu);
713                         if (rc)
714                                 RETURN(rc);
715                 } else {
716                         rc = mo_permission(env, NULL, md_object_next(mo_po),
717                                            ma, MAY_UNLINK | MAY_VTX_FULL);
718                         if (rc)
719                                 RETURN(rc);
720                 }
721
722                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
723                                    MAY_UNLINK | MAY_VTX_PART);
724                 if (rc)
725                         RETURN(rc);
726
727                 /*
728                  * /note \a ma will be changed after mo_ref_del(), but we will use
729                  * it for mdo_rename() later, so save it before mo_ref_del().
730                  */
731                 cmi = cmm_env_info(env);
732                 tmp_ma = &cmi->cmi_ma;
733                 *tmp_ma = *ma;
734                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
735                 if (rc)
736                         RETURN(rc);
737
738                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
739                 mo_t = NULL;
740         }
741
742         /**
743          * \note for src on remote MDS case, change its ctime before local
744          * rename. Firstly, do local sanity check for rename if necessary.
745          */
746         if (remote) {
747                 if (!tmp_ma) {
748                         rc = mo_permission(env, NULL, md_object_next(mo_po),
749                                            ma, MAY_UNLINK | MAY_VTX_FULL);
750                         if (rc)
751                                 RETURN(rc);
752
753                         if (mo_t) {
754                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
755                                 rc = mo_permission(env, md_object_next(mo_pn),
756                                                    md_object_next(mo_t),
757                                                    ma, MAY_RENAME_TAR);
758                                 if (rc)
759                                         RETURN(rc);
760                         } else {
761                                 int mask;
762
763                                 if (mo_po != mo_pn)
764                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
765                                                 MAY_LINK : MAY_CREATE);
766                                 else
767                                         mask = MAY_CREATE;
768                                 rc = mo_permission(env, NULL,
769                                                    md_object_next(mo_pn),
770                                                    NULL, mask);
771                                 if (rc)
772                                         RETURN(rc);
773                         }
774
775                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
776                 } else {
777                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
778                 }
779
780                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
781                                       tmp_ma ? tmp_ma : ma);
782                 if (rc) {
783                         /* TODO: revoke mo_t if necessary. */
784                         cml_rename_warn("cmm_rename_ctime", mo_po,
785                                         mo_pn, lf, ls_name->ln_name,
786                                         tmp_t, lt_name->ln_name, rc);
787                         RETURN(rc);
788                 }
789         }
790
791         /* local rename, mo_t can be NULL */
792         rc = mdo_rename(env, md_object_next(mo_po),
793                         md_object_next(mo_pn), lf, ls_name,
794                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
795         if (rc)
796                 /* TODO: revoke all cml_rename */
797                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
798                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
799
800         RETURN(rc);
801 }
802
803 /**
804  * Rename target partial operation.
805  * Used for cross-ref rename.
806  */
807 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
808                           struct md_object *mo_t, const struct lu_fid *lf,
809                           const struct lu_name *lname, struct md_attr *ma)
810 {
811         int rc;
812         ENTRY;
813
814         rc = mdo_rename_tgt(env, md_object_next(mo_p),
815                             md_object_next(mo_t), lf, lname, ma);
816         RETURN(rc);
817 }
818
819 /**
820  * Name insert only operation.
821  * used only in case of rename_tgt() when target doesn't exist.
822  */
823 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
824                            const struct lu_name *lname, const struct lu_fid *lf,
825                            const struct md_attr *ma)
826 {
827         int rc;
828         ENTRY;
829
830         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
831
832         RETURN(rc);
833 }
834
835 /**
836  * \ingroup cmm
837  * Check two fids are not subdirectories.
838  */
839 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
840                          const struct lu_fid *fid, struct lu_fid *sfid)
841 {
842         struct cmm_thread_info *cmi;
843         int rc;
844         ENTRY;
845
846         cmi = cmm_env_info(env);
847         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
848         if (rc)
849                 RETURN(rc);
850
851         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
852                 RETURN(0);
853
854         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
855         RETURN(rc);
856 }
857
858 static const struct md_dir_operations cml_dir_ops = {
859         .mdo_is_subdir   = cmm_is_subdir,
860         .mdo_lookup      = cml_lookup,
861         .mdo_lock_mode   = cml_lock_mode,
862         .mdo_create      = cml_create,
863         .mdo_link        = cml_link,
864         .mdo_unlink      = cml_unlink,
865         .mdo_name_insert = cml_name_insert,
866         .mdo_rename      = cml_rename,
867         .mdo_rename_tgt  = cml_rename_tgt,
868         .mdo_create_data = cml_create_data
869 };
870 /** @} */
871 /** @} */
872
873 /**
874  * \addtogroup cmr
875  * @{
876  */
877 /**
878  * \name cmr helpers
879  * @{
880  */
881 /** Get cmr_object from lu_object. */
882 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
883 {
884         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
885 }
886 /** Get cmr_object from md_object. */
887 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
888 {
889         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
890 }
891 /** Get cmr_object from cmm_object. */
892 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
893 {
894         return container_of0(co, struct cmr_object, cmm_obj);
895 }
896 /** @} */
897
898 /**
899  * Get proper child device from MDCs.
900  */
901 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
902 {
903         struct lu_device *next = NULL;
904         struct mdc_device *mdc;
905
906         cfs_spin_lock(&d->cmm_tgt_guard);
907         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
908                 if (mdc->mc_num == num) {
909                         next = mdc2lu_dev(mdc);
910                         break;
911                 }
912         }
913         cfs_spin_unlock(&d->cmm_tgt_guard);
914         return next;
915 }
916
917 /**
918  * Free cmr_object.
919  */
920 static void cmr_object_free(const struct lu_env *env,
921                             struct lu_object *lo)
922 {
923         struct cmr_object *cro = lu2cmr_obj(lo);
924         lu_object_fini(lo);
925         OBD_FREE_PTR(cro);
926 }
927
928 /**
929  * Initialize cmr object.
930  */
931 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
932                            const struct lu_object_conf *unused)
933 {
934         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
935         struct lu_device  *c_dev;
936         struct lu_object  *c_obj;
937         int rc;
938
939         ENTRY;
940
941         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
942         if (c_dev == NULL) {
943                 rc = -ENOENT;
944         } else {
945                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
946                                                         lo->lo_header, c_dev);
947                 if (c_obj != NULL) {
948                         lu_object_add(lo, c_obj);
949                         rc = 0;
950                 } else {
951                         rc = -ENOMEM;
952                 }
953         }
954
955         RETURN(rc);
956 }
957
958 /**
959  * Output lu_object data.
960  */
961 static int cmr_object_print(const struct lu_env *env, void *cookie,
962                             lu_printer_t p, const struct lu_object *lo)
963 {
964         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
965         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
966 }
967
968 /**
969  * Cmr instance of lu_object_operations.
970  */
971 static const struct lu_object_operations cmr_obj_ops = {
972         .loo_object_init    = cmr_object_init,
973         .loo_object_free    = cmr_object_free,
974         .loo_object_print   = cmr_object_print
975 };
976
977 /**
978  * \name cmr remote md_object operations.
979  * All operations here are invalid and return errors. There is no local object
980  * so these operations return two kinds of error:
981  * -# -EFAULT if operation is prohibited.
982  * -# -EREMOTE if operation can be done just to notify upper level about remote
983  *  object.
984  *
985  * @{
986  */
987 static int cmr_object_create(const struct lu_env *env,
988                              struct md_object *mo,
989                              const struct md_op_spec *spec,
990                              struct md_attr *ma)
991 {
992         return -EFAULT;
993 }
994
995 static int cmr_permission(const struct lu_env *env,
996                           struct md_object *p, struct md_object *c,
997                           struct md_attr *attr, int mask)
998 {
999         return -EREMOTE;
1000 }
1001
1002 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1003                         struct md_attr *attr)
1004 {
1005         return -EREMOTE;
1006 }
1007
1008 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1009                         const struct md_attr *attr)
1010 {
1011         return -EFAULT;
1012 }
1013
1014 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1015                          struct lu_buf *buf, const char *name)
1016 {
1017         return -EFAULT;
1018 }
1019
1020 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1021                         struct lu_buf *buf)
1022 {
1023         return -EFAULT;
1024 }
1025
1026 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1027                           struct lu_buf *buf)
1028 {
1029         return -EFAULT;
1030 }
1031
1032 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1033                          const struct lu_buf *buf, const char *name,
1034                          int fl)
1035 {
1036         return -EFAULT;
1037 }
1038
1039 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1040                          const char *name)
1041 {
1042         return -EFAULT;
1043 }
1044
1045 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1046                        const struct md_attr *ma)
1047 {
1048         return -EFAULT;
1049 }
1050
1051 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1052                        struct md_attr *ma)
1053 {
1054         return -EFAULT;
1055 }
1056
1057 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1058                     int flags)
1059 {
1060         return -EREMOTE;
1061 }
1062
1063 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1064                      struct md_attr *ma)
1065 {
1066         return -EFAULT;
1067 }
1068
1069 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1070                         const struct lu_rdpg *rdpg)
1071 {
1072         return -EREMOTE;
1073 }
1074
1075 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1076                         struct lustre_capa *capa, int renewal)
1077 {
1078         return -EFAULT;
1079 }
1080
1081 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1082                     char *path, int pathlen, __u64 *recno, int *linkno)
1083 {
1084         return -EREMOTE;
1085 }
1086
1087 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1088 {
1089         return -EFAULT;
1090 }
1091
1092 /**
1093  * cmr moo_version_get().
1094  */
1095 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
1096                                         struct md_object *mo)
1097 {
1098         /** Don't check remote object version */
1099         return 0;
1100 }
1101
1102
1103 /**
1104  * cmr moo_version_set().
1105  * No need to update remote object version here, it is done as a part
1106  * of reintegration of partial operation on the remote server.
1107  */
1108 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
1109                             dt_obj_version_t version)
1110 {
1111         return;
1112 }
1113
1114 /** Set of md_object_operations for cmr. */
1115 static const struct md_object_operations cmr_mo_ops = {
1116         .moo_permission    = cmr_permission,
1117         .moo_attr_get      = cmr_attr_get,
1118         .moo_attr_set      = cmr_attr_set,
1119         .moo_xattr_get     = cmr_xattr_get,
1120         .moo_xattr_set     = cmr_xattr_set,
1121         .moo_xattr_list    = cmr_xattr_list,
1122         .moo_xattr_del     = cmr_xattr_del,
1123         .moo_object_create = cmr_object_create,
1124         .moo_ref_add       = cmr_ref_add,
1125         .moo_ref_del       = cmr_ref_del,
1126         .moo_open          = cmr_open,
1127         .moo_close         = cmr_close,
1128         .moo_readpage      = cmr_readpage,
1129         .moo_readlink      = cmr_readlink,
1130         .moo_capa_get      = cmr_capa_get,
1131         .moo_object_sync   = cmr_object_sync,
1132         .moo_version_get   = cmr_version_get,
1133         .moo_version_set   = cmr_version_set,
1134         .moo_path          = cmr_path,
1135 };
1136 /** @} */
1137
1138 /**
1139  * \name cmr md_dir operations.
1140  *
1141  * All methods below are cross-ref by nature. They consist of remote call and
1142  * local operation. Due to future rollback functionality there are several
1143  * limitations for such methods:
1144  * -# remote call should be done at first to do epoch negotiation between all
1145  * MDS involved and to avoid the RPC inside transaction.
1146  * -# only one RPC can be sent - also due to epoch negotiation.
1147  * For more details see rollback HLD/DLD.
1148  * @{
1149  */
1150 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1151                       const struct lu_name *lname, struct lu_fid *lf,
1152                       struct md_op_spec *spec)
1153 {
1154         /*
1155          * This can happens while rename() If new parent is remote dir, lookup
1156          * will happen here.
1157          */
1158
1159         return -EREMOTE;
1160 }
1161
1162 /** Return lock mode. */
1163 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1164                                 struct md_object *mo, mdl_mode_t lm)
1165 {
1166         return MDL_MINMODE;
1167 }
1168
1169 /**
1170  * Create operation for cmr.
1171  * Remote object creation and local name insert.
1172  *
1173  * \param mo_p Parent directory. Local object.
1174  * \param lchild_name name of file to create.
1175  * \param mo_c Child object. It has no real inode yet.
1176  * \param spec creation specification.
1177  * \param ma child object attributes.
1178  */
1179 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1180                       const struct lu_name *lchild_name, struct md_object *mo_c,
1181                       struct md_op_spec *spec,
1182                       struct md_attr *ma)
1183 {
1184         struct cmm_thread_info *cmi;
1185         struct md_attr *tmp_ma;
1186         int rc;
1187         ENTRY;
1188
1189         /* Make sure that name isn't exist before doing remote call. */
1190         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1191                         &cmm_env_info(env)->cmi_fid, NULL);
1192         if (rc == 0)
1193                 RETURN(-EEXIST);
1194         else if (rc != -ENOENT)
1195                 RETURN(rc);
1196
1197         /* check the SGID attr */
1198         cmi = cmm_env_info(env);
1199         LASSERT(cmi);
1200         tmp_ma = &cmi->cmi_ma;
1201         tmp_ma->ma_valid = 0;
1202         tmp_ma->ma_need = MA_INODE;
1203
1204 #ifdef CONFIG_FS_POSIX_ACL
1205         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1206                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1207                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1208                 tmp_ma->ma_need |= MA_ACL_DEF;
1209         }
1210 #endif
1211         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1212         if (rc)
1213                 RETURN(rc);
1214
1215         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1216                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1217                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1218                         ma->ma_attr.la_mode |= S_ISGID;
1219                         ma->ma_attr.la_valid |= LA_MODE;
1220                 }
1221         }
1222
1223 #ifdef CONFIG_FS_POSIX_ACL
1224         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1225                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1226                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1227                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1228                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1229         }
1230 #endif
1231
1232         /* Local permission check for name_insert before remote ops. */
1233         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1234                            (S_ISDIR(ma->ma_attr.la_mode) ?
1235                            MAY_LINK : MAY_CREATE));
1236         if (rc)
1237                 RETURN(rc);
1238
1239         /**
1240          * \note \a ma will be changed after mo_object_create(), but we will use
1241          * it for mdo_name_insert() later, so save it before mo_object_create().
1242          */
1243         *tmp_ma = *ma;
1244         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1245         if (rc == 0) {
1246                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1247                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1248                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1249                 if (unlikely(rc)) {
1250                         /* TODO: remove object mo_c on remote MDS */
1251                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1252                               " [name %s] [mo_c "DFID"] [err %d]\n",
1253                               PFID(lu_object_fid(&mo_p->mo_lu)),
1254                               lchild_name->ln_name,
1255                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1256                 }
1257         }
1258
1259         RETURN(rc);
1260 }
1261
1262 /**
1263  * Link operations for cmr.
1264  *
1265  * The link RPC is always issued to the server where source parent is living.
1266  * The first operation to do is object nlink increment on remote server.
1267  * Second one is local mdo_name_insert().
1268  *
1269  * \param mo_p parent directory. It is local.
1270  * \param mo_s source object to link. It is remote.
1271  * \param lname Name of link file.
1272  * \param ma object attributes.
1273  */
1274 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1275                     struct md_object *mo_s, const struct lu_name *lname,
1276                     struct md_attr *ma)
1277 {
1278         int rc;
1279         ENTRY;
1280
1281         /* Make sure that name isn't exist before doing remote call. */
1282         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1283                         &cmm_env_info(env)->cmi_fid, NULL);
1284         if (rc == 0) {
1285                 rc = -EEXIST;
1286         } else if (rc == -ENOENT) {
1287                 /* Local permission check for name_insert before remote ops. */
1288                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1289                                    MAY_CREATE);
1290                 if (rc)
1291                         RETURN(rc);
1292
1293                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1294                 if (rc == 0) {
1295                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1296                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1297                                              lu_object_fid(&mo_s->mo_lu), ma);
1298                         if (unlikely(rc)) {
1299                                 /* TODO: ref_del from mo_s on remote MDS */
1300                                 CWARN("cmr_link failed, should revoke: "
1301                                       "[mo_p "DFID"] [mo_s "DFID"] "
1302                                       "[name %s] [err %d]\n",
1303                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1304                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1305                                       lname->ln_name, rc);
1306                         }
1307                 }
1308         }
1309         RETURN(rc);
1310 }
1311
1312 /**
1313  * Unlink operations for cmr.
1314  *
1315  * The unlink RPC is always issued to the server where parent is living. Hence
1316  * the first operation to do is object unlink on remote server. Second one is
1317  * local mdo_name_remove().
1318  *
1319  * \param mo_p parent md_object. It is local.
1320  * \param mo_c child object to be unlinked. It is remote.
1321  * \param lname Name of file to unlink.
1322  * \param ma object attributes.
1323  */
1324 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1325                       struct md_object *mo_c, const struct lu_name *lname,
1326                       struct md_attr *ma)
1327 {
1328         struct cmm_thread_info *cmi;
1329         struct md_attr *tmp_ma;
1330         int rc;
1331         ENTRY;
1332
1333         /* Local permission check for name_remove before remote ops. */
1334         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1335                            MAY_UNLINK | MAY_VTX_PART);
1336         if (rc)
1337                 RETURN(rc);
1338
1339         /*
1340          * \note \a ma will be changed after mo_ref_del, but we will use
1341          * it for mdo_name_remove() later, so save it before mo_ref_del().
1342          */
1343         cmi = cmm_env_info(env);
1344         tmp_ma = &cmi->cmi_ma;
1345         *tmp_ma = *ma;
1346         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1347         if (rc == 0) {
1348                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1349                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1350                 if (unlikely(rc)) {
1351                         /* TODO: ref_add to mo_c on remote MDS */
1352                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1353                               " [mo_c "DFID"] [name %s] [err %d]\n",
1354                               PFID(lu_object_fid(&mo_p->mo_lu)),
1355                               PFID(lu_object_fid(&mo_c->mo_lu)),
1356                               lname->ln_name, rc);
1357                 }
1358         }
1359
1360         RETURN(rc);
1361 }
1362
1363 /** Helper which outputs error message during cmr_rename() */
1364 static inline void cmr_rename_warn(const char *fname,
1365                                   struct md_object *mo_po,
1366                                   struct md_object *mo_pn,
1367                                   const struct lu_fid *lf,
1368                                   const char *s_name,
1369                                   const char *t_name,
1370                                   int err)
1371 {
1372         CWARN("cmr_rename failed for %s, should revoke: "
1373               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1374               "[sname %s] [tname %s] [err %d]\n", fname,
1375               PFID(lu_object_fid(&mo_po->mo_lu)),
1376               PFID(lu_object_fid(&mo_pn->mo_lu)),
1377               PFID(lf), s_name, t_name, err);
1378 }
1379
1380 /**
1381  * Rename operation for cmr.
1382  *
1383  * This is the most complex cross-reference operation. It may consist of up to 4
1384  * MDS server and require several RPCs to be sent.
1385  *
1386  * \param mo_po Old parent object.
1387  * \param mo_pn New parent object.
1388  * \param lf FID of object to rename.
1389  * \param ls_name Source file name.
1390  * \param mo_t target object. Should be NULL here.
1391  * \param lt_name Name of target file.
1392  * \param ma object attributes.
1393  */
1394 static int cmr_rename(const struct lu_env *env,
1395                       struct md_object *mo_po, struct md_object *mo_pn,
1396                       const struct lu_fid *lf, const struct lu_name *ls_name,
1397                       struct md_object *mo_t, const struct lu_name *lt_name,
1398                       struct md_attr *ma)
1399 {
1400         struct cmm_thread_info *cmi;
1401         struct md_attr *tmp_ma;
1402         int rc;
1403         ENTRY;
1404
1405         LASSERT(mo_t == NULL);
1406
1407         /* get real type of src */
1408         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1409         if (rc)
1410                 RETURN(rc);
1411
1412         /* Local permission check for name_remove before remote ops. */
1413         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1414                            MAY_UNLINK | MAY_VTX_FULL);
1415         if (rc)
1416                 RETURN(rc);
1417
1418         /**
1419          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1420          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1421          */
1422         cmi = cmm_env_info(env);
1423         tmp_ma = &cmi->cmi_ma;
1424         *tmp_ma = *ma;
1425         /**
1426          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1427          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1428          * lookup and process this further.
1429          */
1430         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1431                             NULL/* mo_t */, lf, lt_name, ma);
1432         if (rc)
1433                 RETURN(rc);
1434
1435         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1436
1437         /* src object maybe on remote MDS, do remote ops first. */
1438         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1439         if (unlikely(rc)) {
1440                 /* TODO: revoke mdo_rename_tgt */
1441                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1442                                 ls_name->ln_name, lt_name->ln_name, rc);
1443                 RETURN(rc);
1444         }
1445
1446         /* only old name is removed localy */
1447         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1448         if (unlikely(rc))
1449                 /* TODO: revoke all cmr_rename */
1450                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1451                                 ls_name->ln_name, lt_name->ln_name, rc);
1452
1453         RETURN(rc);
1454 }
1455
1456 /**
1457  * Part of cross-ref rename().
1458  * Used to insert new name in new parent and unlink target.
1459  */
1460 static int cmr_rename_tgt(const struct lu_env *env,
1461                           struct md_object *mo_p, struct md_object *mo_t,
1462                           const struct lu_fid *lf, const struct lu_name *lname,
1463                           struct md_attr *ma)
1464 {
1465         struct cmm_thread_info *cmi;
1466         struct md_attr *tmp_ma;
1467         int rc;
1468         ENTRY;
1469
1470         /* target object is remote one */
1471         /* Local permission check for rename_tgt before remote ops. */
1472         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1473                            MAY_UNLINK | MAY_VTX_PART);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /*
1478          * XXX: @ma maybe changed after mo_ref_del, but we will use
1479          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1480          */
1481         cmi = cmm_env_info(env);
1482         tmp_ma = &cmi->cmi_ma;
1483         *tmp_ma = *ma;
1484         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1485         /* continue locally with name handling only */
1486         if (rc == 0) {
1487                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1488                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1489                                     NULL, lf, lname, tmp_ma);
1490                 if (unlikely(rc)) {
1491                         /* TODO: ref_add to mo_t on remote MDS */
1492                         CWARN("cmr_rename_tgt failed, should revoke: "
1493                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1494                               "[name %s] [err %d]\n",
1495                               PFID(lu_object_fid(&mo_p->mo_lu)),
1496                               PFID(lu_object_fid(&mo_t->mo_lu)),
1497                               PFID(lf),
1498                               lname->ln_name, rc);
1499                 }
1500         }
1501         RETURN(rc);
1502 }
1503 /** @} */
1504 /**
1505  * The md_dir_operations for cmr.
1506  */
1507 static const struct md_dir_operations cmr_dir_ops = {
1508         .mdo_is_subdir   = cmm_is_subdir,
1509         .mdo_lookup      = cmr_lookup,
1510         .mdo_lock_mode   = cmr_lock_mode,
1511         .mdo_create      = cmr_create,
1512         .mdo_link        = cmr_link,
1513         .mdo_unlink      = cmr_unlink,
1514         .mdo_rename      = cmr_rename,
1515         .mdo_rename_tgt  = cmr_rename_tgt
1516 };
1517 /** @} */