Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
38
39 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
40                    mdsno_t *mds, const struct lu_env *env)
41 {
42         int rc = 0;
43         ENTRY;
44
45         LASSERT(fid_is_sane(fid));
46
47         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
48         if (rc) {
49                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
50                        fid_seq(fid), rc);
51                 RETURN(rc);
52         }
53
54         if (*mds > cm->cmm_tgt_count) {
55                 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
56                        *mds, cm->cmm_tgt_count);
57                 rc = -EINVAL;
58         } else {
59                 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
60                        LPU64"\n", *mds, fid_seq(fid));
61         }
62
63         RETURN (rc);
64 }
65
66 static struct md_object_operations cml_mo_ops;
67 static struct md_dir_operations    cml_dir_ops;
68 static struct lu_object_operations cml_obj_ops;
69
70 static struct md_object_operations cmr_mo_ops;
71 static struct md_dir_operations    cmr_dir_ops;
72 static struct lu_object_operations cmr_obj_ops;
73
74 struct lu_object *cmm_object_alloc(const struct lu_env *env,
75                                    const struct lu_object_header *loh,
76                                    struct lu_device *ld)
77 {
78         const struct lu_fid *fid = &loh->loh_fid;
79         struct lu_object  *lo = NULL;
80         struct cmm_device *cd;
81         mdsno_t mds;
82         int rc = 0;
83
84         ENTRY;
85
86         cd = lu2cmm_dev(ld);
87         if (cd->cmm_flags & CMM_INITIALIZED) {
88                 /* get object location */
89                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
90                 if (rc)
91                         RETURN(NULL);
92         } else
93                 /*
94                  * Device is not yet initialized, cmm_object is being created
95                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
96                  * etc.). Such object *has* to be local.
97                  */
98                 mds = cd->cmm_local_num;
99
100         /* select the proper set of operations based on object location */
101         if (mds == cd->cmm_local_num) {
102                 struct cml_object *clo;
103
104                 OBD_ALLOC_PTR(clo);
105                 if (clo != NULL) {
106                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
107                         lu_object_init(lo, NULL, ld);
108                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
109                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
110                         lo->lo_ops = &cml_obj_ops;
111                 }
112         } else {
113                 struct cmr_object *cro;
114
115                 OBD_ALLOC_PTR(cro);
116                 if (cro != NULL) {
117                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
118                         lu_object_init(lo, NULL, ld);
119                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
120                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
121                         lo->lo_ops = &cmr_obj_ops;
122                         cro->cmo_num = mds;
123                 }
124         }
125         RETURN(lo);
126 }
127
128 /*
129  * CMM has two types of objects - local and remote. They have different set
130  * of operations so we are avoiding multiple checks in code.
131  */
132
133 /* get local child device */
134 static struct lu_device *cml_child_dev(struct cmm_device *d)
135 {
136         return &d->cmm_child->md_lu_dev;
137 }
138
139 /* lu_object operations */
140 static void cml_object_free(const struct lu_env *env,
141                             struct lu_object *lo)
142 {
143         struct cml_object *clo = lu2cml_obj(lo);
144         lu_object_fini(lo);
145         OBD_FREE_PTR(clo);
146 }
147
148 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
149 {
150         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
151         struct lu_device  *c_dev;
152         struct lu_object  *c_obj;
153         int rc;
154
155         ENTRY;
156
157 #ifdef HAVE_SPLIT_SUPPORT
158         if (cd->cmm_tgt_count == 0)
159                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
160         else
161                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
162 #endif
163         c_dev = cml_child_dev(cd);
164         if (c_dev == NULL) {
165                 rc = -ENOENT;
166         } else {
167                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
168                                                         lo->lo_header, c_dev);
169                 if (c_obj != NULL) {
170                         lu_object_add(lo, c_obj);
171                         rc = 0;
172                 } else {
173                         rc = -ENOMEM;
174                 }
175         }
176
177         RETURN(rc);
178 }
179
180 static int cml_object_print(const struct lu_env *env, void *cookie,
181                             lu_printer_t p, const struct lu_object *lo)
182 {
183         return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
184 }
185
186 static struct lu_object_operations cml_obj_ops = {
187         .loo_object_init    = cml_object_init,
188         .loo_object_free    = cml_object_free,
189         .loo_object_print   = cml_object_print
190 };
191
192 /* CMM local md_object operations */
193 static int cml_object_create(const struct lu_env *env,
194                              struct md_object *mo,
195                              const struct md_op_spec *spec,
196                              struct md_attr *attr)
197 {
198         int rc;
199         ENTRY;
200         rc = mo_object_create(env, md_object_next(mo), spec, attr);
201         RETURN(rc);
202 }
203
204 static int cml_permission(const struct lu_env *env,
205                           struct md_object *p, struct md_object *c,
206                           struct md_attr *attr, int mask)
207 {
208         int rc;
209         ENTRY;
210         rc = mo_permission(env, md_object_next(p), md_object_next(c),
211                            attr, mask);
212         RETURN(rc);
213 }
214
215 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
216                         struct md_attr *attr)
217 {
218         int rc;
219         ENTRY;
220         rc = mo_attr_get(env, md_object_next(mo), attr);
221         RETURN(rc);
222 }
223
224 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
225                         const struct md_attr *attr)
226 {
227         int rc;
228         ENTRY;
229         rc = mo_attr_set(env, md_object_next(mo), attr);
230         RETURN(rc);
231 }
232
233 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
234                          struct lu_buf *buf, const char *name)
235 {
236         int rc;
237         ENTRY;
238         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
239         RETURN(rc);
240 }
241
242 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
243                         struct lu_buf *buf)
244 {
245         int rc;
246         ENTRY;
247         rc = mo_readlink(env, md_object_next(mo), buf);
248         RETURN(rc);
249 }
250
251 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
252                           struct lu_buf *buf)
253 {
254         int rc;
255         ENTRY;
256         rc = mo_xattr_list(env, md_object_next(mo), buf);
257         RETURN(rc);
258 }
259
260 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
261                          const struct lu_buf *buf,
262                          const char *name, int fl)
263 {
264         int rc;
265         ENTRY;
266         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
267         RETURN(rc);
268 }
269
270 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
271                          const char *name)
272 {
273         int rc;
274         ENTRY;
275         rc = mo_xattr_del(env, md_object_next(mo), name);
276         RETURN(rc);
277 }
278
279 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
280                        const struct md_attr *ma)
281 {
282         int rc;
283         ENTRY;
284         rc = mo_ref_add(env, md_object_next(mo), ma);
285         RETURN(rc);
286 }
287
288 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
289                        struct md_attr *ma)
290 {
291         int rc;
292         ENTRY;
293         rc = mo_ref_del(env, md_object_next(mo), ma);
294         RETURN(rc);
295 }
296
297 static int cml_open(const struct lu_env *env, struct md_object *mo,
298                     int flags)
299 {
300         int rc;
301         ENTRY;
302         rc = mo_open(env, md_object_next(mo), flags);
303         RETURN(rc);
304 }
305
306 static int cml_close(const struct lu_env *env, struct md_object *mo,
307                      struct md_attr *ma)
308 {
309         int rc;
310         ENTRY;
311         rc = mo_close(env, md_object_next(mo), ma);
312         RETURN(rc);
313 }
314
315 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
316                         const struct lu_rdpg *rdpg)
317 {
318         int rc;
319         ENTRY;
320         rc = mo_readpage(env, md_object_next(mo), rdpg);
321         RETURN(rc);
322 }
323
324 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
325                         struct lustre_capa *capa, int renewal)
326 {
327         int rc;
328         ENTRY;
329         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
330         RETURN(rc);
331 }
332
333 static struct md_object_operations cml_mo_ops = {
334         .moo_permission    = cml_permission,
335         .moo_attr_get      = cml_attr_get,
336         .moo_attr_set      = cml_attr_set,
337         .moo_xattr_get     = cml_xattr_get,
338         .moo_xattr_list    = cml_xattr_list,
339         .moo_xattr_set     = cml_xattr_set,
340         .moo_xattr_del     = cml_xattr_del,
341         .moo_object_create = cml_object_create,
342         .moo_ref_add       = cml_ref_add,
343         .moo_ref_del       = cml_ref_del,
344         .moo_open          = cml_open,
345         .moo_close         = cml_close,
346         .moo_readpage      = cml_readpage,
347         .moo_readlink      = cml_readlink,
348         .moo_capa_get      = cml_capa_get
349 };
350
351 /* md_dir operations */
352 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
353                       const struct lu_name *lname, struct lu_fid *lf,
354                       struct md_op_spec *spec)
355 {
356         int rc;
357         ENTRY;
358
359 #ifdef HAVE_SPLIT_SUPPORT
360         if (spec != NULL && spec->sp_ck_split) {
361                 rc = cmm_split_check(env, mo_p, lname->ln_name);
362                 if (rc)
363                         RETURN(rc);
364         }
365 #endif
366         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
367         RETURN(rc);
368
369 }
370
371 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
372                                 struct md_object *mo, mdl_mode_t lm)
373 {
374         int rc = MDL_MINMODE;
375         ENTRY;
376
377 #ifdef HAVE_SPLIT_SUPPORT
378         rc = cmm_split_access(env, mo, lm);
379 #endif
380
381         RETURN(rc);
382 }
383
384 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
385                       const struct lu_name *lname, struct md_object *mo_c,
386                       struct md_op_spec *spec, struct md_attr *ma)
387 {
388         int rc;
389         ENTRY;
390
391 #ifdef HAVE_SPLIT_SUPPORT
392         /* Lock mode always should be sane. */
393         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
394
395         /*
396          * Sigh... This is long story. MDT may have race with detecting if split
397          * is possible in cmm. We know this race and let it live, because
398          * getting it rid (with some sem or spinlock) will also mean that
399          * PDIROPS for create will not work because we kill parallel work, what
400          * is really bad for performance and makes no sense having PDIROPS. So,
401          * we better allow the race to live, but split dir only if some of
402          * concurrent threads takes EX lock, not matter which one. So that, say,
403          * two concurrent threads may have different lock modes on directory (CW
404          * and EX) and not first one which comes here and see that split is
405          * possible should split the dir, but only that one which has EX
406          * lock. And we do not care that in this case, split may happen a bit
407          * later (when dir size will not be necessarily 64K, but may be a bit
408          * larger). So that, we allow concurrent creates and protect split by EX
409          * lock.
410          */
411         if (spec->sp_cr_mode == MDL_EX) {
412                 /*
413                  * Try to split @mo_p. If split is ok, -ERESTART is returned and
414                  * current thread will not peoceed with create. Instead it sends
415                  * -ERESTART to client to let it know that correct MDT should be
416                  * choosen.
417                  */
418                 rc = cmm_split_dir(env, mo_p);
419                 if (rc)
420                         /*
421                          * -ERESTART or some split error is returned, we can't
422                          * proceed with create.
423                          */
424                         GOTO(out, rc);
425         }
426
427         if (spec != NULL && spec->sp_ck_split) {
428                 /*
429                  * Check for possible split directory and let caller know that
430                  * it should tell client that directory is split and operation
431                  * should repeat to correct MDT.
432                  */
433                 rc = cmm_split_check(env, mo_p, lname->ln_name);
434                 if (rc)
435                         GOTO(out, rc);
436         }
437 #endif
438
439         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
440                         spec, ma);
441
442         EXIT;
443 #ifdef HAVE_SPLIT_SUPPORT
444 out:
445 #endif
446         return rc;
447 }
448
449 static int cml_create_data(const struct lu_env *env, struct md_object *p,
450                            struct md_object *o,
451                            const struct md_op_spec *spec,
452                            struct md_attr *ma)
453 {
454         int rc;
455         ENTRY;
456         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
457                              spec, ma);
458         RETURN(rc);
459 }
460
461 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
462                     struct md_object *mo_s, const struct lu_name *lname,
463                     struct md_attr *ma)
464 {
465         int rc;
466         ENTRY;
467         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
468                       lname, ma);
469         RETURN(rc);
470 }
471
472 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
473                       struct md_object *mo_c, const struct lu_name *lname,
474                       struct md_attr *ma)
475 {
476         int rc;
477         ENTRY;
478         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
479                         lname, ma);
480         RETURN(rc);
481 }
482
483 /* rename is split to local/remote by location of new parent dir */
484 struct md_object *md_object_find(const struct lu_env *env,
485                                  struct md_device *md,
486                                  const struct lu_fid *f)
487 {
488         struct lu_object *o;
489         struct md_object *m;
490         ENTRY;
491
492         o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
493         if (IS_ERR(o))
494                 m = (struct md_object *)o;
495         else {
496                 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
497                 m = o ? lu2md(o) : NULL;
498         }
499         RETURN(m);
500 }
501
502 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
503                         const struct lu_fid *lf, struct md_attr *ma,
504                         int *remote)
505 {
506         struct md_object *mo_s = md_object_find(env, md, lf);
507         struct cmm_thread_info *cmi;
508         struct md_attr *tmp_ma;
509         int rc;
510         ENTRY;
511
512         if (IS_ERR(mo_s))
513                 RETURN(PTR_ERR(mo_s));
514
515         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
516                 *remote = 1;
517
518         cmi = cmm_env_info(env);
519         tmp_ma = &cmi->cmi_ma;
520         tmp_ma->ma_need = MA_INODE;
521         tmp_ma->ma_valid = 0;
522         /* get type from src, can be remote req */
523         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
524         if (rc == 0) {
525                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
526                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
527                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
528                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
529                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
530         }
531         lu_object_put(env, &mo_s->mo_lu);
532         RETURN(rc);
533 }
534
535 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
536                             const struct lu_fid *lf, struct md_attr *ma)
537 {
538         struct md_object *mo_s = md_object_find(env, md, lf);
539         int rc;
540         ENTRY;
541
542         if (IS_ERR(mo_s))
543                 RETURN(PTR_ERR(mo_s));
544
545         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
546         /* set ctime to obj, can be remote req */
547         rc = mo_attr_set(env, md_object_next(mo_s), ma);
548         lu_object_put(env, &mo_s->mo_lu);
549         RETURN(rc);
550 }
551
552 static inline void cml_rename_warn(const char *fname,
553                                   struct md_object *mo_po,
554                                   struct md_object *mo_pn,
555                                   const struct lu_fid *lf,
556                                   const char *s_name,
557                                   struct md_object *mo_t,
558                                   const char *t_name,
559                                   int err)
560 {
561         if (mo_t)
562                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
563                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
564                       "[tname %s] [err %d]\n", fname,
565                       PFID(lu_object_fid(&mo_po->mo_lu)),
566                       PFID(lu_object_fid(&mo_pn->mo_lu)),
567                       PFID(lf), s_name,
568                       PFID(lu_object_fid(&mo_t->mo_lu)),
569                       t_name, err);
570         else
571                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
572                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
573                       "[tname %s] [err %d]\n", fname,
574                       PFID(lu_object_fid(&mo_po->mo_lu)),
575                       PFID(lu_object_fid(&mo_pn->mo_lu)),
576                       PFID(lf), s_name,
577                       t_name, err);
578 }
579
580 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
581                       struct md_object *mo_pn, const struct lu_fid *lf,
582                       const struct lu_name *ls_name, struct md_object *mo_t,
583                       const struct lu_name *lt_name, struct md_attr *ma)
584 {
585         struct cmm_thread_info *cmi;
586         struct md_attr *tmp_ma = NULL;
587         struct md_object *tmp_t = mo_t;
588         int remote = 0, rc;
589         ENTRY;
590
591         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
592         if (rc)
593                 RETURN(rc);
594
595         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
596                 /* XXX: mo_t is remote object and there is RPC to unlink it.
597                  * before that, do local sanity check for rename first. */
598                 if (!remote) {
599                         struct md_object *mo_s = md_object_find(env,
600                                                         md_obj2dev(mo_po), lf);
601                         if (IS_ERR(mo_s))
602                                 RETURN(PTR_ERR(mo_s));
603
604                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
605                         rc = mo_permission(env, md_object_next(mo_po),
606                                            md_object_next(mo_s),
607                                            ma, MAY_RENAME_SRC);
608                         lu_object_put(env, &mo_s->mo_lu);
609                         if (rc)
610                                 RETURN(rc);
611                 } else {
612                         rc = mo_permission(env, NULL, md_object_next(mo_po),
613                                            ma, MAY_UNLINK | MAY_VTX_FULL);
614                         if (rc)
615                                 RETURN(rc);
616                 }
617
618                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
619                                    MAY_UNLINK | MAY_VTX_PART);
620                 if (rc)
621                         RETURN(rc);
622
623                 /*
624                  * XXX: @ma will be changed after mo_ref_del, but we will use
625                  * it for mdo_rename later, so save it before mo_ref_del.
626                  */
627                 cmi = cmm_env_info(env);
628                 tmp_ma = &cmi->cmi_ma;
629                 *tmp_ma = *ma;
630                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
631                 if (rc)
632                         RETURN(rc);
633
634                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
635                 mo_t = NULL;
636         }
637
638         /* XXX: for src on remote MDS case, change its ctime before local
639          * rename. Firstly, do local sanity check for rename if necessary. */
640         if (remote) {
641                 if (!tmp_ma) {
642                         rc = mo_permission(env, NULL, md_object_next(mo_po),
643                                            ma, MAY_UNLINK | MAY_VTX_FULL);
644                         if (rc)
645                                 RETURN(rc);
646
647                         if (mo_t) {
648                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
649                                 rc = mo_permission(env, md_object_next(mo_pn),
650                                                    md_object_next(mo_t),
651                                                    ma, MAY_RENAME_TAR);
652                                 if (rc)
653                                         RETURN(rc);
654                         } else {
655                                 int mask;
656
657                                 if (mo_po != mo_pn)
658                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
659                                                 MAY_LINK : MAY_CREATE);
660                                 else
661                                         mask = MAY_CREATE;
662                                 rc = mo_permission(env, NULL,
663                                                    md_object_next(mo_pn),
664                                                    NULL, mask);
665                                 if (rc)
666                                         RETURN(rc);
667                         }
668
669                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
670                 } else {
671                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
672                 }
673
674                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
675                                       tmp_ma ? tmp_ma : ma);
676                 if (rc) {
677                         /* TODO: revoke mo_t if necessary. */
678                         cml_rename_warn("cmm_rename_ctime", mo_po,
679                                         mo_pn, lf, ls_name->ln_name,
680                                         tmp_t, lt_name->ln_name, rc);
681                         RETURN(rc);
682                 }
683         }
684
685         /* local rename, mo_t can be NULL */
686         rc = mdo_rename(env, md_object_next(mo_po),
687                         md_object_next(mo_pn), lf, ls_name,
688                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
689         if (rc)
690                 /* TODO: revoke all cml_rename */
691                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
692                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
693
694         RETURN(rc);
695 }
696
697 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
698                           struct md_object *mo_t, const struct lu_fid *lf,
699                           const struct lu_name *lname, struct md_attr *ma)
700 {
701         int rc;
702         ENTRY;
703
704         rc = mdo_rename_tgt(env, md_object_next(mo_p),
705                             md_object_next(mo_t), lf, lname, ma);
706         RETURN(rc);
707 }
708 /* used only in case of rename_tgt() when target is not exist */
709 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
710                            const struct lu_name *lname, const struct lu_fid *lf,
711                            const struct md_attr *ma)
712 {
713         int rc;
714         ENTRY;
715
716         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
717
718         RETURN(rc);
719 }
720
721 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
722                          const struct lu_fid *fid, struct lu_fid *sfid)
723 {
724         struct cmm_thread_info *cmi;
725         int rc;
726         ENTRY;
727
728         cmi = cmm_env_info(env);
729         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
730         if (rc)
731                 RETURN(rc);
732
733         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
734                 RETURN(0);
735
736         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
737         RETURN(rc);
738 }
739
740 static struct md_dir_operations cml_dir_ops = {
741         .mdo_is_subdir   = cmm_is_subdir,
742         .mdo_lookup      = cml_lookup,
743         .mdo_lock_mode   = cml_lock_mode,
744         .mdo_create      = cml_create,
745         .mdo_link        = cml_link,
746         .mdo_unlink      = cml_unlink,
747         .mdo_name_insert = cml_name_insert,
748         .mdo_rename      = cml_rename,
749         .mdo_rename_tgt  = cml_rename_tgt,
750         .mdo_create_data = cml_create_data
751 };
752
753 /* -------------------------------------------------------------------
754  * remote CMM object operations. cmr_...
755  */
756 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
757 {
758         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
759 }
760 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
761 {
762         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
763 }
764 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
765 {
766         return container_of0(co, struct cmr_object, cmm_obj);
767 }
768
769 /* get proper child device from MDCs */
770 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
771 {
772         struct lu_device *next = NULL;
773         struct mdc_device *mdc;
774
775         spin_lock(&d->cmm_tgt_guard);
776         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
777                 if (mdc->mc_num == num) {
778                         next = mdc2lu_dev(mdc);
779                         break;
780                 }
781         }
782         spin_unlock(&d->cmm_tgt_guard);
783         return next;
784 }
785
786 /* lu_object operations */
787 static void cmr_object_free(const struct lu_env *env,
788                             struct lu_object *lo)
789 {
790         struct cmr_object *cro = lu2cmr_obj(lo);
791         lu_object_fini(lo);
792         OBD_FREE_PTR(cro);
793 }
794
795 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
796 {
797         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
798         struct lu_device  *c_dev;
799         struct lu_object  *c_obj;
800         int rc;
801
802         ENTRY;
803
804         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
805         if (c_dev == NULL) {
806                 rc = -ENOENT;
807         } else {
808                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
809                                                         lo->lo_header, c_dev);
810                 if (c_obj != NULL) {
811                         lu_object_add(lo, c_obj);
812                         rc = 0;
813                 } else {
814                         rc = -ENOMEM;
815                 }
816         }
817
818         RETURN(rc);
819 }
820
821 static int cmr_object_print(const struct lu_env *env, void *cookie,
822                             lu_printer_t p, const struct lu_object *lo)
823 {
824         return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
825 }
826
827 static struct lu_object_operations cmr_obj_ops = {
828         .loo_object_init    = cmr_object_init,
829         .loo_object_free    = cmr_object_free,
830         .loo_object_print   = cmr_object_print
831 };
832
833 /* CMM remote md_object operations. All are invalid */
834 static int cmr_object_create(const struct lu_env *env,
835                              struct md_object *mo,
836                              const struct md_op_spec *spec,
837                              struct md_attr *ma)
838 {
839         return -EFAULT;
840 }
841
842 static int cmr_permission(const struct lu_env *env,
843                           struct md_object *p, struct md_object *c,
844                           struct md_attr *attr, int mask)
845 {
846         return -EREMOTE;
847 }
848
849 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
850                         struct md_attr *attr)
851 {
852         return -EREMOTE;
853 }
854
855 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
856                         const struct md_attr *attr)
857 {
858         return -EFAULT;
859 }
860
861 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
862                          struct lu_buf *buf, const char *name)
863 {
864         return -EFAULT;
865 }
866
867 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
868                         struct lu_buf *buf)
869 {
870         return -EFAULT;
871 }
872
873 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
874                           struct lu_buf *buf)
875 {
876         return -EFAULT;
877 }
878
879 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
880                          const struct lu_buf *buf, const char *name, int fl)
881 {
882         return -EFAULT;
883 }
884
885 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
886                          const char *name)
887 {
888         return -EFAULT;
889 }
890
891 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
892                        const struct md_attr *ma)
893 {
894         return -EFAULT;
895 }
896
897 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
898                        struct md_attr *ma)
899 {
900         return -EFAULT;
901 }
902
903 static int cmr_open(const struct lu_env *env, struct md_object *mo,
904                     int flags)
905 {
906         return -EREMOTE;
907 }
908
909 static int cmr_close(const struct lu_env *env, struct md_object *mo,
910                      struct md_attr *ma)
911 {
912         return -EFAULT;
913 }
914
915 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
916                         const struct lu_rdpg *rdpg)
917 {
918         return -EREMOTE;
919 }
920
921 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
922                         struct lustre_capa *capa, int renewal)
923 {
924         return -EFAULT;
925 }
926
927 static struct md_object_operations cmr_mo_ops = {
928         .moo_permission    = cmr_permission,
929         .moo_attr_get      = cmr_attr_get,
930         .moo_attr_set      = cmr_attr_set,
931         .moo_xattr_get     = cmr_xattr_get,
932         .moo_xattr_set     = cmr_xattr_set,
933         .moo_xattr_list    = cmr_xattr_list,
934         .moo_xattr_del     = cmr_xattr_del,
935         .moo_object_create = cmr_object_create,
936         .moo_ref_add       = cmr_ref_add,
937         .moo_ref_del       = cmr_ref_del,
938         .moo_open          = cmr_open,
939         .moo_close         = cmr_close,
940         .moo_readpage      = cmr_readpage,
941         .moo_readlink      = cmr_readlink,
942         .moo_capa_get      = cmr_capa_get
943 };
944
945 /* remote part of md_dir operations */
946 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
947                       const struct lu_name *lname, struct lu_fid *lf,
948                       struct md_op_spec *spec)
949 {
950         /*
951          * This can happens while rename() If new parent is remote dir, lookup
952          * will happen here.
953          */
954
955         return -EREMOTE;
956 }
957
958 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
959                                 struct md_object *mo, mdl_mode_t lm)
960 {
961         return MDL_MINMODE;
962 }
963
964 /*
965  * All methods below are cross-ref by nature. They consist of remote call and
966  * local operation. Due to future rollback functionality there are several
967  * limitations for such methods:
968  * 1) remote call should be done at first to do epoch negotiation between all
969  * MDS involved and to avoid the RPC inside transaction.
970  * 2) only one RPC can be sent - also due to epoch negotiation.
971  * For more details see rollback HLD/DLD.
972  */
973 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
974                       const struct lu_name *lchild_name, struct md_object *mo_c,
975                       struct md_op_spec *spec,
976                       struct md_attr *ma)
977 {
978         struct cmm_thread_info *cmi;
979         struct md_attr *tmp_ma;
980         int rc;
981         ENTRY;
982
983         /* Make sure that name isn't exist before doing remote call. */
984         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
985                         &cmm_env_info(env)->cmi_fid, NULL);
986         if (rc == 0)
987                 RETURN(-EEXIST);
988         else if (rc != -ENOENT)
989                 RETURN(rc);
990
991         /* check the SGID attr */
992         cmi = cmm_env_info(env);
993         LASSERT(cmi);
994         tmp_ma = &cmi->cmi_ma;
995         tmp_ma->ma_valid = 0;
996         tmp_ma->ma_need = MA_INODE;
997
998 #ifdef CONFIG_FS_POSIX_ACL
999         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1000                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1001                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1002                 tmp_ma->ma_need |= MA_ACL_DEF;
1003         }
1004 #endif
1005         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1006         if (rc)
1007                 RETURN(rc);
1008
1009         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1010                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1011                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1012                         ma->ma_attr.la_mode |= S_ISGID;
1013                         ma->ma_attr.la_valid |= LA_MODE;
1014                 }
1015         }
1016
1017 #ifdef CONFIG_FS_POSIX_ACL
1018         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1019                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1020                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1021                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1022                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1023         }
1024 #endif
1025
1026         /* Local permission check for name_insert before remote ops. */
1027         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1028                            (S_ISDIR(ma->ma_attr.la_mode) ?
1029                            MAY_LINK : MAY_CREATE));
1030         if (rc)
1031                 RETURN(rc);
1032
1033         /* Remote object creation and local name insert. */
1034         /*
1035          * XXX: @ma will be changed after mo_object_create, but we will use
1036          * it for mdo_name_insert later, so save it before mo_object_create.
1037          */
1038         *tmp_ma = *ma;
1039         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1040         if (rc == 0) {
1041                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1042                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1043                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1044                 if (unlikely(rc)) {
1045                         /* TODO: remove object mo_c on remote MDS */
1046                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1047                               " [name %s] [mo_c "DFID"] [err %d]\n",
1048                               PFID(lu_object_fid(&mo_p->mo_lu)),
1049                               lchild_name->ln_name,
1050                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1051                 }
1052         }
1053
1054         RETURN(rc);
1055 }
1056
1057 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1058                     struct md_object *mo_s, const struct lu_name *lname,
1059                     struct md_attr *ma)
1060 {
1061         int rc;
1062         ENTRY;
1063
1064         /* Make sure that name isn't exist before doing remote call. */
1065         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1066                         &cmm_env_info(env)->cmi_fid, NULL);
1067         if (rc == 0) {
1068                 rc = -EEXIST;
1069         } else if (rc == -ENOENT) {
1070                 /* Local permission check for name_insert before remote ops. */
1071                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1072                                    MAY_CREATE);
1073                 if (rc)
1074                         RETURN(rc);
1075
1076                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1077                 if (rc == 0) {
1078                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1079                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1080                                              lu_object_fid(&mo_s->mo_lu), ma);
1081                         if (unlikely(rc)) {
1082                                 /* TODO: ref_del from mo_s on remote MDS */
1083                                 CWARN("cmr_link failed, should revoke: "
1084                                       "[mo_p "DFID"] [mo_s "DFID"] "
1085                                       "[name %s] [err %d]\n",
1086                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1087                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1088                                       lname->ln_name, rc);
1089                         }
1090                 }
1091         }
1092         RETURN(rc);
1093 }
1094
1095 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1096                       struct md_object *mo_c, const struct lu_name *lname,
1097                       struct md_attr *ma)
1098 {
1099         struct cmm_thread_info *cmi;
1100         struct md_attr *tmp_ma;
1101         int rc;
1102         ENTRY;
1103
1104         /* Local permission check for name_remove before remote ops. */
1105         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1106                            MAY_UNLINK | MAY_VTX_PART);
1107         if (rc)
1108                 RETURN(rc);
1109
1110         /*
1111          * XXX: @ma will be changed after mo_ref_del, but we will use
1112          * it for mdo_name_remove later, so save it before mo_ref_del.
1113          */
1114         cmi = cmm_env_info(env);
1115         tmp_ma = &cmi->cmi_ma;
1116         *tmp_ma = *ma;
1117         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1118         if (rc == 0) {
1119                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1120                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1121                 if (unlikely(rc)) {
1122                         /* TODO: ref_add to mo_c on remote MDS */
1123                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1124                               " [mo_c "DFID"] [name %s] [err %d]\n",
1125                               PFID(lu_object_fid(&mo_p->mo_lu)),
1126                               PFID(lu_object_fid(&mo_c->mo_lu)),
1127                               lname->ln_name, rc);
1128                 }
1129         }
1130
1131         RETURN(rc);
1132 }
1133
1134 static inline void cmr_rename_warn(const char *fname,
1135                                   struct md_object *mo_po,
1136                                   struct md_object *mo_pn,
1137                                   const struct lu_fid *lf,
1138                                   const char *s_name,
1139                                   const char *t_name,
1140                                   int err)
1141 {
1142         CWARN("cmr_rename failed for %s, should revoke: "
1143               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1144               "[sname %s] [tname %s] [err %d]\n", fname,
1145               PFID(lu_object_fid(&mo_po->mo_lu)),
1146               PFID(lu_object_fid(&mo_pn->mo_lu)),
1147               PFID(lf), s_name, t_name, err);
1148 }
1149
1150 static int cmr_rename(const struct lu_env *env,
1151                       struct md_object *mo_po, struct md_object *mo_pn,
1152                       const struct lu_fid *lf, const struct lu_name *ls_name,
1153                       struct md_object *mo_t, const struct lu_name *lt_name,
1154                       struct md_attr *ma)
1155 {
1156         struct cmm_thread_info *cmi;
1157         struct md_attr *tmp_ma;
1158         int rc;
1159         ENTRY;
1160
1161         LASSERT(mo_t == NULL);
1162
1163         /* get real type of src */
1164         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1165         if (rc)
1166                 RETURN(rc);
1167
1168         /* Local permission check for name_remove before remote ops. */
1169         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1170                            MAY_UNLINK | MAY_VTX_FULL);
1171         if (rc)
1172                 RETURN(rc);
1173
1174         /*
1175          * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
1176          * for mdo_name_remove later, so save it before mdo_rename_tgt.
1177          */
1178         cmi = cmm_env_info(env);
1179         tmp_ma = &cmi->cmi_ma;
1180         *tmp_ma = *ma;
1181         /* the mo_pn is remote directory, so we cannot even know if there is
1182          * mo_t or not. Therefore mo_t is NULL here but remote server should do
1183          * lookup and process this further */
1184         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1185                             NULL/* mo_t */, lf, lt_name, ma);
1186         if (rc)
1187                 RETURN(rc);
1188
1189         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1190
1191         /* src object maybe on remote MDS, do remote ops first. */
1192         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1193         if (unlikely(rc)) {
1194                 /* TODO: revoke mdo_rename_tgt */
1195                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1196                                 ls_name->ln_name, lt_name->ln_name, rc);
1197                 RETURN(rc);
1198         }
1199
1200         /* only old name is removed localy */
1201         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1202         if (unlikely(rc))
1203                 /* TODO: revoke all cmr_rename */
1204                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1205                                 ls_name->ln_name, lt_name->ln_name, rc);
1206
1207         RETURN(rc);
1208 }
1209
1210 /* part of cross-ref rename(). Used to insert new name in new parent
1211  * and unlink target */
1212 static int cmr_rename_tgt(const struct lu_env *env,
1213                           struct md_object *mo_p, struct md_object *mo_t,
1214                           const struct lu_fid *lf, const struct lu_name *lname,
1215                           struct md_attr *ma)
1216 {
1217         struct cmm_thread_info *cmi;
1218         struct md_attr *tmp_ma;
1219         int rc;
1220         ENTRY;
1221
1222         /* target object is remote one */
1223         /* Local permission check for rename_tgt before remote ops. */
1224         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1225                            MAY_UNLINK | MAY_VTX_PART);
1226         if (rc)
1227                 RETURN(rc);
1228
1229         /*
1230          * XXX: @ma maybe changed after mo_ref_del, but we will use
1231          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1232          */
1233         cmi = cmm_env_info(env);
1234         tmp_ma = &cmi->cmi_ma;
1235         *tmp_ma = *ma;
1236         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1237         /* continue locally with name handling only */
1238         if (rc == 0) {
1239                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1240                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1241                                     NULL, lf, lname, tmp_ma);
1242                 if (unlikely(rc)) {
1243                         /* TODO: ref_add to mo_t on remote MDS */
1244                         CWARN("cmr_rename_tgt failed, should revoke: "
1245                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1246                               "[name %s] [err %d]\n",
1247                               PFID(lu_object_fid(&mo_p->mo_lu)),
1248                               PFID(lu_object_fid(&mo_t->mo_lu)),
1249                               PFID(lf),
1250                               lname->ln_name, rc);
1251                 }
1252         }
1253         RETURN(rc);
1254 }
1255
1256 static struct md_dir_operations cmr_dir_ops = {
1257         .mdo_is_subdir   = cmm_is_subdir,
1258         .mdo_lookup      = cmr_lookup,
1259         .mdo_lock_mode   = cmr_lock_mode,
1260         .mdo_create      = cmr_create,
1261         .mdo_link        = cmr_link,
1262         .mdo_unlink      = cmr_unlink,
1263         .mdo_rename      = cmr_rename,
1264         .mdo_rename_tgt  = cmr_rename_tgt,
1265 };