Whamcloud - gitweb
LU-1146 build: batch update copyright messages
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/cmm/cmm_object.c
39  *
40  * Lustre Cluster Metadata Manager (cmm)
41  *
42  * Author: Mike Pershin <tappro@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <lustre_fid.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
54 /**
55  * \ingroup cmm
56  * Lookup MDS number \a mds by FID \a fid.
57  *
58  * \param fid FID of object to find MDS
59  * \param mds mds number to return.
60  */
61 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
62                    mdsno_t *mds, const struct lu_env *env)
63 {
64         int rc = 0;
65         ENTRY;
66
67         LASSERT(fid_is_sane(fid));
68
69         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
70                                LU_SEQ_RANGE_MDT, env);
71         if (rc) {
72                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
73                        fid_seq(fid), rc);
74                 RETURN(rc);
75         }
76
77         if (*mds > cm->cmm_tgt_count) {
78                 CERROR("Got invalid mdsno: %x (max: %x)\n",
79                        *mds, cm->cmm_tgt_count);
80                 rc = -EINVAL;
81         } else {
82                 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
83                        LPX64"\n", *mds, fid_seq(fid));
84         }
85
86         RETURN (rc);
87 }
88
89 /**
90  * \addtogroup cml
91  * @{
92  */
93 static const struct md_object_operations cml_mo_ops;
94 static const struct md_dir_operations    cml_dir_ops;
95 static const struct lu_object_operations cml_obj_ops;
96
97 static const struct md_object_operations cmr_mo_ops;
98 static const struct md_dir_operations    cmr_dir_ops;
99 static const struct lu_object_operations cmr_obj_ops;
100
101 /**
102  * \ingroup cmm
103  * Allocate CMM object.
104  */
105 struct lu_object *cmm_object_alloc(const struct lu_env *env,
106                                    const struct lu_object_header *loh,
107                                    struct lu_device *ld)
108 {
109         const struct lu_fid *fid = &loh->loh_fid;
110         struct lu_object  *lo = NULL;
111         struct cmm_device *cd;
112         mdsno_t mds;
113         int rc = 0;
114
115         ENTRY;
116
117         cd = lu2cmm_dev(ld);
118         if (cd->cmm_flags & CMM_INITIALIZED) {
119                 /* get object location */
120                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
121                 if (rc)
122                         RETURN(NULL);
123         } else
124                 /*
125                  * Device is not yet initialized, cmm_object is being created
126                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
127                  * etc.). Such object *has* to be local.
128                  */
129                 mds = cd->cmm_local_num;
130
131         /* select the proper set of operations based on object location */
132         if (mds == cd->cmm_local_num) {
133                 struct cml_object *clo;
134
135                 OBD_ALLOC_PTR(clo);
136                 if (clo != NULL) {
137                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
138                         lu_object_init(lo, NULL, ld);
139                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
140                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
141                         lo->lo_ops = &cml_obj_ops;
142                 }
143         } else {
144                 struct cmr_object *cro;
145
146                 OBD_ALLOC_PTR(cro);
147                 if (cro != NULL) {
148                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
149                         lu_object_init(lo, NULL, ld);
150                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
151                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
152                         lo->lo_ops = &cmr_obj_ops;
153                         cro->cmo_num = mds;
154                 }
155         }
156         RETURN(lo);
157 }
158
159 /**
160  * Get local child device.
161  */
162 static struct lu_device *cml_child_dev(struct cmm_device *d)
163 {
164         return &d->cmm_child->md_lu_dev;
165 }
166
167 /**
168  * Free cml_object.
169  */
170 static void cml_object_free(const struct lu_env *env,
171                             struct lu_object *lo)
172 {
173         struct cml_object *clo = lu2cml_obj(lo);
174         lu_object_fini(lo);
175         OBD_FREE_PTR(clo);
176 }
177
178 /**
179  * Initialize cml_object.
180  */
181 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
182                            const struct lu_object_conf *unused)
183 {
184         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
185         struct lu_device  *c_dev;
186         struct lu_object  *c_obj;
187         int rc;
188
189         ENTRY;
190
191 #ifdef HAVE_SPLIT_SUPPORT
192         if (cd->cmm_tgt_count == 0)
193                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
194         else
195                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
196 #endif
197         c_dev = cml_child_dev(cd);
198         if (c_dev == NULL) {
199                 rc = -ENOENT;
200         } else {
201                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
202                                                         lo->lo_header, c_dev);
203                 if (c_obj != NULL) {
204                         lu_object_add(lo, c_obj);
205                         rc = 0;
206                 } else {
207                         rc = -ENOMEM;
208                 }
209         }
210
211         RETURN(rc);
212 }
213
214 static int cml_object_print(const struct lu_env *env, void *cookie,
215                             lu_printer_t p, const struct lu_object *lo)
216 {
217         return (*p)(env, cookie, "[local]");
218 }
219
220 static const struct lu_object_operations cml_obj_ops = {
221         .loo_object_init    = cml_object_init,
222         .loo_object_free    = cml_object_free,
223         .loo_object_print   = cml_object_print
224 };
225
226 /**
227  * \name CMM local md_object operations.
228  * All of them call just corresponding operations on next layer.
229  * @{
230  */
231 static int cml_object_create(const struct lu_env *env,
232                              struct md_object *mo,
233                              const struct md_op_spec *spec,
234                              struct md_attr *attr)
235 {
236         int rc;
237         ENTRY;
238         rc = mo_object_create(env, md_object_next(mo), spec, attr);
239         RETURN(rc);
240 }
241
242 static int cml_permission(const struct lu_env *env,
243                           struct md_object *p, struct md_object *c,
244                           struct md_attr *attr, int mask)
245 {
246         int rc;
247         ENTRY;
248         rc = mo_permission(env, md_object_next(p), md_object_next(c),
249                            attr, mask);
250         RETURN(rc);
251 }
252
253 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
254                         struct md_attr *attr)
255 {
256         int rc;
257         ENTRY;
258         rc = mo_attr_get(env, md_object_next(mo), attr);
259         RETURN(rc);
260 }
261
262 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
263                         const struct md_attr *attr)
264 {
265         int rc;
266         ENTRY;
267         rc = mo_attr_set(env, md_object_next(mo), attr);
268         RETURN(rc);
269 }
270
271 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
272                          struct lu_buf *buf, const char *name)
273 {
274         int rc;
275         ENTRY;
276         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
277         RETURN(rc);
278 }
279
280 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
281                         struct lu_buf *buf)
282 {
283         int rc;
284         ENTRY;
285         rc = mo_readlink(env, md_object_next(mo), buf);
286         RETURN(rc);
287 }
288
289 static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
290                          int flags, struct md_object *mo)
291 {
292         int rc;
293         ENTRY;
294         rc = mo_changelog(env, type, flags, md_object_next(mo));
295         RETURN(rc);
296 }
297
298 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
299                           struct lu_buf *buf)
300 {
301         int rc;
302         ENTRY;
303         rc = mo_xattr_list(env, md_object_next(mo), buf);
304         RETURN(rc);
305 }
306
307 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
308                          const struct lu_buf *buf, const char *name,
309                          int fl)
310 {
311         int rc;
312         ENTRY;
313         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
314         RETURN(rc);
315 }
316
317 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
318                          const char *name)
319 {
320         int rc;
321         ENTRY;
322         rc = mo_xattr_del(env, md_object_next(mo), name);
323         RETURN(rc);
324 }
325
326 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
327                        const struct md_attr *ma)
328 {
329         int rc;
330         ENTRY;
331         rc = mo_ref_add(env, md_object_next(mo), ma);
332         RETURN(rc);
333 }
334
335 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
336                        struct md_attr *ma)
337 {
338         int rc;
339         ENTRY;
340         rc = mo_ref_del(env, md_object_next(mo), ma);
341         RETURN(rc);
342 }
343
344 static int cml_open(const struct lu_env *env, struct md_object *mo,
345                     int flags)
346 {
347         int rc;
348         ENTRY;
349         rc = mo_open(env, md_object_next(mo), flags);
350         RETURN(rc);
351 }
352
353 static int cml_close(const struct lu_env *env, struct md_object *mo,
354                      struct md_attr *ma, int mode)
355 {
356         int rc;
357         ENTRY;
358         rc = mo_close(env, md_object_next(mo), ma, mode);
359         RETURN(rc);
360 }
361
362 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
363                         const struct lu_rdpg *rdpg)
364 {
365         int rc;
366         ENTRY;
367         rc = mo_readpage(env, md_object_next(mo), rdpg);
368         RETURN(rc);
369 }
370
371 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
372                         struct lustre_capa *capa, int renewal)
373 {
374         int rc;
375         ENTRY;
376         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
377         RETURN(rc);
378 }
379
380 static int cml_path(const struct lu_env *env, struct md_object *mo,
381                     char *path, int pathlen, __u64 *recno, int *linkno)
382 {
383         int rc;
384         ENTRY;
385         rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
386         RETURN(rc);
387 }
388
389 static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
390                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
391                          struct lustre_handle *lockh)
392 {
393         int rc;
394         ENTRY;
395         rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
396         RETURN(rc);
397 }
398
399 static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
400                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
401 {
402         int rc;
403         ENTRY;
404         rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
405         RETURN(rc);
406 }
407
408 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
409 {
410         int rc;
411         ENTRY;
412         rc = mo_object_sync(env, md_object_next(mo));
413         RETURN(rc);
414 }
415
416 static const struct md_object_operations cml_mo_ops = {
417         .moo_permission    = cml_permission,
418         .moo_attr_get      = cml_attr_get,
419         .moo_attr_set      = cml_attr_set,
420         .moo_xattr_get     = cml_xattr_get,
421         .moo_xattr_list    = cml_xattr_list,
422         .moo_xattr_set     = cml_xattr_set,
423         .moo_xattr_del     = cml_xattr_del,
424         .moo_object_create = cml_object_create,
425         .moo_ref_add       = cml_ref_add,
426         .moo_ref_del       = cml_ref_del,
427         .moo_open          = cml_open,
428         .moo_close         = cml_close,
429         .moo_readpage      = cml_readpage,
430         .moo_readlink      = cml_readlink,
431         .moo_changelog     = cml_changelog,
432         .moo_capa_get      = cml_capa_get,
433         .moo_object_sync   = cml_object_sync,
434         .moo_path          = cml_path,
435         .moo_file_lock     = cml_file_lock,
436         .moo_file_unlock   = cml_file_unlock,
437 };
438 /** @} */
439
440 /**
441  * \name CMM local md_dir_operations.
442  * @{
443  */
444 /**
445  * cml lookup object fid by name.
446  * This returns only FID by name.
447  */
448 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
449                       const struct lu_name *lname, struct lu_fid *lf,
450                       struct md_op_spec *spec)
451 {
452         int rc;
453         ENTRY;
454
455 #ifdef HAVE_SPLIT_SUPPORT
456         if (spec != NULL && spec->sp_ck_split) {
457                 rc = cmm_split_check(env, mo_p, lname->ln_name);
458                 if (rc)
459                         RETURN(rc);
460         }
461 #endif
462         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
463         RETURN(rc);
464
465 }
466
467 /**
468  * Helper to return lock mode. Used in split cases only.
469  */
470 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
471                                 struct md_object *mo, mdl_mode_t lm)
472 {
473         int rc = MDL_MINMODE;
474         ENTRY;
475
476 #ifdef HAVE_SPLIT_SUPPORT
477         rc = cmm_split_access(env, mo, lm);
478 #endif
479
480         RETURN(rc);
481 }
482
483 /**
484  * Create operation for cml.
485  * Objects are local, but split can happen.
486  * If split is not needed this will call next layer mdo_create().
487  *
488  * \param mo_p Parent directory. Local object.
489  * \param lname name of file to create.
490  * \param mo_c Child object. It has no real inode yet.
491  * \param spec creation specification.
492  * \param ma child object attributes.
493  */
494 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
495                       const struct lu_name *lname, struct md_object *mo_c,
496                       struct md_op_spec *spec, struct md_attr *ma)
497 {
498         int rc;
499         ENTRY;
500
501 #ifdef HAVE_SPLIT_SUPPORT
502         /* Lock mode always should be sane. */
503         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
504
505         /*
506          * Sigh... This is long story. MDT may have race with detecting if split
507          * is possible in cmm. We know this race and let it live, because
508          * getting it rid (with some sem or spinlock) will also mean that
509          * PDIROPS for create will not work because we kill parallel work, what
510          * is really bad for performance and makes no sense having PDIROPS. So,
511          * we better allow the race to live, but split dir only if some of
512          * concurrent threads takes EX lock, not matter which one. So that, say,
513          * two concurrent threads may have different lock modes on directory (CW
514          * and EX) and not first one which comes here and see that split is
515          * possible should split the dir, but only that one which has EX
516          * lock. And we do not care that in this case, split may happen a bit
517          * later (when dir size will not be necessarily 64K, but may be a bit
518          * larger). So that, we allow concurrent creates and protect split by EX
519          * lock.
520          */
521         if (spec->sp_cr_mode == MDL_EX) {
522                 /**
523                  * Split cases:
524                  * - Try to split \a mo_p upon each create operation.
525                  *   If split is ok, -ERESTART is returned and current thread
526                  *   will not peoceed with create. Instead it sends -ERESTART
527                  *   to client to let it know that correct MDT must be chosen.
528                  * \see cmm_split_dir()
529                  */
530                 rc = cmm_split_dir(env, mo_p);
531                 if (rc)
532                         /*
533                          * -ERESTART or some split error is returned, we can't
534                          * proceed with create.
535                          */
536                         GOTO(out, rc);
537         }
538
539         if (spec != NULL && spec->sp_ck_split) {
540                 /**
541                  * - Directory is split already. Let the caller know that
542                  * it should tell client that directory is split and operation
543                  * should repeat to correct MDT.
544                  * \see cmm_split_check()
545                  */
546                 rc = cmm_split_check(env, mo_p, lname->ln_name);
547                 if (rc)
548                         GOTO(out, rc);
549         }
550 #endif
551
552         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
553                         spec, ma);
554
555         EXIT;
556 #ifdef HAVE_SPLIT_SUPPORT
557 out:
558 #endif
559         return rc;
560 }
561
562 /** Call mdo_create_data() on next layer. All objects are local. */
563 static int cml_create_data(const struct lu_env *env, struct md_object *p,
564                            struct md_object *o,
565                            const struct md_op_spec *spec,
566                            struct md_attr *ma)
567 {
568         int rc;
569         ENTRY;
570         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
571                              spec, ma);
572         RETURN(rc);
573 }
574
575 /** Call mdo_link() on next layer. All objects are local. */
576 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
577                     struct md_object *mo_s, const struct lu_name *lname,
578                     struct md_attr *ma)
579 {
580         int rc;
581         ENTRY;
582         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
583                       lname, ma);
584         RETURN(rc);
585 }
586
587 /** Call mdo_unlink() on next layer. All objects are local. */
588 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
589                       struct md_object *mo_c, const struct lu_name *lname,
590                       struct md_attr *ma)
591 {
592         int rc;
593         ENTRY;
594         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
595                         lname, ma);
596         RETURN(rc);
597 }
598
599 /** Call mdo_lum_lmm_cmp() on next layer */
600 static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
601                            const struct md_op_spec *spec, struct md_attr *ma)
602 {
603         int rc;
604         ENTRY;
605
606         rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma);
607         RETURN(rc);
608 }
609
610 /**
611  * \ingroup cmm
612  * Get mode of object.
613  * Used in both cml and cmr hence can produce RPC to another server.
614  */
615 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
616                         const struct lu_fid *lf, struct md_attr *ma,
617                         int *remote)
618 {
619         struct md_object *mo_s = md_object_find_slice(env, md, lf);
620         struct cmm_thread_info *cmi;
621         struct md_attr *tmp_ma;
622         int rc;
623         ENTRY;
624
625         if (IS_ERR(mo_s))
626                 RETURN(PTR_ERR(mo_s));
627
628         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
629                 *remote = 1;
630
631         cmi = cmm_env_info(env);
632         tmp_ma = &cmi->cmi_ma;
633         tmp_ma->ma_need = MA_INODE;
634         tmp_ma->ma_valid = 0;
635         /* get type from src, can be remote req */
636         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
637         if (rc == 0) {
638                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
639                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
640                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
641                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
642                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
643         }
644         lu_object_put(env, &mo_s->mo_lu);
645         RETURN(rc);
646 }
647
648 /**
649  * \ingroup cmm
650  * Set ctime for object.
651  * Used in both cml and cmr hence can produce RPC to another server.
652  */
653 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
654                             const struct lu_fid *lf, struct md_attr *ma)
655 {
656         struct md_object *mo_s = md_object_find_slice(env, md, lf);
657         int rc;
658         ENTRY;
659
660         if (IS_ERR(mo_s))
661                 RETURN(PTR_ERR(mo_s));
662
663         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
664         /* set ctime to obj, can be remote req */
665         rc = mo_attr_set(env, md_object_next(mo_s), ma);
666         lu_object_put(env, &mo_s->mo_lu);
667         RETURN(rc);
668 }
669
670 /** Helper to output debug information about rename operation. */
671 static inline void cml_rename_warn(const char *fname,
672                                   struct md_object *mo_po,
673                                   struct md_object *mo_pn,
674                                   const struct lu_fid *lf,
675                                   const char *s_name,
676                                   struct md_object *mo_t,
677                                   const char *t_name,
678                                   int err)
679 {
680         if (mo_t)
681                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
682                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
683                       "[tname %s] [err %d]\n", fname,
684                       PFID(lu_object_fid(&mo_po->mo_lu)),
685                       PFID(lu_object_fid(&mo_pn->mo_lu)),
686                       PFID(lf), s_name,
687                       PFID(lu_object_fid(&mo_t->mo_lu)),
688                       t_name, err);
689         else
690                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
691                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
692                       "[tname %s] [err %d]\n", fname,
693                       PFID(lu_object_fid(&mo_po->mo_lu)),
694                       PFID(lu_object_fid(&mo_pn->mo_lu)),
695                       PFID(lf), s_name,
696                       t_name, err);
697 }
698
699 /**
700  * Rename operation for cml.
701  *
702  * This is the most complex cross-reference operation. It may consist of up to 4
703  * MDS server and require several RPCs to be sent.
704  *
705  * \param mo_po Old parent object.
706  * \param mo_pn New parent object.
707  * \param lf FID of object to rename.
708  * \param ls_name Source file name.
709  * \param mo_t target object. Should be NULL here.
710  * \param lt_name Name of target file.
711  * \param ma object attributes.
712  */
713 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
714                       struct md_object *mo_pn, const struct lu_fid *lf,
715                       const struct lu_name *ls_name, struct md_object *mo_t,
716                       const struct lu_name *lt_name, struct md_attr *ma)
717 {
718         struct cmm_thread_info *cmi;
719         struct md_attr *tmp_ma = NULL;
720         struct md_object *tmp_t = mo_t;
721         int remote = 0, rc;
722         ENTRY;
723
724         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
725         if (rc)
726                 RETURN(rc);
727
728         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
729                 /**
730                  * \note \a mo_t is remote object and there is RPC to unlink it.
731                  * Before that, do local sanity check for rename first.
732                  */
733                 if (!remote) {
734                         struct md_object *mo_s = md_object_find_slice(env,
735                                                         md_obj2dev(mo_po), lf);
736                         if (IS_ERR(mo_s))
737                                 RETURN(PTR_ERR(mo_s));
738
739                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
740                         rc = mo_permission(env, md_object_next(mo_po),
741                                            md_object_next(mo_s),
742                                            ma, MAY_RENAME_SRC);
743                         lu_object_put(env, &mo_s->mo_lu);
744                         if (rc)
745                                 RETURN(rc);
746                 } else {
747                         rc = mo_permission(env, NULL, md_object_next(mo_po),
748                                            ma, MAY_UNLINK | MAY_VTX_FULL);
749                         if (rc)
750                                 RETURN(rc);
751                 }
752
753                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
754                                    MAY_UNLINK | MAY_VTX_PART);
755                 if (rc)
756                         RETURN(rc);
757
758                 /*
759                  * /note \a ma will be changed after mo_ref_del(), but we will use
760                  * it for mdo_rename() later, so save it before mo_ref_del().
761                  */
762                 cmi = cmm_env_info(env);
763                 tmp_ma = &cmi->cmi_ma;
764                 *tmp_ma = *ma;
765                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
766                 if (rc)
767                         RETURN(rc);
768
769                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
770                 mo_t = NULL;
771         }
772
773         /**
774          * \note for src on remote MDS case, change its ctime before local
775          * rename. Firstly, do local sanity check for rename if necessary.
776          */
777         if (remote) {
778                 if (!tmp_ma) {
779                         rc = mo_permission(env, NULL, md_object_next(mo_po),
780                                            ma, MAY_UNLINK | MAY_VTX_FULL);
781                         if (rc)
782                                 RETURN(rc);
783
784                         if (mo_t) {
785                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
786                                 rc = mo_permission(env, md_object_next(mo_pn),
787                                                    md_object_next(mo_t),
788                                                    ma, MAY_RENAME_TAR);
789                                 if (rc)
790                                         RETURN(rc);
791                         } else {
792                                 int mask;
793
794                                 if (mo_po != mo_pn)
795                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
796                                                 MAY_LINK : MAY_CREATE);
797                                 else
798                                         mask = MAY_CREATE;
799                                 rc = mo_permission(env, NULL,
800                                                    md_object_next(mo_pn),
801                                                    NULL, mask);
802                                 if (rc)
803                                         RETURN(rc);
804                         }
805
806                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
807                 } else {
808                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
809                 }
810
811                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
812                                       tmp_ma ? tmp_ma : ma);
813                 if (rc) {
814                         /* TODO: revoke mo_t if necessary. */
815                         cml_rename_warn("cmm_rename_ctime", mo_po,
816                                         mo_pn, lf, ls_name->ln_name,
817                                         tmp_t, lt_name->ln_name, rc);
818                         RETURN(rc);
819                 }
820         }
821
822         /* local rename, mo_t can be NULL */
823         rc = mdo_rename(env, md_object_next(mo_po),
824                         md_object_next(mo_pn), lf, ls_name,
825                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
826         if (rc)
827                 /* TODO: revoke all cml_rename */
828                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
829                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
830
831         RETURN(rc);
832 }
833
834 /**
835  * Rename target partial operation.
836  * Used for cross-ref rename.
837  */
838 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
839                           struct md_object *mo_t, const struct lu_fid *lf,
840                           const struct lu_name *lname, struct md_attr *ma)
841 {
842         int rc;
843         ENTRY;
844
845         rc = mdo_rename_tgt(env, md_object_next(mo_p),
846                             md_object_next(mo_t), lf, lname, ma);
847         RETURN(rc);
848 }
849
850 /**
851  * Name insert only operation.
852  * used only in case of rename_tgt() when target doesn't exist.
853  */
854 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
855                            const struct lu_name *lname, const struct lu_fid *lf,
856                            const struct md_attr *ma)
857 {
858         int rc;
859         ENTRY;
860
861         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
862
863         RETURN(rc);
864 }
865
866 /**
867  * \ingroup cmm
868  * Check two fids are not subdirectories.
869  */
870 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
871                          const struct lu_fid *fid, struct lu_fid *sfid)
872 {
873         struct cmm_thread_info *cmi;
874         int rc;
875         ENTRY;
876
877         cmi = cmm_env_info(env);
878         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
879         if (rc)
880                 RETURN(rc);
881
882         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
883                 RETURN(0);
884
885         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
886         RETURN(rc);
887 }
888
889 static const struct md_dir_operations cml_dir_ops = {
890         .mdo_is_subdir   = cmm_is_subdir,
891         .mdo_lookup      = cml_lookup,
892         .mdo_lock_mode   = cml_lock_mode,
893         .mdo_create      = cml_create,
894         .mdo_link        = cml_link,
895         .mdo_unlink      = cml_unlink,
896         .mdo_lum_lmm_cmp = cml_lum_lmm_cmp,
897         .mdo_name_insert = cml_name_insert,
898         .mdo_rename      = cml_rename,
899         .mdo_rename_tgt  = cml_rename_tgt,
900         .mdo_create_data = cml_create_data,
901 };
902 /** @} */
903 /** @} */
904
905 /**
906  * \addtogroup cmr
907  * @{
908  */
909 /**
910  * \name cmr helpers
911  * @{
912  */
913 /** Get cmr_object from lu_object. */
914 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
915 {
916         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
917 }
918 /** Get cmr_object from md_object. */
919 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
920 {
921         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
922 }
923 /** Get cmr_object from cmm_object. */
924 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
925 {
926         return container_of0(co, struct cmr_object, cmm_obj);
927 }
928 /** @} */
929
930 /**
931  * Get proper child device from MDCs.
932  */
933 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
934 {
935         struct lu_device *next = NULL;
936         struct mdc_device *mdc;
937
938         cfs_spin_lock(&d->cmm_tgt_guard);
939         cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
940                 if (mdc->mc_num == num) {
941                         next = mdc2lu_dev(mdc);
942                         break;
943                 }
944         }
945         cfs_spin_unlock(&d->cmm_tgt_guard);
946         return next;
947 }
948
949 /**
950  * Free cmr_object.
951  */
952 static void cmr_object_free(const struct lu_env *env,
953                             struct lu_object *lo)
954 {
955         struct cmr_object *cro = lu2cmr_obj(lo);
956         lu_object_fini(lo);
957         OBD_FREE_PTR(cro);
958 }
959
960 /**
961  * Initialize cmr object.
962  */
963 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
964                            const struct lu_object_conf *unused)
965 {
966         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
967         struct lu_device  *c_dev;
968         struct lu_object  *c_obj;
969         int rc;
970
971         ENTRY;
972
973         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
974         if (c_dev == NULL) {
975                 rc = -ENOENT;
976         } else {
977                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
978                                                         lo->lo_header, c_dev);
979                 if (c_obj != NULL) {
980                         lu_object_add(lo, c_obj);
981                         rc = 0;
982                 } else {
983                         rc = -ENOMEM;
984                 }
985         }
986
987         RETURN(rc);
988 }
989
990 /**
991  * Output lu_object data.
992  */
993 static int cmr_object_print(const struct lu_env *env, void *cookie,
994                             lu_printer_t p, const struct lu_object *lo)
995 {
996         const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
997         return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
998 }
999
1000 /**
1001  * Cmr instance of lu_object_operations.
1002  */
1003 static const struct lu_object_operations cmr_obj_ops = {
1004         .loo_object_init    = cmr_object_init,
1005         .loo_object_free    = cmr_object_free,
1006         .loo_object_print   = cmr_object_print
1007 };
1008
1009 /**
1010  * \name cmr remote md_object operations.
1011  * All operations here are invalid and return errors. There is no local object
1012  * so these operations return two kinds of error:
1013  * -# -EFAULT if operation is prohibited.
1014  * -# -EREMOTE if operation can be done just to notify upper level about remote
1015  *  object.
1016  *
1017  * @{
1018  */
1019 static int cmr_object_create(const struct lu_env *env,
1020                              struct md_object *mo,
1021                              const struct md_op_spec *spec,
1022                              struct md_attr *ma)
1023 {
1024         return -EFAULT;
1025 }
1026
1027 static int cmr_permission(const struct lu_env *env,
1028                           struct md_object *p, struct md_object *c,
1029                           struct md_attr *attr, int mask)
1030 {
1031         return -EREMOTE;
1032 }
1033
1034 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1035                         struct md_attr *attr)
1036 {
1037         return -EREMOTE;
1038 }
1039
1040 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1041                         const struct md_attr *attr)
1042 {
1043         return -EFAULT;
1044 }
1045
1046 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1047                          struct lu_buf *buf, const char *name)
1048 {
1049         return -EFAULT;
1050 }
1051
1052 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1053                         struct lu_buf *buf)
1054 {
1055         return -EFAULT;
1056 }
1057
1058 static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
1059                          int flags, struct md_object *mo)
1060 {
1061         return -EFAULT;
1062 }
1063
1064 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1065                           struct lu_buf *buf)
1066 {
1067         return -EFAULT;
1068 }
1069
1070 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1071                          const struct lu_buf *buf, const char *name,
1072                          int fl)
1073 {
1074         return -EFAULT;
1075 }
1076
1077 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1078                          const char *name)
1079 {
1080         return -EFAULT;
1081 }
1082
1083 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1084                        const struct md_attr *ma)
1085 {
1086         return -EFAULT;
1087 }
1088
1089 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1090                        struct md_attr *ma)
1091 {
1092         return -EFAULT;
1093 }
1094
1095 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1096                     int flags)
1097 {
1098         return -EREMOTE;
1099 }
1100
1101 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1102                      struct md_attr *ma, int mode)
1103 {
1104         return -EFAULT;
1105 }
1106
1107 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1108                         const struct lu_rdpg *rdpg)
1109 {
1110         return -EREMOTE;
1111 }
1112
1113 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1114                         struct lustre_capa *capa, int renewal)
1115 {
1116         return -EFAULT;
1117 }
1118
1119 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1120                     char *path, int pathlen, __u64 *recno, int *linkno)
1121 {
1122         return -EREMOTE;
1123 }
1124
1125 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1126 {
1127         return -EFAULT;
1128 }
1129
1130 static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
1131                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
1132                          struct lustre_handle *lockh)
1133 {
1134         return -EREMOTE;
1135 }
1136
1137 static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
1138                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
1139 {
1140         return -EREMOTE;
1141 }
1142
1143 static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
1144                            const struct md_op_spec *spec, struct md_attr *ma)
1145 {
1146         return -EREMOTE;
1147 }
1148
1149 /** Set of md_object_operations for cmr. */
1150 static const struct md_object_operations cmr_mo_ops = {
1151         .moo_permission    = cmr_permission,
1152         .moo_attr_get      = cmr_attr_get,
1153         .moo_attr_set      = cmr_attr_set,
1154         .moo_xattr_get     = cmr_xattr_get,
1155         .moo_xattr_set     = cmr_xattr_set,
1156         .moo_xattr_list    = cmr_xattr_list,
1157         .moo_xattr_del     = cmr_xattr_del,
1158         .moo_object_create = cmr_object_create,
1159         .moo_ref_add       = cmr_ref_add,
1160         .moo_ref_del       = cmr_ref_del,
1161         .moo_open          = cmr_open,
1162         .moo_close         = cmr_close,
1163         .moo_readpage      = cmr_readpage,
1164         .moo_readlink      = cmr_readlink,
1165         .moo_changelog     = cmr_changelog,
1166         .moo_capa_get      = cmr_capa_get,
1167         .moo_object_sync   = cmr_object_sync,
1168         .moo_path          = cmr_path,
1169         .moo_file_lock     = cmr_file_lock,
1170         .moo_file_unlock   = cmr_file_unlock,
1171 };
1172 /** @} */
1173
1174 /**
1175  * \name cmr md_dir operations.
1176  *
1177  * All methods below are cross-ref by nature. They consist of remote call and
1178  * local operation. Due to future rollback functionality there are several
1179  * limitations for such methods:
1180  * -# remote call should be done at first to do epoch negotiation between all
1181  * MDS involved and to avoid the RPC inside transaction.
1182  * -# only one RPC can be sent - also due to epoch negotiation.
1183  * For more details see rollback HLD/DLD.
1184  * @{
1185  */
1186 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1187                       const struct lu_name *lname, struct lu_fid *lf,
1188                       struct md_op_spec *spec)
1189 {
1190         /*
1191          * This can happens while rename() If new parent is remote dir, lookup
1192          * will happen here.
1193          */
1194
1195         return -EREMOTE;
1196 }
1197
1198 /** Return lock mode. */
1199 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1200                                 struct md_object *mo, mdl_mode_t lm)
1201 {
1202         return MDL_MINMODE;
1203 }
1204
1205 /**
1206  * Create operation for cmr.
1207  * Remote object creation and local name insert.
1208  *
1209  * \param mo_p Parent directory. Local object.
1210  * \param lchild_name name of file to create.
1211  * \param mo_c Child object. It has no real inode yet.
1212  * \param spec creation specification.
1213  * \param ma child object attributes.
1214  */
1215 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1216                       const struct lu_name *lchild_name, struct md_object *mo_c,
1217                       struct md_op_spec *spec,
1218                       struct md_attr *ma)
1219 {
1220         struct cmm_thread_info *cmi;
1221         struct md_attr *tmp_ma;
1222         int rc;
1223         ENTRY;
1224
1225         /* Make sure that name isn't exist before doing remote call. */
1226         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1227                         &cmm_env_info(env)->cmi_fid, NULL);
1228         if (rc == 0)
1229                 RETURN(-EEXIST);
1230         else if (rc != -ENOENT)
1231                 RETURN(rc);
1232
1233         /* check the SGID attr */
1234         cmi = cmm_env_info(env);
1235         LASSERT(cmi);
1236         tmp_ma = &cmi->cmi_ma;
1237         tmp_ma->ma_valid = 0;
1238         tmp_ma->ma_need = MA_INODE;
1239
1240 #ifdef CONFIG_FS_POSIX_ACL
1241         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1242                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1243                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1244                 tmp_ma->ma_need |= MA_ACL_DEF;
1245         }
1246 #endif
1247         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1248         if (rc)
1249                 RETURN(rc);
1250
1251         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1252                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1253                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1254                         ma->ma_attr.la_mode |= S_ISGID;
1255                         ma->ma_attr.la_valid |= LA_MODE;
1256                 }
1257         }
1258
1259 #ifdef CONFIG_FS_POSIX_ACL
1260         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1261                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1262                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1263                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1264                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1265         }
1266 #endif
1267
1268         /* Local permission check for name_insert before remote ops. */
1269         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1270                            (S_ISDIR(ma->ma_attr.la_mode) ?
1271                            MAY_LINK : MAY_CREATE));
1272         if (rc)
1273                 RETURN(rc);
1274
1275         /**
1276          * \note \a ma will be changed after mo_object_create(), but we will use
1277          * it for mdo_name_insert() later, so save it before mo_object_create().
1278          */
1279         *tmp_ma = *ma;
1280         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1281         if (rc == 0) {
1282                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1283                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1284                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1285                 if (unlikely(rc)) {
1286                         /* TODO: remove object mo_c on remote MDS */
1287                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1288                               " [name %s] [mo_c "DFID"] [err %d]\n",
1289                               PFID(lu_object_fid(&mo_p->mo_lu)),
1290                               lchild_name->ln_name,
1291                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1292                 }
1293         }
1294
1295         RETURN(rc);
1296 }
1297
1298 /**
1299  * Link operations for cmr.
1300  *
1301  * The link RPC is always issued to the server where source parent is living.
1302  * The first operation to do is object nlink increment on remote server.
1303  * Second one is local mdo_name_insert().
1304  *
1305  * \param mo_p parent directory. It is local.
1306  * \param mo_s source object to link. It is remote.
1307  * \param lname Name of link file.
1308  * \param ma object attributes.
1309  */
1310 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1311                     struct md_object *mo_s, const struct lu_name *lname,
1312                     struct md_attr *ma)
1313 {
1314         int rc;
1315         ENTRY;
1316
1317         /* Make sure that name isn't exist before doing remote call. */
1318         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1319                         &cmm_env_info(env)->cmi_fid, NULL);
1320         if (rc == 0) {
1321                 rc = -EEXIST;
1322         } else if (rc == -ENOENT) {
1323                 /* Local permission check for name_insert before remote ops. */
1324                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1325                                    MAY_CREATE);
1326                 if (rc)
1327                         RETURN(rc);
1328
1329                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1330                 if (rc == 0) {
1331                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1332                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1333                                              lu_object_fid(&mo_s->mo_lu), ma);
1334                         if (unlikely(rc)) {
1335                                 /* TODO: ref_del from mo_s on remote MDS */
1336                                 CWARN("cmr_link failed, should revoke: "
1337                                       "[mo_p "DFID"] [mo_s "DFID"] "
1338                                       "[name %s] [err %d]\n",
1339                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1340                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1341                                       lname->ln_name, rc);
1342                         }
1343                 }
1344         }
1345         RETURN(rc);
1346 }
1347
1348 /**
1349  * Unlink operations for cmr.
1350  *
1351  * The unlink RPC is always issued to the server where parent is living. Hence
1352  * the first operation to do is object unlink on remote server. Second one is
1353  * local mdo_name_remove().
1354  *
1355  * \param mo_p parent md_object. It is local.
1356  * \param mo_c child object to be unlinked. It is remote.
1357  * \param lname Name of file to unlink.
1358  * \param ma object attributes.
1359  */
1360 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1361                       struct md_object *mo_c, const struct lu_name *lname,
1362                       struct md_attr *ma)
1363 {
1364         struct cmm_thread_info *cmi;
1365         struct md_attr *tmp_ma;
1366         int rc;
1367         ENTRY;
1368
1369         /* Local permission check for name_remove before remote ops. */
1370         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1371                            MAY_UNLINK | MAY_VTX_PART);
1372         if (rc)
1373                 RETURN(rc);
1374
1375         /*
1376          * \note \a ma will be changed after mo_ref_del, but we will use
1377          * it for mdo_name_remove() later, so save it before mo_ref_del().
1378          */
1379         cmi = cmm_env_info(env);
1380         tmp_ma = &cmi->cmi_ma;
1381         *tmp_ma = *ma;
1382         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1383         if (rc == 0) {
1384                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1385                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1386                 if (unlikely(rc)) {
1387                         /* TODO: ref_add to mo_c on remote MDS */
1388                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1389                               " [mo_c "DFID"] [name %s] [err %d]\n",
1390                               PFID(lu_object_fid(&mo_p->mo_lu)),
1391                               PFID(lu_object_fid(&mo_c->mo_lu)),
1392                               lname->ln_name, rc);
1393                 }
1394         }
1395
1396         RETURN(rc);
1397 }
1398
1399 /** Helper which outputs error message during cmr_rename() */
1400 static inline void cmr_rename_warn(const char *fname,
1401                                   struct md_object *mo_po,
1402                                   struct md_object *mo_pn,
1403                                   const struct lu_fid *lf,
1404                                   const char *s_name,
1405                                   const char *t_name,
1406                                   int err)
1407 {
1408         CWARN("cmr_rename failed for %s, should revoke: "
1409               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1410               "[sname %s] [tname %s] [err %d]\n", fname,
1411               PFID(lu_object_fid(&mo_po->mo_lu)),
1412               PFID(lu_object_fid(&mo_pn->mo_lu)),
1413               PFID(lf), s_name, t_name, err);
1414 }
1415
1416 /**
1417  * Rename operation for cmr.
1418  *
1419  * This is the most complex cross-reference operation. It may consist of up to 4
1420  * MDS server and require several RPCs to be sent.
1421  *
1422  * \param mo_po Old parent object.
1423  * \param mo_pn New parent object.
1424  * \param lf FID of object to rename.
1425  * \param ls_name Source file name.
1426  * \param mo_t target object. Should be NULL here.
1427  * \param lt_name Name of target file.
1428  * \param ma object attributes.
1429  */
1430 static int cmr_rename(const struct lu_env *env,
1431                       struct md_object *mo_po, struct md_object *mo_pn,
1432                       const struct lu_fid *lf, const struct lu_name *ls_name,
1433                       struct md_object *mo_t, const struct lu_name *lt_name,
1434                       struct md_attr *ma)
1435 {
1436         struct cmm_thread_info *cmi;
1437         struct md_attr *tmp_ma;
1438         int rc;
1439         ENTRY;
1440
1441         LASSERT(mo_t == NULL);
1442
1443         /* get real type of src */
1444         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1445         if (rc)
1446                 RETURN(rc);
1447
1448         /* Local permission check for name_remove before remote ops. */
1449         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1450                            MAY_UNLINK | MAY_VTX_FULL);
1451         if (rc)
1452                 RETURN(rc);
1453
1454         /**
1455          * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1456          * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1457          */
1458         cmi = cmm_env_info(env);
1459         tmp_ma = &cmi->cmi_ma;
1460         *tmp_ma = *ma;
1461         /**
1462          * \note The \a mo_pn is remote directory, so we cannot even know if there is
1463          * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1464          * lookup and process this further.
1465          */
1466         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1467                             NULL/* mo_t */, lf, lt_name, ma);
1468         if (rc)
1469                 RETURN(rc);
1470
1471         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1472
1473         /* src object maybe on remote MDS, do remote ops first. */
1474         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1475         if (unlikely(rc)) {
1476                 /* TODO: revoke mdo_rename_tgt */
1477                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1478                                 ls_name->ln_name, lt_name->ln_name, rc);
1479                 RETURN(rc);
1480         }
1481
1482         /* only old name is removed localy */
1483         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1484         if (unlikely(rc))
1485                 /* TODO: revoke all cmr_rename */
1486                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1487                                 ls_name->ln_name, lt_name->ln_name, rc);
1488
1489         RETURN(rc);
1490 }
1491
1492 /**
1493  * Part of cross-ref rename().
1494  * Used to insert new name in new parent and unlink target.
1495  */
1496 static int cmr_rename_tgt(const struct lu_env *env,
1497                           struct md_object *mo_p, struct md_object *mo_t,
1498                           const struct lu_fid *lf, const struct lu_name *lname,
1499                           struct md_attr *ma)
1500 {
1501         struct cmm_thread_info *cmi;
1502         struct md_attr *tmp_ma;
1503         int rc;
1504         ENTRY;
1505
1506         /* target object is remote one */
1507         /* Local permission check for rename_tgt before remote ops. */
1508         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1509                            MAY_UNLINK | MAY_VTX_PART);
1510         if (rc)
1511                 RETURN(rc);
1512
1513         /*
1514          * XXX: @ma maybe changed after mo_ref_del, but we will use
1515          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1516          */
1517         cmi = cmm_env_info(env);
1518         tmp_ma = &cmi->cmi_ma;
1519         *tmp_ma = *ma;
1520         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1521         /* continue locally with name handling only */
1522         if (rc == 0) {
1523                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1524                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1525                                     NULL, lf, lname, tmp_ma);
1526                 if (unlikely(rc)) {
1527                         /* TODO: ref_add to mo_t on remote MDS */
1528                         CWARN("cmr_rename_tgt failed, should revoke: "
1529                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1530                               "[name %s] [err %d]\n",
1531                               PFID(lu_object_fid(&mo_p->mo_lu)),
1532                               PFID(lu_object_fid(&mo_t->mo_lu)),
1533                               PFID(lf),
1534                               lname->ln_name, rc);
1535                 }
1536         }
1537         RETURN(rc);
1538 }
1539 /** @} */
1540 /**
1541  * The md_dir_operations for cmr.
1542  */
1543 static const struct md_dir_operations cmr_dir_ops = {
1544         .mdo_is_subdir   = cmm_is_subdir,
1545         .mdo_lookup      = cmr_lookup,
1546         .mdo_lock_mode   = cmr_lock_mode,
1547         .mdo_create      = cmr_create,
1548         .mdo_link        = cmr_link,
1549         .mdo_unlink      = cmr_unlink,
1550         .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp,
1551         .mdo_rename      = cmr_rename,
1552         .mdo_rename_tgt  = cmr_rename_tgt
1553 };
1554 /** @} */