Whamcloud - gitweb
LU-1770 ptlrpc: introducing OBD_CONNECT_FLOCK_OWNER flag
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <lustre_fid.h>
46 #include "cmm_internal.h"
47 #include "mdc_internal.h"
48 /**
49  * \ingroup cmm
50  * Lookup MDS number \a mds by FID \a fid.
51  *
52  * \param fid FID of object to find MDS
53  * \param mds mds number to return.
54  */
55 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
56                    mdsno_t *mds, const struct lu_env *env)
57 {
58         int rc = 0;
59         ENTRY;
60
61         LASSERT(fid_is_sane(fid));
62
63         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
64                                LU_SEQ_RANGE_MDT, env);
65         if (rc) {
66                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
67                        fid_seq(fid), rc);
68                 RETURN(rc);
69         }
70
71         if (*mds > cm->cmm_tgt_count) {
72                 CERROR("Got invalid mdsno: %x (max: %x)\n",
73                        *mds, cm->cmm_tgt_count);
74                 rc = -EINVAL;
75         } else {
76                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
77                        LPX64"\n", *mds, fid_seq(fid));
78         }
79
80         RETURN (rc);
81 }
82
83 /**
84  * \addtogroup cml
85  * @{
86  */
87 static const struct md_object_operations cml_mo_ops;
88 static const struct md_dir_operations    cml_dir_ops;
89 static const struct lu_object_operations cml_obj_ops;
90
91 static const struct md_object_operations cmr_mo_ops;
92 static const struct md_dir_operations    cmr_dir_ops;
93 static const struct lu_object_operations cmr_obj_ops;
94
95 /**
96  * \ingroup cmm
97  * Allocate CMM object.
98  */
99 struct lu_object *cmm_object_alloc(const struct lu_env *env,
100                                    const struct lu_object_header *loh,
101                                    struct lu_device *ld)
102 {
103         const struct lu_fid *fid = &loh->loh_fid;
104         struct lu_object  *lo = NULL;
105         struct cmm_device *cd;
106         mdsno_t mds;
107         int rc = 0;
108
109         ENTRY;
110
111         cd = lu2cmm_dev(ld);
112         if (cd->cmm_flags & CMM_INITIALIZED) {
113                 /* get object location */
114                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
115                 if (rc)
116                         RETURN(NULL);
117         } else
118                 /*
119                  * Device is not yet initialized, cmm_object is being created
120                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
121                  * etc.). Such object *has* to be local.
122                  */
123                 mds = cd->cmm_local_num;
124
125         /* select the proper set of operations based on object location */
126         if (mds == cd->cmm_local_num) {
127                 struct cml_object *clo;
128
129                 OBD_ALLOC_PTR(clo);
130                 if (clo != NULL) {
131                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
132                         lu_object_init(lo, NULL, ld);
133                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
134                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
135                         lo->lo_ops = &cml_obj_ops;
136                 }
137         } else {
138                 struct cmr_object *cro;
139
140                 OBD_ALLOC_PTR(cro);
141                 if (cro != NULL) {
142                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
143                         lu_object_init(lo, NULL, ld);
144                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
145                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
146                         lo->lo_ops = &cmr_obj_ops;
147                         cro->cmo_num = mds;
148                 }
149         }
150         RETURN(lo);
151 }
152
153 /**
154  * Get local child device.
155  */
156 static struct lu_device *cml_child_dev(struct cmm_device *d)
157 {
158         return &d->cmm_child->md_lu_dev;
159 }
160
161 /**
162  * Free cml_object.
163  */
164 static void cml_object_free(const struct lu_env *env,
165                             struct lu_object *lo)
166 {
167         struct cml_object *clo = lu2cml_obj(lo);
168         lu_object_fini(lo);
169         OBD_FREE_PTR(clo);
170 }
171
172 /**
173  * Initialize cml_object.
174  */
175 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
176                            const struct lu_object_conf *unused)
177 {
178         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
179         struct lu_device  *c_dev;
180         struct lu_object  *c_obj;
181         int rc;
182
183         ENTRY;
184
185 #ifdef HAVE_SPLIT_SUPPORT
186         if (cd->cmm_tgt_count == 0)
187                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
188         else
189                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
190 #endif
191         c_dev = cml_child_dev(cd);
192         if (c_dev == NULL) {
193                 rc = -ENOENT;
194         } else {
195                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
196                                                         lo->lo_header, c_dev);
197                 if (c_obj != NULL) {
198                         lu_object_add(lo, c_obj);
199                         rc = 0;
200                 } else {
201                         rc = -ENOMEM;
202                 }
203         }
204
205         RETURN(rc);
206 }
207
208 static int cml_object_print(const struct lu_env *env, void *cookie,
209                             lu_printer_t p, const struct lu_object *lo)
210 {
211         return (*p)(env, cookie, "[local]");
212 }
213
214 static const struct lu_object_operations cml_obj_ops = {
215         .loo_object_init    = cml_object_init,
216         .loo_object_free    = cml_object_free,
217         .loo_object_print   = cml_object_print
218 };
219
220 /**
221  * \name CMM local md_object operations.
222  * All of them call just corresponding operations on next layer.
223  * @{
224  */
225 static int cml_object_create(const struct lu_env *env,
226                              struct md_object *mo,
227                              const struct md_op_spec *spec,
228                              struct md_attr *attr)
229 {
230         int rc;
231         ENTRY;
232         rc = mo_object_create(env, md_object_next(mo), spec, attr);
233         RETURN(rc);
234 }
235
236 static int cml_permission(const struct lu_env *env,
237                           struct md_object *p, struct md_object *c,
238                           struct md_attr *attr, int mask)
239 {
240         int rc;
241         ENTRY;
242         rc = mo_permission(env, md_object_next(p), md_object_next(c),
243                            attr, mask);
244         RETURN(rc);
245 }
246
247 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
248                         struct md_attr *attr)
249 {
250         int rc;
251         ENTRY;
252         rc = mo_attr_get(env, md_object_next(mo), attr);
253         RETURN(rc);
254 }
255
256 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
257                         const struct md_attr *attr)
258 {
259         int rc;
260         ENTRY;
261         rc = mo_attr_set(env, md_object_next(mo), attr);
262         RETURN(rc);
263 }
264
265 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
266                          struct lu_buf *buf, const char *name)
267 {
268         int rc;
269         ENTRY;
270         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
271         RETURN(rc);
272 }
273
274 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
275                         struct lu_buf *buf)
276 {
277         int rc;
278         ENTRY;
279         rc = mo_readlink(env, md_object_next(mo), buf);
280         RETURN(rc);
281 }
282
283 static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
284                          int flags, struct md_object *mo)
285 {
286         int rc;
287         ENTRY;
288         rc = mo_changelog(env, type, flags, md_object_next(mo));
289         RETURN(rc);
290 }
291
292 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
293                           struct lu_buf *buf)
294 {
295         int rc;
296         ENTRY;
297         rc = mo_xattr_list(env, md_object_next(mo), buf);
298         RETURN(rc);
299 }
300
301 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
302                          const struct lu_buf *buf, const char *name,
303                          int fl)
304 {
305         int rc;
306         ENTRY;
307         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
308         RETURN(rc);
309 }
310
311 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
312                          const char *name)
313 {
314         int rc;
315         ENTRY;
316         rc = mo_xattr_del(env, md_object_next(mo), name);
317         RETURN(rc);
318 }
319
320 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
321                        const struct md_attr *ma)
322 {
323         int rc;
324         ENTRY;
325         rc = mo_ref_add(env, md_object_next(mo), ma);
326         RETURN(rc);
327 }
328
329 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
330                        struct md_attr *ma)
331 {
332         int rc;
333         ENTRY;
334         rc = mo_ref_del(env, md_object_next(mo), ma);
335         RETURN(rc);
336 }
337
338 static int cml_open(const struct lu_env *env, struct md_object *mo,
339                     int flags)
340 {
341         int rc;
342         ENTRY;
343         rc = mo_open(env, md_object_next(mo), flags);
344         RETURN(rc);
345 }
346
347 static int cml_close(const struct lu_env *env, struct md_object *mo,
348                      struct md_attr *ma, int mode)
349 {
350         int rc;
351         ENTRY;
352         rc = mo_close(env, md_object_next(mo), ma, mode);
353         RETURN(rc);
354 }
355
356 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
357                         const struct lu_rdpg *rdpg)
358 {
359         int rc;
360         ENTRY;
361         rc = mo_readpage(env, md_object_next(mo), rdpg);
362         RETURN(rc);
363 }
364
365 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
366                         struct lustre_capa *capa, int renewal)
367 {
368         int rc;
369         ENTRY;
370         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
371         RETURN(rc);
372 }
373
374 static int cml_path(const struct lu_env *env, struct md_object *mo,
375                     char *path, int pathlen, __u64 *recno, int *linkno)
376 {
377         int rc;
378         ENTRY;
379         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
380         RETURN(rc);
381 }
382
383 static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
384                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
385                          struct lustre_handle *lockh)
386 {
387         int rc;
388         ENTRY;
389         rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
390         RETURN(rc);
391 }
392
393 static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
394                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
395 {
396         int rc;
397         ENTRY;
398         rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
399         RETURN(rc);
400 }
401
402 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
403 {
404         int rc;
405         ENTRY;
406         rc = mo_object_sync(env, md_object_next(mo));
407         RETURN(rc);
408 }
409
410 static const struct md_object_operations cml_mo_ops = {
411         .moo_permission    = cml_permission,
412         .moo_attr_get      = cml_attr_get,
413         .moo_attr_set      = cml_attr_set,
414         .moo_xattr_get     = cml_xattr_get,
415         .moo_xattr_list    = cml_xattr_list,
416         .moo_xattr_set     = cml_xattr_set,
417         .moo_xattr_del     = cml_xattr_del,
418         .moo_object_create = cml_object_create,
419         .moo_ref_add       = cml_ref_add,
420         .moo_ref_del       = cml_ref_del,
421         .moo_open          = cml_open,
422         .moo_close         = cml_close,
423         .moo_readpage      = cml_readpage,
424         .moo_readlink      = cml_readlink,
425         .moo_changelog     = cml_changelog,
426         .moo_capa_get      = cml_capa_get,
427         .moo_object_sync   = cml_object_sync,
428         .moo_path          = cml_path,
429         .moo_file_lock     = cml_file_lock,
430         .moo_file_unlock   = cml_file_unlock,
431 };
432 /** @} */
433
434 /**
435  * \name CMM local md_dir_operations.
436  * @{
437  */
438 /**
439  * cml lookup object fid by name.
440  * This returns only FID by name.
441  */
442 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
443                       const struct lu_name *lname, struct lu_fid *lf,
444                       struct md_op_spec *spec)
445 {
446         int rc;
447         ENTRY;
448
449 #ifdef HAVE_SPLIT_SUPPORT
450         if (spec != NULL && spec->sp_ck_split) {
451                 rc = cmm_split_check(env, mo_p, lname->ln_name);
452                 if (rc)
453                         RETURN(rc);
454         }
455 #endif
456         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
457         RETURN(rc);
458
459 }
460
461 /**
462  * Helper to return lock mode. Used in split cases only.
463  */
464 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
465                                 struct md_object *mo, mdl_mode_t lm)
466 {
467         int rc = MDL_MINMODE;
468         ENTRY;
469
470 #ifdef HAVE_SPLIT_SUPPORT
471         rc = cmm_split_access(env, mo, lm);
472 #endif
473
474         RETURN(rc);
475 }
476
477 /**
478  * Create operation for cml.
479  * Objects are local, but split can happen.
480  * If split is not needed this will call next layer mdo_create().
481  *
482  * \param mo_p Parent directory. Local object.
483  * \param lname name of file to create.
484  * \param mo_c Child object. It has no real inode yet.
485  * \param spec creation specification.
486  * \param ma child object attributes.
487  */
488 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
489                       const struct lu_name *lname, struct md_object *mo_c,
490                       struct md_op_spec *spec, struct md_attr *ma)
491 {
492         int rc;
493         ENTRY;
494
495 #ifdef HAVE_SPLIT_SUPPORT
496         /* Lock mode always should be sane. */
497         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
498
499         /*
500          * Sigh... This is long story. MDT may have race with detecting if split
501          * is possible in cmm. We know this race and let it live, because
502          * getting it rid (with some sem or spinlock) will also mean that
503          * PDIROPS for create will not work because we kill parallel work, what
504          * is really bad for performance and makes no sense having PDIROPS. So,
505          * we better allow the race to live, but split dir only if some of
506          * concurrent threads takes EX lock, not matter which one. So that, say,
507          * two concurrent threads may have different lock modes on directory (CW
508          * and EX) and not first one which comes here and see that split is
509          * possible should split the dir, but only that one which has EX
510          * lock. And we do not care that in this case, split may happen a bit
511          * later (when dir size will not be necessarily 64K, but may be a bit
512          * larger). So that, we allow concurrent creates and protect split by EX
513          * lock.
514          */
515         if (spec->sp_cr_mode == MDL_EX) {
516                 /**
517                  * Split cases:
518                  * - Try to split \a mo_p upon each create operation.
519                  *   If split is ok, -ERESTART is returned and current thread
520                  *   will not peoceed with create. Instead it sends -ERESTART
521                  *   to client to let it know that correct MDT must be chosen.
522                  * \see cmm_split_dir()
523                  */
524                 rc = cmm_split_dir(env, mo_p);
525                 if (rc)
526                         /*
527                          * -ERESTART or some split error is returned, we can't
528                          * proceed with create.
529                          */
530                         GOTO(out, rc);
531         }
532
533         if (spec != NULL && spec->sp_ck_split) {
534                 /**
535                  * - Directory is split already. Let the caller know that
536                  * it should tell client that directory is split and operation
537                  * should repeat to correct MDT.
538                  * \see cmm_split_check()
539                  */
540                 rc = cmm_split_check(env, mo_p, lname->ln_name);
541                 if (rc)
542                         GOTO(out, rc);
543         }
544 #endif
545
546         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
547                         spec, ma);
548
549         EXIT;
550 #ifdef HAVE_SPLIT_SUPPORT
551 out:
552 #endif
553         return rc;
554 }
555
556 /** Call mdo_create_data() on next layer. All objects are local. */
557 static int cml_create_data(const struct lu_env *env, struct md_object *p,
558                            struct md_object *o,
559                            const struct md_op_spec *spec,
560                            struct md_attr *ma)
561 {
562         int rc;
563         ENTRY;
564         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
565                              spec, ma);
566         RETURN(rc);
567 }
568
569 /** Call mdo_link() on next layer. All objects are local. */
570 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
571                     struct md_object *mo_s, const struct lu_name *lname,
572                     struct md_attr *ma)
573 {
574         int rc;
575         ENTRY;
576         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
577                       lname, ma);
578         RETURN(rc);
579 }
580
581 /** Call mdo_unlink() on next layer. All objects are local. */
582 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
583                       struct md_object *mo_c, const struct lu_name *lname,
584                       struct md_attr *ma)
585 {
586         int rc;
587         ENTRY;
588         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
589                         lname, ma);
590         RETURN(rc);
591 }
592
593 /** Call mdo_lum_lmm_cmp() on next layer */
594 static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
595                            const struct md_op_spec *spec, struct md_attr *ma)
596 {
597         int rc;
598         ENTRY;
599
600         rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma);
601         RETURN(rc);
602 }
603
604 /**
605  * \ingroup cmm
606  * Get mode of object.
607  * Used in both cml and cmr hence can produce RPC to another server.
608  */
609 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
610                         const struct lu_fid *lf, struct md_attr *ma,
611                         int *remote)
612 {
613         struct md_object *mo_s = md_object_find_slice(env, md, lf);
614         struct cmm_thread_info *cmi;
615         struct md_attr *tmp_ma;
616         int rc;
617         ENTRY;
618
619         if (IS_ERR(mo_s))
620                 RETURN(PTR_ERR(mo_s));
621
622         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
623                 *remote = 1;
624
625         cmi = cmm_env_info(env);
626         tmp_ma = &cmi->cmi_ma;
627         tmp_ma->ma_need = MA_INODE;
628         tmp_ma->ma_valid = 0;
629         /* get type from src, can be remote req */
630         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
631         if (rc == 0) {
632                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
633                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
634                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
635                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
636                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
637         }
638         lu_object_put(env, &mo_s->mo_lu);
639         RETURN(rc);
640 }
641
642 /**
643  * \ingroup cmm
644  * Set ctime for object.
645  * Used in both cml and cmr hence can produce RPC to another server.
646  */
647 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
648                             const struct lu_fid *lf, struct md_attr *ma)
649 {
650         struct md_object *mo_s = md_object_find_slice(env, md, lf);
651         int rc;
652         ENTRY;
653
654         if (IS_ERR(mo_s))
655                 RETURN(PTR_ERR(mo_s));
656
657         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
658         /* set ctime to obj, can be remote req */
659         rc = mo_attr_set(env, md_object_next(mo_s), ma);
660         lu_object_put(env, &mo_s->mo_lu);
661         RETURN(rc);
662 }
663
664 /** Helper to output debug information about rename operation. */
665 static inline void cml_rename_warn(const char *fname,
666                                   struct md_object *mo_po,
667                                   struct md_object *mo_pn,
668                                   const struct lu_fid *lf,
669                                   const char *s_name,
670                                   struct md_object *mo_t,
671                                   const char *t_name,
672                                   int err)
673 {
674         if (mo_t)
675                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
676                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
677                       "[tname %s] [err %d]\n", fname,
678                       PFID(lu_object_fid(&mo_po->mo_lu)),
679                       PFID(lu_object_fid(&mo_pn->mo_lu)),
680                       PFID(lf), s_name,
681                       PFID(lu_object_fid(&mo_t->mo_lu)),
682                       t_name, err);
683         else
684                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
685                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
686                       "[tname %s] [err %d]\n", fname,
687                       PFID(lu_object_fid(&mo_po->mo_lu)),
688                       PFID(lu_object_fid(&mo_pn->mo_lu)),
689                       PFID(lf), s_name,
690                       t_name, err);
691 }
692
693 /**
694  * Rename operation for cml.
695  *
696  * This is the most complex cross-reference operation. It may consist of up to 4
697  * MDS server and require several RPCs to be sent.
698  *
699  * \param mo_po Old parent object.
700  * \param mo_pn New parent object.
701  * \param lf FID of object to rename.
702  * \param ls_name Source file name.
703  * \param mo_t target object. Should be NULL here.
704  * \param lt_name Name of target file.
705  * \param ma object attributes.
706  */
707 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
708                       struct md_object *mo_pn, const struct lu_fid *lf,
709                       const struct lu_name *ls_name, struct md_object *mo_t,
710                       const struct lu_name *lt_name, struct md_attr *ma)
711 {
712         struct cmm_thread_info *cmi;
713         struct md_attr *tmp_ma = NULL;
714         struct md_object *tmp_t = mo_t;
715         int remote = 0, rc;
716         ENTRY;
717
718         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
719         if (rc)
720                 RETURN(rc);
721
722         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
723                 /**
724                  * \note \a mo_t is remote object and there is RPC to unlink it.
725                  * Before that, do local sanity check for rename first.
726                  */
727                 if (!remote) {
728                         struct md_object *mo_s = md_object_find_slice(env,
729                                                         md_obj2dev(mo_po), lf);
730                         if (IS_ERR(mo_s))
731                                 RETURN(PTR_ERR(mo_s));
732
733                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
734                         rc = mo_permission(env, md_object_next(mo_po),
735                                            md_object_next(mo_s),
736                                            ma, MAY_RENAME_SRC);
737                         lu_object_put(env, &mo_s->mo_lu);
738                         if (rc)
739                                 RETURN(rc);
740                 } else {
741                         rc = mo_permission(env, NULL, md_object_next(mo_po),
742                                            ma, MAY_UNLINK | MAY_VTX_FULL);
743                         if (rc)
744                                 RETURN(rc);
745                 }
746
747                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
748                                    MAY_UNLINK | MAY_VTX_PART);
749                 if (rc)
750                         RETURN(rc);
751
752                 /*
753                  * /note \a ma will be changed after mo_ref_del(), but we will use
754                  * it for mdo_rename() later, so save it before mo_ref_del().
755                  */
756                 cmi = cmm_env_info(env);
757                 tmp_ma = &cmi->cmi_ma;
758                 *tmp_ma = *ma;
759                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
760                 if (rc)
761                         RETURN(rc);
762
763                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
764                 mo_t = NULL;
765         }
766
767         /**
768          * \note for src on remote MDS case, change its ctime before local
769          * rename. Firstly, do local sanity check for rename if necessary.
770          */
771         if (remote) {
772                 if (!tmp_ma) {
773                         rc = mo_permission(env, NULL, md_object_next(mo_po),
774                                            ma, MAY_UNLINK | MAY_VTX_FULL);
775                         if (rc)
776                                 RETURN(rc);
777
778                         if (mo_t) {
779                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
780                                 rc = mo_permission(env, md_object_next(mo_pn),
781                                                    md_object_next(mo_t),
782                                                    ma, MAY_RENAME_TAR);
783                                 if (rc)
784                                         RETURN(rc);
785                         } else {
786                                 int mask;
787
788                                 if (mo_po != mo_pn)
789                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
790                                                 MAY_LINK : MAY_CREATE);
791                                 else
792                                         mask = MAY_CREATE;
793                                 rc = mo_permission(env, NULL,
794                                                    md_object_next(mo_pn),
795                                                    NULL, mask);
796                                 if (rc)
797                                         RETURN(rc);
798                         }
799
800                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
801                 } else {
802                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
803                 }
804
805                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
806                                       tmp_ma ? tmp_ma : ma);
807                 if (rc) {
808                         /* TODO: revoke mo_t if necessary. */
809                         cml_rename_warn("cmm_rename_ctime", mo_po,
810                                         mo_pn, lf, ls_name->ln_name,
811                                         tmp_t, lt_name->ln_name, rc);
812                         RETURN(rc);
813                 }
814         }
815
816         /* local rename, mo_t can be NULL */
817         rc = mdo_rename(env, md_object_next(mo_po),
818                         md_object_next(mo_pn), lf, ls_name,
819                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
820         if (rc)
821                 /* TODO: revoke all cml_rename */
822                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
823                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
824
825         RETURN(rc);
826 }
827
828 /**
829  * Rename target partial operation.
830  * Used for cross-ref rename.
831  */
832 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
833                           struct md_object *mo_t, const struct lu_fid *lf,
834                           const struct lu_name *lname, struct md_attr *ma)
835 {
836         int rc;
837         ENTRY;
838
839         rc = mdo_rename_tgt(env, md_object_next(mo_p),
840                             md_object_next(mo_t), lf, lname, ma);
841         RETURN(rc);
842 }
843
844 /**
845  * Name insert only operation.
846  * used only in case of rename_tgt() when target doesn't exist.
847  */
848 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
849                            const struct lu_name *lname, const struct lu_fid *lf,
850                            const struct md_attr *ma)
851 {
852         int rc;
853         ENTRY;
854
855         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
856
857         RETURN(rc);
858 }
859
860 /**
861  * \ingroup cmm
862  * Check two fids are not subdirectories.
863  */
864 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
865                          const struct lu_fid *fid, struct lu_fid *sfid)
866 {
867         struct cmm_thread_info *cmi;
868         int rc;
869         ENTRY;
870
871         cmi = cmm_env_info(env);
872         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
873         if (rc)
874                 RETURN(rc);
875
876         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
877                 RETURN(0);
878
879         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
880         RETURN(rc);
881 }
882
883 static const struct md_dir_operations cml_dir_ops = {
884         .mdo_is_subdir   = cmm_is_subdir,
885         .mdo_lookup      = cml_lookup,
886         .mdo_lock_mode   = cml_lock_mode,
887         .mdo_create      = cml_create,
888         .mdo_link        = cml_link,
889         .mdo_unlink      = cml_unlink,
890         .mdo_lum_lmm_cmp = cml_lum_lmm_cmp,
891         .mdo_name_insert = cml_name_insert,
892         .mdo_rename      = cml_rename,
893         .mdo_rename_tgt  = cml_rename_tgt,
894         .mdo_create_data = cml_create_data,
895 };
896 /** @} */
897 /** @} */
898
899 /**
900  * \addtogroup cmr
901  * @{
902  */
903 /**
904  * \name cmr helpers
905  * @{
906  */
907 /** Get cmr_object from lu_object. */
908 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
909 {
910         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
911 }
912 /** Get cmr_object from md_object. */
913 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
914 {
915         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
916 }
917 /** Get cmr_object from cmm_object. */
918 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
919 {
920         return container_of0(co, struct cmr_object, cmm_obj);
921 }
922 /** @} */
923
924 /**
925  * Get proper child device from MDCs.
926  */
927 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
928 {
929         struct lu_device *next = NULL;
930         struct mdc_device *mdc;
931
932         cfs_spin_lock(&d->cmm_tgt_guard);
933         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
934                 if (mdc->mc_num == num) {
935                         next = mdc2lu_dev(mdc);
936                         break;
937                 }
938         }
939         cfs_spin_unlock(&d->cmm_tgt_guard);
940         return next;
941 }
942
943 /**
944  * Free cmr_object.
945  */
946 static void cmr_object_free(const struct lu_env *env,
947                             struct lu_object *lo)
948 {
949         struct cmr_object *cro = lu2cmr_obj(lo);
950         lu_object_fini(lo);
951         OBD_FREE_PTR(cro);
952 }
953
954 /**
955  * Initialize cmr object.
956  */
957 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
958                            const struct lu_object_conf *unused)
959 {
960         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
961         struct lu_device  *c_dev;
962         struct lu_object  *c_obj;
963         int rc;
964
965         ENTRY;
966
967         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
968         if (c_dev == NULL) {
969                 rc = -ENOENT;
970         } else {
971                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
972                                                         lo->lo_header, c_dev);
973                 if (c_obj != NULL) {
974                         lu_object_add(lo, c_obj);
975                         rc = 0;
976                 } else {
977                         rc = -ENOMEM;
978                 }
979         }
980
981         RETURN(rc);
982 }
983
984 /**
985  * Output lu_object data.
986  */
987 static int cmr_object_print(const struct lu_env *env, void *cookie,
988                             lu_printer_t p, const struct lu_object *lo)
989 {
990         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
991         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
992 }
993
994 /**
995  * Cmr instance of lu_object_operations.
996  */
997 static const struct lu_object_operations cmr_obj_ops = {
998         .loo_object_init    = cmr_object_init,
999         .loo_object_free    = cmr_object_free,
1000         .loo_object_print   = cmr_object_print
1001 };
1002
1003 /**
1004  * \name cmr remote md_object operations.
1005  * All operations here are invalid and return errors. There is no local object
1006  * so these operations return two kinds of error:
1007  * -# -EFAULT if operation is prohibited.
1008  * -# -EREMOTE if operation can be done just to notify upper level about remote
1009  *  object.
1010  *
1011  * @{
1012  */
1013 static int cmr_object_create(const struct lu_env *env,
1014                              struct md_object *mo,
1015                              const struct md_op_spec *spec,
1016                              struct md_attr *ma)
1017 {
1018         return -EFAULT;
1019 }
1020
1021 static int cmr_permission(const struct lu_env *env,
1022                           struct md_object *p, struct md_object *c,
1023                           struct md_attr *attr, int mask)
1024 {
1025         return -EREMOTE;
1026 }
1027
1028 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1029                         struct md_attr *attr)
1030 {
1031         return -EREMOTE;
1032 }
1033
1034 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1035                         const struct md_attr *attr)
1036 {
1037         return -EFAULT;
1038 }
1039
1040 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1041                          struct lu_buf *buf, const char *name)
1042 {
1043         return -EFAULT;
1044 }
1045
1046 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1047                         struct lu_buf *buf)
1048 {
1049         return -EFAULT;
1050 }
1051
1052 static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
1053                          int flags, struct md_object *mo)
1054 {
1055         return -EFAULT;
1056 }
1057
1058 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1059                           struct lu_buf *buf)
1060 {
1061         return -EFAULT;
1062 }
1063
1064 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1065                          const struct lu_buf *buf, const char *name,
1066                          int fl)
1067 {
1068         return -EFAULT;
1069 }
1070
1071 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1072                          const char *name)
1073 {
1074         return -EFAULT;
1075 }
1076
1077 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1078                        const struct md_attr *ma)
1079 {
1080         return -EFAULT;
1081 }
1082
1083 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1084                        struct md_attr *ma)
1085 {
1086         return -EFAULT;
1087 }
1088
1089 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1090                     int flags)
1091 {
1092         return -EREMOTE;
1093 }
1094
1095 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1096                      struct md_attr *ma, int mode)
1097 {
1098         return -EFAULT;
1099 }
1100
1101 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1102                         const struct lu_rdpg *rdpg)
1103 {
1104         return -EREMOTE;
1105 }
1106
1107 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1108                         struct lustre_capa *capa, int renewal)
1109 {
1110         return -EFAULT;
1111 }
1112
1113 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1114                     char *path, int pathlen, __u64 *recno, int *linkno)
1115 {
1116         return -EREMOTE;
1117 }
1118
1119 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1120 {
1121         return -EFAULT;
1122 }
1123
1124 static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
1125                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
1126                          struct lustre_handle *lockh)
1127 {
1128         return -EREMOTE;
1129 }
1130
1131 static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
1132                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
1133 {
1134         return -EREMOTE;
1135 }
1136
1137 static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
1138                            const struct md_op_spec *spec, struct md_attr *ma)
1139 {
1140         return -EREMOTE;
1141 }
1142
1143 /** Set of md_object_operations for cmr. */
1144 static const struct md_object_operations cmr_mo_ops = {
1145         .moo_permission    = cmr_permission,
1146         .moo_attr_get      = cmr_attr_get,
1147         .moo_attr_set      = cmr_attr_set,
1148         .moo_xattr_get     = cmr_xattr_get,
1149         .moo_xattr_set     = cmr_xattr_set,
1150         .moo_xattr_list    = cmr_xattr_list,
1151         .moo_xattr_del     = cmr_xattr_del,
1152         .moo_object_create = cmr_object_create,
1153         .moo_ref_add       = cmr_ref_add,
1154         .moo_ref_del       = cmr_ref_del,
1155         .moo_open          = cmr_open,
1156         .moo_close         = cmr_close,
1157         .moo_readpage      = cmr_readpage,
1158         .moo_readlink      = cmr_readlink,
1159         .moo_changelog     = cmr_changelog,
1160         .moo_capa_get      = cmr_capa_get,
1161         .moo_object_sync   = cmr_object_sync,
1162         .moo_path          = cmr_path,
1163         .moo_file_lock     = cmr_file_lock,
1164         .moo_file_unlock   = cmr_file_unlock,
1165 };
1166 /** @} */
1167
1168 /**
1169  * \name cmr md_dir operations.
1170  *
1171  * All methods below are cross-ref by nature. They consist of remote call and
1172  * local operation. Due to future rollback functionality there are several
1173  * limitations for such methods:
1174  * -# remote call should be done at first to do epoch negotiation between all
1175  * MDS involved and to avoid the RPC inside transaction.
1176  * -# only one RPC can be sent - also due to epoch negotiation.
1177  * For more details see rollback HLD/DLD.
1178  * @{
1179  */
1180 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1181                       const struct lu_name *lname, struct lu_fid *lf,
1182                       struct md_op_spec *spec)
1183 {
1184         /*
1185          * This can happens while rename() If new parent is remote dir, lookup
1186          * will happen here.
1187          */
1188
1189         return -EREMOTE;
1190 }
1191
1192 /** Return lock mode. */
1193 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1194                                 struct md_object *mo, mdl_mode_t lm)
1195 {
1196         return MDL_MINMODE;
1197 }
1198
1199 /**
1200  * Create operation for cmr.
1201  * Remote object creation and local name insert.
1202  *
1203  * \param mo_p Parent directory. Local object.
1204  * \param lchild_name name of file to create.
1205  * \param mo_c Child object. It has no real inode yet.
1206  * \param spec creation specification.
1207  * \param ma child object attributes.
1208  */
1209 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1210                       const struct lu_name *lchild_name, struct md_object *mo_c,
1211                       struct md_op_spec *spec,
1212                       struct md_attr *ma)
1213 {
1214         struct cmm_thread_info *cmi;
1215         struct md_attr *tmp_ma;
1216         int rc;
1217         ENTRY;
1218
1219         /* Make sure that name isn't exist before doing remote call. */
1220         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1221                         &cmm_env_info(env)->cmi_fid, NULL);
1222         if (rc == 0)
1223                 RETURN(-EEXIST);
1224         else if (rc != -ENOENT)
1225                 RETURN(rc);
1226
1227         /* check the SGID attr */
1228         cmi = cmm_env_info(env);
1229         LASSERT(cmi);
1230         tmp_ma = &cmi->cmi_ma;
1231         tmp_ma->ma_valid = 0;
1232         tmp_ma->ma_need = MA_INODE;
1233
1234 #ifdef CONFIG_FS_POSIX_ACL
1235         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1236                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1237                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1238                 tmp_ma->ma_need |= MA_ACL_DEF;
1239         }
1240 #endif
1241         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1242         if (rc)
1243                 RETURN(rc);
1244
1245         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1246                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1247                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1248                         ma->ma_attr.la_mode |= S_ISGID;
1249                         ma->ma_attr.la_valid |= LA_MODE;
1250                 }
1251         }
1252
1253 #ifdef CONFIG_FS_POSIX_ACL
1254         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1255                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1256                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1257                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1258                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1259         }
1260 #endif
1261
1262         /* Local permission check for name_insert before remote ops. */
1263         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1264                            (S_ISDIR(ma->ma_attr.la_mode) ?
1265                            MAY_LINK : MAY_CREATE));
1266         if (rc)
1267                 RETURN(rc);
1268
1269         /**
1270          * \note \a ma will be changed after mo_object_create(), but we will use
1271          * it for mdo_name_insert() later, so save it before mo_object_create().
1272          */
1273         *tmp_ma = *ma;
1274         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1275         if (rc == 0) {
1276                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1277                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1278                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1279                 if (unlikely(rc)) {
1280                         /* TODO: remove object mo_c on remote MDS */
1281                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1282                               " [name %s] [mo_c "DFID"] [err %d]\n",
1283                               PFID(lu_object_fid(&mo_p->mo_lu)),
1284                               lchild_name->ln_name,
1285                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1286                 }
1287         }
1288
1289         RETURN(rc);
1290 }
1291
1292 /**
1293  * Link operations for cmr.
1294  *
1295  * The link RPC is always issued to the server where source parent is living.
1296  * The first operation to do is object nlink increment on remote server.
1297  * Second one is local mdo_name_insert().
1298  *
1299  * \param mo_p parent directory. It is local.
1300  * \param mo_s source object to link. It is remote.
1301  * \param lname Name of link file.
1302  * \param ma object attributes.
1303  */
1304 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1305                     struct md_object *mo_s, const struct lu_name *lname,
1306                     struct md_attr *ma)
1307 {
1308         int rc;
1309         ENTRY;
1310
1311         /* Make sure that name isn't exist before doing remote call. */
1312         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1313                         &cmm_env_info(env)->cmi_fid, NULL);
1314         if (rc == 0) {
1315                 rc = -EEXIST;
1316         } else if (rc == -ENOENT) {
1317                 /* Local permission check for name_insert before remote ops. */
1318                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1319                                    MAY_CREATE);
1320                 if (rc)
1321                         RETURN(rc);
1322
1323                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1324                 if (rc == 0) {
1325                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1326                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1327                                              lu_object_fid(&mo_s->mo_lu), ma);
1328                         if (unlikely(rc)) {
1329                                 /* TODO: ref_del from mo_s on remote MDS */
1330                                 CWARN("cmr_link failed, should revoke: "
1331                                       "[mo_p "DFID"] [mo_s "DFID"] "
1332                                       "[name %s] [err %d]\n",
1333                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1334                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1335                                       lname->ln_name, rc);
1336                         }
1337                 }
1338         }
1339         RETURN(rc);
1340 }
1341
1342 /**
1343  * Unlink operations for cmr.
1344  *
1345  * The unlink RPC is always issued to the server where parent is living. Hence
1346  * the first operation to do is object unlink on remote server. Second one is
1347  * local mdo_name_remove().
1348  *
1349  * \param mo_p parent md_object. It is local.
1350  * \param mo_c child object to be unlinked. It is remote.
1351  * \param lname Name of file to unlink.
1352  * \param ma object attributes.
1353  */
1354 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1355                       struct md_object *mo_c, const struct lu_name *lname,
1356                       struct md_attr *ma)
1357 {
1358         struct cmm_thread_info *cmi;
1359         struct md_attr *tmp_ma;
1360         int rc;
1361         ENTRY;
1362
1363         /* Local permission check for name_remove before remote ops. */
1364         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1365                            MAY_UNLINK | MAY_VTX_PART);
1366         if (rc)
1367                 RETURN(rc);
1368
1369         /*
1370          * \note \a ma will be changed after mo_ref_del, but we will use
1371          * it for mdo_name_remove() later, so save it before mo_ref_del().
1372          */
1373         cmi = cmm_env_info(env);
1374         tmp_ma = &cmi->cmi_ma;
1375         *tmp_ma = *ma;
1376         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1377         if (rc == 0) {
1378                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1379                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1380                 if (unlikely(rc)) {
1381                         /* TODO: ref_add to mo_c on remote MDS */
1382                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1383                               " [mo_c "DFID"] [name %s] [err %d]\n",
1384                               PFID(lu_object_fid(&mo_p->mo_lu)),
1385                               PFID(lu_object_fid(&mo_c->mo_lu)),
1386                               lname->ln_name, rc);
1387                 }
1388         }
1389
1390         RETURN(rc);
1391 }
1392
1393 /** Helper which outputs error message during cmr_rename() */
1394 static inline void cmr_rename_warn(const char *fname,
1395                                   struct md_object *mo_po,
1396                                   struct md_object *mo_pn,
1397                                   const struct lu_fid *lf,
1398                                   const char *s_name,
1399                                   const char *t_name,
1400                                   int err)
1401 {
1402         CWARN("cmr_rename failed for %s, should revoke: "
1403               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1404               "[sname %s] [tname %s] [err %d]\n", fname,
1405               PFID(lu_object_fid(&mo_po->mo_lu)),
1406               PFID(lu_object_fid(&mo_pn->mo_lu)),
1407               PFID(lf), s_name, t_name, err);
1408 }
1409
1410 /**
1411  * Rename operation for cmr.
1412  *
1413  * This is the most complex cross-reference operation. It may consist of up to 4
1414  * MDS server and require several RPCs to be sent.
1415  *
1416  * \param mo_po Old parent object.
1417  * \param mo_pn New parent object.
1418  * \param lf FID of object to rename.
1419  * \param ls_name Source file name.
1420  * \param mo_t target object. Should be NULL here.
1421  * \param lt_name Name of target file.
1422  * \param ma object attributes.
1423  */
1424 static int cmr_rename(const struct lu_env *env,
1425                       struct md_object *mo_po, struct md_object *mo_pn,
1426                       const struct lu_fid *lf, const struct lu_name *ls_name,
1427                       struct md_object *mo_t, const struct lu_name *lt_name,
1428                       struct md_attr *ma)
1429 {
1430         struct cmm_thread_info *cmi;
1431         struct md_attr *tmp_ma;
1432         int rc;
1433         ENTRY;
1434
1435         LASSERT(mo_t == NULL);
1436
1437         /* get real type of src */
1438         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1439         if (rc)
1440                 RETURN(rc);
1441
1442         /* Local permission check for name_remove before remote ops. */
1443         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1444                            MAY_UNLINK | MAY_VTX_FULL);
1445         if (rc)
1446                 RETURN(rc);
1447
1448         /**
1449          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1450          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1451          */
1452         cmi = cmm_env_info(env);
1453         tmp_ma = &cmi->cmi_ma;
1454         *tmp_ma = *ma;
1455         /**
1456          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1457          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1458          * lookup and process this further.
1459          */
1460         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1461                             NULL/* mo_t */, lf, lt_name, ma);
1462         if (rc)
1463                 RETURN(rc);
1464
1465         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1466
1467         /* src object maybe on remote MDS, do remote ops first. */
1468         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1469         if (unlikely(rc)) {
1470                 /* TODO: revoke mdo_rename_tgt */
1471                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1472                                 ls_name->ln_name, lt_name->ln_name, rc);
1473                 RETURN(rc);
1474         }
1475
1476         /* only old name is removed localy */
1477         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1478         if (unlikely(rc))
1479                 /* TODO: revoke all cmr_rename */
1480                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1481                                 ls_name->ln_name, lt_name->ln_name, rc);
1482
1483         RETURN(rc);
1484 }
1485
1486 /**
1487  * Part of cross-ref rename().
1488  * Used to insert new name in new parent and unlink target.
1489  */
1490 static int cmr_rename_tgt(const struct lu_env *env,
1491                           struct md_object *mo_p, struct md_object *mo_t,
1492                           const struct lu_fid *lf, const struct lu_name *lname,
1493                           struct md_attr *ma)
1494 {
1495         struct cmm_thread_info *cmi;
1496         struct md_attr *tmp_ma;
1497         int rc;
1498         ENTRY;
1499
1500         /* target object is remote one */
1501         /* Local permission check for rename_tgt before remote ops. */
1502         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1503                            MAY_UNLINK | MAY_VTX_PART);
1504         if (rc)
1505                 RETURN(rc);
1506
1507         /*
1508          * XXX: @ma maybe changed after mo_ref_del, but we will use
1509          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1510          */
1511         cmi = cmm_env_info(env);
1512         tmp_ma = &cmi->cmi_ma;
1513         *tmp_ma = *ma;
1514         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1515         /* continue locally with name handling only */
1516         if (rc == 0) {
1517                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1518                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1519                                     NULL, lf, lname, tmp_ma);
1520                 if (unlikely(rc)) {
1521                         /* TODO: ref_add to mo_t on remote MDS */
1522                         CWARN("cmr_rename_tgt failed, should revoke: "
1523                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1524                               "[name %s] [err %d]\n",
1525                               PFID(lu_object_fid(&mo_p->mo_lu)),
1526                               PFID(lu_object_fid(&mo_t->mo_lu)),
1527                               PFID(lf),
1528                               lname->ln_name, rc);
1529                 }
1530         }
1531         RETURN(rc);
1532 }
1533 /** @} */
1534 /**
1535  * The md_dir_operations for cmr.
1536  */
1537 static const struct md_dir_operations cmr_dir_ops = {
1538         .mdo_is_subdir   = cmm_is_subdir,
1539         .mdo_lookup      = cmr_lookup,
1540         .mdo_lock_mode   = cmr_lock_mode,
1541         .mdo_create      = cmr_create,
1542         .mdo_link        = cmr_link,
1543         .mdo_unlink      = cmr_unlink,
1544         .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp,
1545         .mdo_rename      = cmr_rename,
1546         .mdo_rename_tgt  = cmr_rename_tgt
1547 };
1548 /** @} */