Whamcloud - gitweb
fb489ba0793ba5e117ffe790d0be3ebace99446b
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
38
39 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
40                    mdsno_t *mds, const struct lu_env *env)
41 {
42         int rc = 0;
43         ENTRY;
44
45         LASSERT(fid_is_sane(fid));
46
47         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
48         if (rc) {
49                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
50                        fid_seq(fid), rc);
51                 RETURN(rc);
52         }
53
54         if (*mds > cm->cmm_tgt_count) {
55                 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
56                        *mds, cm->cmm_tgt_count);
57                 rc = -EINVAL;
58         } else {
59                 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
60                        LPU64"\n", *mds, fid_seq(fid));
61         }
62
63         RETURN (rc);
64 }
65
66 static struct md_object_operations cml_mo_ops;
67 static struct md_dir_operations    cml_dir_ops;
68 static struct lu_object_operations cml_obj_ops;
69
70 static struct md_object_operations cmr_mo_ops;
71 static struct md_dir_operations    cmr_dir_ops;
72 static struct lu_object_operations cmr_obj_ops;
73
74 struct lu_object *cmm_object_alloc(const struct lu_env *env,
75                                    const struct lu_object_header *loh,
76                                    struct lu_device *ld)
77 {
78         const struct lu_fid *fid = &loh->loh_fid;
79         struct lu_object  *lo = NULL;
80         struct cmm_device *cd;
81         mdsno_t mds;
82         int rc = 0;
83
84         ENTRY;
85
86         cd = lu2cmm_dev(ld);
87         if (cd->cmm_flags & CMM_INITIALIZED) {
88                 /* get object location */
89                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
90                 if (rc)
91                         RETURN(NULL);
92         } else
93                 /*
94                  * Device is not yet initialized, cmm_object is being created
95                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
96                  * etc.). Such object *has* to be local.
97                  */
98                 mds = cd->cmm_local_num;
99
100         /* select the proper set of operations based on object location */
101         if (mds == cd->cmm_local_num) {
102                 struct cml_object *clo;
103
104                 OBD_ALLOC_PTR(clo);
105                 if (clo != NULL) {
106                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
107                         lu_object_init(lo, NULL, ld);
108                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
109                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
110                         lo->lo_ops = &cml_obj_ops;
111                 }
112         } else {
113                 struct cmr_object *cro;
114
115                 OBD_ALLOC_PTR(cro);
116                 if (cro != NULL) {
117                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
118                         lu_object_init(lo, NULL, ld);
119                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
120                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
121                         lo->lo_ops = &cmr_obj_ops;
122                         cro->cmo_num = mds;
123                 }
124         }
125         RETURN(lo);
126 }
127
128 /*
129  * CMM has two types of objects - local and remote. They have different set
130  * of operations so we are avoiding multiple checks in code.
131  */
132
133 /* get local child device */
134 static struct lu_device *cml_child_dev(struct cmm_device *d)
135 {
136         return &d->cmm_child->md_lu_dev;
137 }
138
139 /* lu_object operations */
140 static void cml_object_free(const struct lu_env *env,
141                             struct lu_object *lo)
142 {
143         struct cml_object *clo = lu2cml_obj(lo);
144         lu_object_fini(lo);
145         OBD_FREE_PTR(clo);
146 }
147
148 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
149 {
150         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
151         struct lu_device  *c_dev;
152         struct lu_object  *c_obj;
153         int rc;
154
155         ENTRY;
156
157 #ifdef HAVE_SPLIT_SUPPORT
158         if (cd->cmm_tgt_count == 0)
159                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
160         else
161                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
162 #endif
163         c_dev = cml_child_dev(cd);
164         if (c_dev == NULL) {
165                 rc = -ENOENT;
166         } else {
167                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
168                                                         lo->lo_header, c_dev);
169                 if (c_obj != NULL) {
170                         lu_object_add(lo, c_obj);
171                         rc = 0;
172                 } else {
173                         rc = -ENOMEM;
174                 }
175         }
176
177         RETURN(rc);
178 }
179
180 static int cml_object_print(const struct lu_env *env, void *cookie,
181                             lu_printer_t p, const struct lu_object *lo)
182 {
183         return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
184 }
185
186 static struct lu_object_operations cml_obj_ops = {
187         .loo_object_init    = cml_object_init,
188         .loo_object_free    = cml_object_free,
189         .loo_object_print   = cml_object_print
190 };
191
192 /* CMM local md_object operations */
193 static int cml_object_create(const struct lu_env *env,
194                              struct md_object *mo,
195                              const struct md_op_spec *spec,
196                              struct md_attr *attr)
197 {
198         int rc;
199         ENTRY;
200         rc = mo_object_create(env, md_object_next(mo), spec, attr);
201         RETURN(rc);
202 }
203
204 static int cml_permission(const struct lu_env *env,
205                         struct md_object *mo, int mask)
206 {
207         int rc;
208         ENTRY;
209         rc = mo_permission(env, md_object_next(mo), mask);
210         RETURN(rc);
211 }
212
213 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
214                         struct md_attr *attr)
215 {
216         int rc;
217         ENTRY;
218         rc = mo_attr_get(env, md_object_next(mo), attr);
219         RETURN(rc);
220 }
221
222 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
223                         const struct md_attr *attr)
224 {
225         int rc;
226         ENTRY;
227         rc = mo_attr_set(env, md_object_next(mo), attr);
228         RETURN(rc);
229 }
230
231 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
232                          struct lu_buf *buf, const char *name)
233 {
234         int rc;
235         ENTRY;
236         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
237         RETURN(rc);
238 }
239
240 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
241                         struct lu_buf *buf)
242 {
243         int rc;
244         ENTRY;
245         rc = mo_readlink(env, md_object_next(mo), buf);
246         RETURN(rc);
247 }
248
249 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
250                           struct lu_buf *buf)
251 {
252         int rc;
253         ENTRY;
254         rc = mo_xattr_list(env, md_object_next(mo), buf);
255         RETURN(rc);
256 }
257
258 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
259                          const struct lu_buf *buf,
260                          const char *name, int fl)
261 {
262         int rc;
263         ENTRY;
264         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
265         RETURN(rc);
266 }
267
268 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
269                          const char *name)
270 {
271         int rc;
272         ENTRY;
273         rc = mo_xattr_del(env, md_object_next(mo), name);
274         RETURN(rc);
275 }
276
277 static int cml_ref_add(const struct lu_env *env, struct md_object *mo)
278 {
279         int rc;
280         ENTRY;
281         rc = mo_ref_add(env, md_object_next(mo));
282         RETURN(rc);
283 }
284
285 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
286                        struct md_attr *ma)
287 {
288         int rc;
289         ENTRY;
290         rc = mo_ref_del(env, md_object_next(mo), ma);
291         RETURN(rc);
292 }
293
294 static int cml_open(const struct lu_env *env, struct md_object *mo,
295                     int flags)
296 {
297         int rc;
298         ENTRY;
299         rc = mo_open(env, md_object_next(mo), flags);
300         RETURN(rc);
301 }
302
303 static int cml_close(const struct lu_env *env, struct md_object *mo,
304                      struct md_attr *ma)
305 {
306         int rc;
307         ENTRY;
308         rc = mo_close(env, md_object_next(mo), ma);
309         RETURN(rc);
310 }
311
312 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
313                         const struct lu_rdpg *rdpg)
314 {
315         int rc;
316         ENTRY;
317         rc = mo_readpage(env, md_object_next(mo), rdpg);
318         RETURN(rc);
319 }
320
321 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
322                         struct lustre_capa *capa, int renewal)
323 {
324         int rc;
325         ENTRY;
326         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
327         RETURN(rc);
328 }
329
330 static struct md_object_operations cml_mo_ops = {
331         .moo_permission    = cml_permission,
332         .moo_attr_get      = cml_attr_get,
333         .moo_attr_set      = cml_attr_set,
334         .moo_xattr_get     = cml_xattr_get,
335         .moo_xattr_list    = cml_xattr_list,
336         .moo_xattr_set     = cml_xattr_set,
337         .moo_xattr_del     = cml_xattr_del,
338         .moo_object_create = cml_object_create,
339         .moo_ref_add       = cml_ref_add,
340         .moo_ref_del       = cml_ref_del,
341         .moo_open          = cml_open,
342         .moo_close         = cml_close,
343         .moo_readpage      = cml_readpage,
344         .moo_readlink      = cml_readlink,
345         .moo_capa_get      = cml_capa_get
346 };
347
348 /* md_dir operations */
349 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
350                       const char *name, struct lu_fid *lf,
351                       struct md_op_spec *spec)
352 {
353         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p));
354         struct timeval start;
355         int rc;
356         ENTRY;
357
358         cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_LOOKUP);
359         
360 #ifdef HAVE_SPLIT_SUPPORT
361         if (spec != NULL && spec->sp_ck_split) {
362                 rc = cmm_split_check(env, mo_p, name);
363                 if (rc) {
364                         cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
365                         RETURN(rc);
366                 }
367         }
368 #endif
369         rc = mdo_lookup(env, md_object_next(mo_p), name, lf, spec);
370         cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
371         RETURN(rc);
372
373 }
374
375 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
376                                 struct md_object *mo, mdl_mode_t lm)
377 {
378         int rc = MDL_MINMODE;
379         ENTRY;
380         
381 #ifdef HAVE_SPLIT_SUPPORT
382         rc = cmm_split_access(env, mo, lm);
383 #endif
384         
385         RETURN(rc);
386 }
387
388 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
389                       const char *name, struct md_object *mo_c,
390                       struct md_op_spec *spec, struct md_attr *ma)
391 {
392         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p));
393         struct timeval start;
394         int rc;
395         ENTRY;
396
397         cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_CREATE);
398         
399 #ifdef HAVE_SPLIT_SUPPORT
400         /* Lock mode always should be sane. */
401         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
402
403         /*
404          * Sigh... This is long story. MDT may have race with detecting if split
405          * is possible in cmm. We know this race and let it live, because
406          * getting it rid (with some sem or spinlock) will also mean that
407          * PDIROPS for create will not work because we kill parallel work, what
408          * is really bad for performance and makes no sense having PDIROPS. So,
409          * we better allow the race to live, but split dir only if some of
410          * concurrent threads takes EX lock, not matter which one. So that, say,
411          * two concurrent threads may have different lock modes on directory (CW
412          * and EX) and not first one which comes here and see that split is
413          * possible should split the dir, but only that one which has EX
414          * lock. And we do not care that in this case, split may happen a bit
415          * later (when dir size will not be necessarily 64K, but may be a bit
416          * larger). So that, we allow concurrent creates and protect split by EX
417          * lock.
418          */
419         if (spec->sp_cr_mode == MDL_EX) {
420                 /* 
421                  * Try to split @mo_p. If split is ok, -ERESTART is returned and
422                  * current thread will not peoceed with create. Instead it sends
423                  * -ERESTART to client to let it know that correct MDT should be
424                  * choosen.
425                  */
426                 rc = cmm_split_dir(env, mo_p);
427                 if (rc)
428                         /* 
429                          * -ERESTART or some split error is returned, we can't
430                          * proceed with create.
431                          */
432                         GOTO(out, rc);
433         }
434
435         if (spec != NULL && spec->sp_ck_split) {
436                 /* 
437                  * Check for possible split directory and let caller know that
438                  * it should tell client that directory is split and operation
439                  * should repeat to correct MDT.
440                  */
441                 rc = cmm_split_check(env, mo_p, name);
442                 if (rc)
443                         GOTO(out, rc);
444         }
445 #endif
446
447         rc = mdo_create(env, md_object_next(mo_p), name, md_object_next(mo_c),
448                         spec, ma);
449
450         EXIT;
451 out:
452         cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_CREATE);
453         return rc;
454 }
455
456 static int cml_create_data(const struct lu_env *env, struct md_object *p,
457                            struct md_object *o,
458                            const struct md_op_spec *spec,
459                            struct md_attr *ma)
460 {
461         int rc;
462         ENTRY;
463         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
464                              spec, ma);
465         RETURN(rc);
466 }
467
468 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
469                     struct md_object *mo_s, const char *name,
470                     struct md_attr *ma)
471 {
472         int rc;
473         ENTRY;
474         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
475                       name, ma);
476         RETURN(rc);
477 }
478
479 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
480                       struct md_object *mo_c, const char *name,
481                       struct md_attr *ma)
482 {
483         int rc;
484         ENTRY;
485         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
486                         name, ma);
487         RETURN(rc);
488 }
489
490 /* rename is split to local/remote by location of new parent dir */
491 struct md_object *md_object_find(const struct lu_env *env,
492                                  struct md_device *md,
493                                  const struct lu_fid *f)
494 {
495         struct lu_object *o;
496         struct md_object *m;
497         ENTRY;
498
499         o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
500         if (IS_ERR(o))
501                 m = (struct md_object *)o;
502         else {
503                 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
504                 m = o ? lu2md(o) : NULL;
505         }
506         RETURN(m);
507 }
508
509 static int __cmm_mode_get(const struct lu_env *env, struct md_device *md,
510                           const struct lu_fid *lf, struct md_attr *ma)
511 {
512         struct cmm_thread_info *cmi;
513         struct md_object *mo_s = md_object_find(env, md, lf);
514         struct md_attr *tmp_ma;
515         int rc;
516         ENTRY;
517
518         if (IS_ERR(mo_s))
519                 RETURN(PTR_ERR(mo_s));
520
521         cmi = cmm_env_info(env);
522         LASSERT(cmi);
523         tmp_ma = &cmi->cmi_ma;
524         tmp_ma->ma_need = MA_INODE;
525         tmp_ma->ma_valid = 0;
526         /* get type from src, can be remote req */
527         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
528         if (rc == 0) {
529                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
530                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
531                 ma->ma_attr.la_valid |= LA_MODE | LA_FLAGS;
532         }
533         lu_object_put(env, &mo_s->mo_lu);
534         return rc;
535 }
536
537 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
538                       struct md_object *mo_pn, const struct lu_fid *lf,
539                       const char *s_name, struct md_object *mo_t,
540                       const char *t_name, struct md_attr *ma)
541 {
542         int rc;
543         ENTRY;
544
545         rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
546         if (rc != 0)
547                 RETURN(rc);
548
549         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
550                 /* mo_t is remote object and there is RPC to unlink it */
551                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
552                 if (rc)
553                         RETURN(rc);
554                 mo_t = NULL;
555         }
556
557         /* local rename, mo_t can be NULL */
558         rc = mdo_rename(env, md_object_next(mo_po),
559                         md_object_next(mo_pn), lf, s_name,
560                         md_object_next(mo_t), t_name, ma);
561         RETURN(rc);
562 }
563
564 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
565                           struct md_object *mo_t, const struct lu_fid *lf,
566                           const char *name, struct md_attr *ma)
567 {
568         int rc;
569         ENTRY;
570
571         rc = mdo_rename_tgt(env, md_object_next(mo_p),
572                             md_object_next(mo_t), lf, name, ma);
573         RETURN(rc);
574 }
575 /* used only in case of rename_tgt() when target is not exist */
576 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
577                            const char *name, const struct lu_fid *lf, int isdir)
578 {
579         int rc;
580         ENTRY;
581
582         rc = mdo_name_insert(env, md_object_next(p), name, lf, isdir);
583
584         RETURN(rc);
585 }
586
587 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
588                          const struct lu_fid *fid, struct lu_fid *sfid)
589 {
590         struct cmm_thread_info *cmi;
591         int rc;
592         ENTRY;
593
594         cmi = cmm_env_info(env);
595         rc = __cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma);
596         if (rc)
597                 RETURN(rc);
598
599         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
600                 RETURN(0);
601
602         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
603         RETURN(rc);
604 }
605
606 static struct md_dir_operations cml_dir_ops = {
607         .mdo_is_subdir   = cmm_is_subdir,
608         .mdo_lookup      = cml_lookup,
609         .mdo_lock_mode   = cml_lock_mode,
610         .mdo_create      = cml_create,
611         .mdo_link        = cml_link,
612         .mdo_unlink      = cml_unlink,
613         .mdo_name_insert = cml_name_insert,
614         .mdo_rename      = cml_rename,
615         .mdo_rename_tgt  = cml_rename_tgt,
616         .mdo_create_data = cml_create_data
617 };
618
619 /* -------------------------------------------------------------------
620  * remote CMM object operations. cmr_...
621  */
622 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
623 {
624         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
625 }
626 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
627 {
628         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
629 }
630 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
631 {
632         return container_of0(co, struct cmr_object, cmm_obj);
633 }
634
635 /* get proper child device from MDCs */
636 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
637 {
638         struct lu_device *next = NULL;
639         struct mdc_device *mdc;
640
641         spin_lock(&d->cmm_tgt_guard);
642         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
643                 if (mdc->mc_num == num) {
644                         next = mdc2lu_dev(mdc);
645                         break;
646                 }
647         }
648         spin_unlock(&d->cmm_tgt_guard);
649         return next;
650 }
651
652 /* lu_object operations */
653 static void cmr_object_free(const struct lu_env *env,
654                             struct lu_object *lo)
655 {
656         struct cmr_object *cro = lu2cmr_obj(lo);
657         lu_object_fini(lo);
658         OBD_FREE_PTR(cro);
659 }
660
661 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
662 {
663         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
664         struct lu_device  *c_dev;
665         struct lu_object  *c_obj;
666         int rc;
667
668         ENTRY;
669
670         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
671         if (c_dev == NULL) {
672                 rc = -ENOENT;
673         } else {
674                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
675                                                         lo->lo_header, c_dev);
676                 if (c_obj != NULL) {
677                         lu_object_add(lo, c_obj);
678                         rc = 0;
679                 } else {
680                         rc = -ENOMEM;
681                 }
682         }
683
684         RETURN(rc);
685 }
686
687 static int cmr_object_print(const struct lu_env *env, void *cookie,
688                             lu_printer_t p, const struct lu_object *lo)
689 {
690         return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
691 }
692
693 static struct lu_object_operations cmr_obj_ops = {
694         .loo_object_init    = cmr_object_init,
695         .loo_object_free    = cmr_object_free,
696         .loo_object_print   = cmr_object_print
697 };
698
699 /* CMM remote md_object operations. All are invalid */
700 static int cmr_object_create(const struct lu_env *env,
701                              struct md_object *mo,
702                              const struct md_op_spec *spec,
703                              struct md_attr *ma)
704 {
705         return -EFAULT;
706 }
707
708 static int cmr_permission(const struct lu_env *env, struct md_object *mo,
709                           int mask)
710 {
711         return -EREMOTE;
712 }
713
714 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
715                         struct md_attr *attr)
716 {
717         return -EREMOTE;
718 }
719
720 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
721                         const struct md_attr *attr)
722 {
723         return -EFAULT;
724 }
725
726 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
727                          struct lu_buf *buf, const char *name)
728 {
729         return -EFAULT;
730 }
731
732 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
733                         struct lu_buf *buf)
734 {
735         return -EFAULT;
736 }
737
738 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
739                           struct lu_buf *buf)
740 {
741         return -EFAULT;
742 }
743
744 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
745                          const struct lu_buf *buf, const char *name, int fl)
746 {
747         return -EFAULT;
748 }
749
750 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
751                          const char *name)
752 {
753         return -EFAULT;
754 }
755
756 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo)
757 {
758         return -EFAULT;
759 }
760
761 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
762                        struct md_attr *ma)
763 {
764         return -EFAULT;
765 }
766
767 static int cmr_open(const struct lu_env *env, struct md_object *mo,
768                     int flags)
769 {
770         return -EREMOTE;
771 }
772
773 static int cmr_close(const struct lu_env *env, struct md_object *mo,
774                      struct md_attr *ma)
775 {
776         return -EFAULT;
777 }
778
779 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
780                         const struct lu_rdpg *rdpg)
781 {
782         return -EREMOTE;
783 }
784
785 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
786                         struct lustre_capa *capa, int renewal)
787 {
788         return -EFAULT;
789 }
790
791 static struct md_object_operations cmr_mo_ops = {
792         .moo_permission    = cmr_permission,
793         .moo_attr_get      = cmr_attr_get,
794         .moo_attr_set      = cmr_attr_set,
795         .moo_xattr_get     = cmr_xattr_get,
796         .moo_xattr_set     = cmr_xattr_set,
797         .moo_xattr_list    = cmr_xattr_list,
798         .moo_xattr_del     = cmr_xattr_del,
799         .moo_object_create = cmr_object_create,
800         .moo_ref_add       = cmr_ref_add,
801         .moo_ref_del       = cmr_ref_del,
802         .moo_open          = cmr_open,
803         .moo_close         = cmr_close,
804         .moo_readpage      = cmr_readpage,
805         .moo_readlink      = cmr_readlink,
806         .moo_capa_get      = cmr_capa_get
807 };
808
809 /* remote part of md_dir operations */
810 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
811                       const char *name, struct lu_fid *lf,
812                       struct md_op_spec *spec)
813 {
814         /*
815          * This can happens while rename() If new parent is remote dir, lookup
816          * will happen here.
817          */
818
819         return -EREMOTE;
820 }
821
822 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
823                                 struct md_object *mo, mdl_mode_t lm)
824 {
825         return MDL_MINMODE;
826 }
827
828 /*
829  * All methods below are cross-ref by nature. They consist of remote call and
830  * local operation. Due to future rollback functionality there are several
831  * limitations for such methods:
832  * 1) remote call should be done at first to do epoch negotiation between all
833  * MDS involved and to avoid the RPC inside transaction.
834  * 2) only one RPC can be sent - also due to epoch negotiation.
835  * For more details see rollback HLD/DLD.
836  */
837 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
838                       const char *child_name, struct md_object *mo_c,
839                       struct md_op_spec *spec,
840                       struct md_attr *ma)
841 {
842         struct cmm_thread_info *cmi;
843         struct md_attr *tmp_ma;
844         int rc;
845         ENTRY;
846
847         /* Make sure that name isn't exist before doing remote call. */
848         rc = mdo_lookup(env, md_object_next(mo_p), child_name,
849                         &cmm_env_info(env)->cmi_fid, NULL);
850         if (rc == 0)
851                 RETURN(-EEXIST);
852
853         /* check the SGID attr */
854         cmi = cmm_env_info(env);
855         LASSERT(cmi);
856         tmp_ma = &cmi->cmi_ma;
857         tmp_ma->ma_valid = 0;
858         tmp_ma->ma_need = MA_INODE;
859
860 #ifdef CONFIG_FS_POSIX_ACL
861         if (!S_ISLNK(ma->ma_attr.la_mode)) {
862                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
863                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
864                 tmp_ma->ma_need |= MA_ACL_DEF;
865         }
866 #endif
867         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
868         if (rc)
869                 RETURN(rc);
870
871         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
872                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
873                 if (S_ISDIR(ma->ma_attr.la_mode)) {
874                         ma->ma_attr.la_mode |= S_ISGID;
875                         ma->ma_attr.la_valid |= LA_MODE;
876                 }
877         }
878
879 #ifdef CONFIG_FS_POSIX_ACL
880         if (tmp_ma->ma_valid & MA_ACL_DEF) {
881                 spec->u.sp_ea.fid = spec->u.sp_pfid;
882                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
883                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
884                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
885         }
886 #endif
887
888         /* Remote object creation and local name insert. */
889         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
890         if (rc == 0) {
891                 rc = mdo_name_insert(env, md_object_next(mo_p),
892                                      child_name, lu_object_fid(&mo_c->mo_lu),
893                                      S_ISDIR(ma->ma_attr.la_mode));
894         }
895
896         RETURN(rc);
897 }
898
899 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
900                     struct md_object *mo_s, const char *name,
901                     struct md_attr *ma)
902 {
903         int rc;
904         ENTRY;
905         
906         /* Make sure that name isn't exist before doing remote call. */
907         rc = mdo_lookup(env, md_object_next(mo_p), name,
908                         &cmm_env_info(env)->cmi_fid, NULL);
909         if (rc == 0) {
910                 rc = -EEXIST;
911         } else if (rc == -ENOENT) {
912                 rc = mo_ref_add(env, md_object_next(mo_s));
913                 if (rc == 0) {
914                         rc = mdo_name_insert(env, md_object_next(mo_p), name,
915                                              lu_object_fid(&mo_s->mo_lu), 0);
916                 }
917         }
918         RETURN(rc);
919 }
920
921 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
922                       struct md_object *mo_c, const char *name,
923                       struct md_attr *ma)
924 {
925         int rc;
926         ENTRY;
927
928         rc = mo_ref_del(env, md_object_next(mo_c), ma);
929         if (rc == 0) {
930                 rc = mdo_name_remove(env, md_object_next(mo_p), name,
931                                      S_ISDIR(ma->ma_attr.la_mode));
932         }
933
934         RETURN(rc);
935 }
936
937 static int cmr_rename(const struct lu_env *env,
938                       struct md_object *mo_po, struct md_object *mo_pn,
939                       const struct lu_fid *lf, const char *s_name,
940                       struct md_object *mo_t, const char *t_name,
941                       struct md_attr *ma)
942 {
943         int rc;
944         ENTRY;
945
946         /* get real type of src */
947         rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
948         if (rc != 0)
949                 RETURN(rc);
950
951         LASSERT(mo_t == NULL);
952         /* the mo_pn is remote directory, so we cannot even know if there is
953          * mo_t or not. Therefore mo_t is NULL here but remote server should do
954          * lookup and process this further */
955         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
956                             NULL/* mo_t */, lf, t_name, ma);
957         /* only old name is removed localy */
958         if (rc == 0)
959                 rc = mdo_name_remove(env, md_object_next(mo_po),
960                                      s_name, S_ISDIR(ma->ma_attr.la_mode));
961
962         RETURN(rc);
963 }
964
965 /* part of cross-ref rename(). Used to insert new name in new parent
966  * and unlink target */
967 static int cmr_rename_tgt(const struct lu_env *env,
968                           struct md_object *mo_p, struct md_object *mo_t,
969                           const struct lu_fid *lf, const char *name,
970                           struct md_attr *ma)
971 {
972         int rc;
973         ENTRY;
974         /* target object is remote one */
975         rc = mo_ref_del(env, md_object_next(mo_t), ma);
976         /* continue locally with name handling only */
977         if (rc == 0)
978                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
979                                     NULL, lf, name, ma);
980         RETURN(rc);
981 }
982
983 static struct md_dir_operations cmr_dir_ops = {
984         .mdo_is_subdir   = cmm_is_subdir,
985         .mdo_lookup      = cmr_lookup,
986         .mdo_lock_mode   = cmr_lock_mode,
987         .mdo_create      = cmr_create,
988         .mdo_link        = cmr_link,
989         .mdo_unlink      = cmr_unlink,
990         .mdo_rename      = cmr_rename,
991         .mdo_rename_tgt  = cmr_rename_tgt,
992 };