Whamcloud - gitweb
cleanup of md_ucred handling.
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_object.c
5  *  Lustre Cluster Metadata Manager (cmm)
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Mike Pershin <tappro@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
38
39 extern struct lu_context_key cmm_thread_key;
40
41 static int cmm_fld_lookup(struct cmm_device *cm,
42                           const struct lu_fid *fid, mdsno_t *mds,
43                           const struct lu_env *env)
44 {
45         struct lu_site *ls;
46         int rc = 0;
47         ENTRY;
48
49         LASSERT(fid_is_sane(fid));
50
51         ls = cm->cmm_md_dev.md_lu_dev.ld_site;
52
53         rc = fld_client_lookup(ls->ls_client_fld,
54                                fid_seq(fid), mds, env);
55         if (rc) {
56                 CERROR("can't find mds by seq "LPX64", rc %d\n",
57                        fid_seq(fid), rc);
58                 RETURN(rc);
59         }
60
61         if (*mds > cm->cmm_tgt_count) {
62                 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
63                        *mds, cm->cmm_tgt_count);
64                 rc = -EINVAL;
65         } else {
66                 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
67                        *mds, fid_seq(fid));
68         }
69
70         RETURN (rc);
71 }
72
73 static struct md_object_operations cml_mo_ops;
74 static struct md_dir_operations    cml_dir_ops;
75 static struct lu_object_operations cml_obj_ops;
76
77 static struct md_object_operations cmr_mo_ops;
78 static struct md_dir_operations    cmr_dir_ops;
79 static struct lu_object_operations cmr_obj_ops;
80
81 struct lu_object *cmm_object_alloc(const struct lu_env *env,
82                                    const struct lu_object_header *loh,
83                                    struct lu_device *ld)
84 {
85         struct lu_object  *lo = NULL;
86         const struct lu_fid *fid = &loh->loh_fid;
87         struct cmm_device *cd;
88         mdsno_t mdsnum;
89         int rc = 0;
90
91         ENTRY;
92
93         cd = lu2cmm_dev(ld);
94         if (cd->cmm_flags & CMM_INITIALIZED) {
95                 /* get object location */
96                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum, env);
97                 if (rc)
98                         RETURN(NULL);
99         } else
100                 /*
101                  * Device is not yet initialized, cmm_object is being created
102                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
103                  * etc.). Such object *has* to be local.
104                  */
105                 mdsnum = cd->cmm_local_num;
106
107         /* select the proper set of operations based on object location */
108         if (mdsnum == cd->cmm_local_num) {
109                 struct cml_object *clo;
110
111                 OBD_ALLOC_PTR(clo);
112                 if (clo != NULL) {
113                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
114                         lu_object_init(lo, NULL, ld);
115                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
116                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
117                         lo->lo_ops = &cml_obj_ops;
118                 }
119         } else {
120                 struct cmr_object *cro;
121
122                 OBD_ALLOC_PTR(cro);
123                 if (cro != NULL) {
124                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
125                         lu_object_init(lo, NULL, ld);
126                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
127                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
128                         lo->lo_ops = &cmr_obj_ops;
129                         cro->cmo_num = mdsnum;
130                 }
131         }
132         RETURN(lo);
133 }
134
135 /*
136  * CMM has two types of objects - local and remote. They have different set
137  * of operations so we are avoiding multiple checks in code.
138  */
139
140 /*
141  * local CMM object operations. cml_...
142  */
143 static inline struct cml_object *lu2cml_obj(struct lu_object *o)
144 {
145         return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
146 }
147 static inline struct cml_object *md2cml_obj(struct md_object *mo)
148 {
149         return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
150 }
151 static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
152 {
153         return container_of0(co, struct cml_object, cmm_obj);
154 }
155 /* get local child device */
156 static struct lu_device *cml_child_dev(struct cmm_device *d)
157 {
158         return &d->cmm_child->md_lu_dev;
159 }
160
161 /* lu_object operations */
162 static void cml_object_free(const struct lu_env *env,
163                             struct lu_object *lo)
164 {
165         struct cml_object *clo = lu2cml_obj(lo);
166         lu_object_fini(lo);
167         OBD_FREE_PTR(clo);
168 }
169
170 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
171 {
172         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
173         struct lu_device  *c_dev;
174         struct lu_object  *c_obj;
175         int rc;
176
177         ENTRY;
178
179         c_dev = cml_child_dev(cd);
180         if (c_dev == NULL) {
181                 rc = -ENOENT;
182         } else {
183                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
184                                                         lo->lo_header, c_dev);
185                 if (c_obj != NULL) {
186                         lu_object_add(lo, c_obj);
187                         rc = 0;
188                 } else {
189                         rc = -ENOMEM;
190                 }
191         }
192
193         RETURN(rc);
194 }
195
196 static int cml_object_print(const struct lu_env *env, void *cookie,
197                             lu_printer_t p, const struct lu_object *lo)
198 {
199         return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
200 }
201
202 static struct lu_object_operations cml_obj_ops = {
203         .loo_object_init    = cml_object_init,
204         .loo_object_free    = cml_object_free,
205         .loo_object_print   = cml_object_print
206 };
207
208 /* CMM local md_object operations */
209 static int cml_object_create(const struct lu_env *env,
210                              struct md_object *mo,
211                              const struct md_create_spec *spec,
212                              struct md_attr *attr)
213 {
214         int rc;
215         ENTRY;
216         rc = mo_object_create(env, md_object_next(mo), spec, attr);
217         RETURN(rc);
218 }
219
220 static int cml_permission(const struct lu_env *env,
221                         struct md_object *mo, int mask)
222 {
223         int rc;
224         ENTRY;
225         rc = mo_permission(env, md_object_next(mo), mask);
226         RETURN(rc);
227 }
228
229 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
230                         struct md_attr *attr)
231 {
232         int rc;
233         ENTRY;
234         rc = mo_attr_get(env, md_object_next(mo), attr);
235         RETURN(rc);
236 }
237
238 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
239                         const struct md_attr *attr)
240 {
241         int rc;
242         ENTRY;
243         rc = mo_attr_set(env, md_object_next(mo), attr);
244         RETURN(rc);
245 }
246
247 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
248                          struct lu_buf *buf, const char *name)
249 {
250         int rc;
251         ENTRY;
252         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
253         RETURN(rc);
254 }
255
256 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
257                         struct lu_buf *buf)
258 {
259         int rc;
260         ENTRY;
261         rc = mo_readlink(env, md_object_next(mo), buf);
262         RETURN(rc);
263 }
264
265 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
266                           struct lu_buf *buf)
267 {
268         int rc;
269         ENTRY;
270         rc = mo_xattr_list(env, md_object_next(mo), buf);
271         RETURN(rc);
272 }
273
274 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
275                          const struct lu_buf *buf,
276                          const char *name, int fl)
277 {
278         int rc;
279         ENTRY;
280         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
281         RETURN(rc);
282 }
283
284 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
285                          const char *name)
286 {
287         int rc;
288         ENTRY;
289         rc = mo_xattr_del(env, md_object_next(mo), name);
290         RETURN(rc);
291 }
292
293 static int cml_ref_add(const struct lu_env *env, struct md_object *mo)
294 {
295         int rc;
296         ENTRY;
297         rc = mo_ref_add(env, md_object_next(mo));
298         RETURN(rc);
299 }
300
301 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
302                        struct md_attr *ma)
303 {
304         int rc;
305         ENTRY;
306         rc = mo_ref_del(env, md_object_next(mo), ma);
307         RETURN(rc);
308 }
309
310 static int cml_open(const struct lu_env *env, struct md_object *mo,
311                     int flags)
312 {
313         int rc;
314         ENTRY;
315         rc = mo_open(env, md_object_next(mo), flags);
316         RETURN(rc);
317 }
318
319 static int cml_close(const struct lu_env *env, struct md_object *mo,
320                      struct md_attr *ma)
321 {
322         int rc;
323         ENTRY;
324         rc = mo_close(env, md_object_next(mo), ma);
325         RETURN(rc);
326 }
327
328 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
329                         const struct lu_rdpg *rdpg)
330 {
331         int rc;
332         ENTRY;
333         rc = mo_readpage(env, md_object_next(mo), rdpg);
334         RETURN(rc);
335 }
336
337 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
338                         struct lustre_capa *capa)
339 {
340         int rc;
341         ENTRY;
342         rc = mo_capa_get(env, md_object_next(mo), capa);
343         RETURN(rc);
344 }
345
346 static struct md_object_operations cml_mo_ops = {
347         .moo_permission    = cml_permission,
348         .moo_attr_get      = cml_attr_get,
349         .moo_attr_set      = cml_attr_set,
350         .moo_xattr_get     = cml_xattr_get,
351         .moo_xattr_list    = cml_xattr_list,
352         .moo_xattr_set     = cml_xattr_set,
353         .moo_xattr_del     = cml_xattr_del,
354         .moo_object_create = cml_object_create,
355         .moo_ref_add       = cml_ref_add,
356         .moo_ref_del       = cml_ref_del,
357         .moo_open          = cml_open,
358         .moo_close         = cml_close,
359         .moo_readpage      = cml_readpage,
360         .moo_readlink      = cml_readlink,
361         .moo_capa_get      = cml_capa_get
362 };
363
364 /* md_dir operations */
365 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
366                       const char *name, struct lu_fid *lf)
367 {
368         int rc;
369         ENTRY;
370         rc = mdo_lookup(env, md_object_next(mo_p), name, lf);
371         RETURN(rc);
372
373 }
374
375 static int cml_create(const struct lu_env *env,
376                       struct md_object *mo_p, const char *child_name,
377                       struct md_object *mo_c, const struct md_create_spec *spec,
378                       struct md_attr *ma)
379 {
380         int rc;
381         ENTRY;
382
383 #ifdef HAVE_SPLIT_SUPPORT
384         rc = cml_try_to_split(env, mo_p);
385         if (rc)
386                 RETURN(rc);
387 #endif
388
389         rc = mdo_create(env, md_object_next(mo_p), child_name,
390                         md_object_next(mo_c), spec, ma);
391
392
393         RETURN(rc);
394 }
395
396 static int cml_create_data(const struct lu_env *env, struct md_object *p,
397                            struct md_object *o,
398                            const struct md_create_spec *spec,
399                            struct md_attr *ma)
400 {
401         int rc;
402         ENTRY;
403         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
404                              spec, ma);
405         RETURN(rc);
406 }
407
408 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
409                     struct md_object *mo_s, const char *name,
410                     struct md_attr *ma)
411 {
412         int rc;
413         ENTRY;
414         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
415                       name, ma);
416         RETURN(rc);
417 }
418
419 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
420                       struct md_object *mo_c, const char *name,
421                       struct md_attr *ma)
422 {
423         int rc;
424         ENTRY;
425         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
426                         name, ma);
427         RETURN(rc);
428 }
429
430 /* rename is split to local/remote by location of new parent dir */
431 struct md_object *md_object_find(const struct lu_env *env,
432                                  struct md_device *md,
433                                  const struct lu_fid *f)
434 {
435         struct lu_object *o;
436         struct md_object *m;
437         ENTRY;
438
439         o = lu_object_find(env, md2lu_dev(md)->ld_site, f, BYPASS_CAPA);
440         if (IS_ERR(o))
441                 m = (struct md_object *)o;
442         else {
443                 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
444                 m = o ? lu2md(o) : NULL;
445         }
446         RETURN(m);
447 }
448
449 static int __cmm_mode_get(const struct lu_env *env, struct md_device *md,
450                           const struct lu_fid *lf, struct md_attr *ma)
451 {
452         struct cmm_thread_info *cmi;
453         struct md_object *mo_s = md_object_find(env, md, lf);
454         struct md_attr *tmp_ma;
455         int rc;
456         ENTRY;
457
458         if (IS_ERR(mo_s))
459                 RETURN(PTR_ERR(mo_s));
460
461         cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
462         LASSERT(cmi);
463         tmp_ma = &cmi->cmi_ma;
464         tmp_ma->ma_need = MA_INODE;
465
466         /* get type from src, can be remote req */
467         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
468         if (rc == 0) {
469                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
470                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
471                 ma->ma_attr.la_valid |= LA_MODE | LA_FLAGS;
472         }
473         lu_object_put(env, &mo_s->mo_lu);
474         return rc;
475 }
476
477 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
478                       struct md_object *mo_pn, const struct lu_fid *lf,
479                       const char *s_name, struct md_object *mo_t,
480                       const char *t_name, struct md_attr *ma)
481 {
482         int rc;
483         ENTRY;
484
485         rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
486         if (rc != 0)
487                 RETURN(rc);
488
489         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
490                 /* mo_t is remote object and there is RPC to unlink it */
491                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
492                 if (rc)
493                         RETURN(rc);
494                 mo_t = NULL;
495         }
496
497         /* local rename, mo_t can be NULL */
498         rc = mdo_rename(env, md_object_next(mo_po),
499                         md_object_next(mo_pn), lf, s_name,
500                         md_object_next(mo_t), t_name, ma);
501         RETURN(rc);
502 }
503
504 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
505                           struct md_object *mo_t, const struct lu_fid *lf,
506                           const char *name, struct md_attr *ma)
507 {
508         int rc;
509         ENTRY;
510
511         rc = mdo_rename_tgt(env, md_object_next(mo_p),
512                             md_object_next(mo_t), lf, name, ma);
513         RETURN(rc);
514 }
515 /* used only in case of rename_tgt() when target is not exist */
516 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
517                            const char *name, const struct lu_fid *lf, int isdir)
518 {
519         int rc;
520         ENTRY;
521
522         rc = mdo_name_insert(env, md_object_next(p), name, lf, isdir);
523
524         RETURN(rc);
525 }
526
527 /* Common method for remote and local use. */
528 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
529                          const struct lu_fid *fid, struct lu_fid *sfid)
530 {
531         struct cmm_thread_info *cmi;
532         int rc;
533         ENTRY;
534
535         cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
536         rc = __cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma);
537         if (rc)
538                 RETURN(rc);
539
540         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
541                 RETURN(0);
542
543         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
544         RETURN(rc);
545 }
546
547 static struct md_dir_operations cml_dir_ops = {
548         .mdo_is_subdir   = cmm_is_subdir,
549         .mdo_lookup      = cml_lookup,
550         .mdo_create      = cml_create,
551         .mdo_link        = cml_link,
552         .mdo_unlink      = cml_unlink,
553         .mdo_name_insert = cml_name_insert,
554         .mdo_rename      = cml_rename,
555         .mdo_rename_tgt  = cml_rename_tgt,
556         .mdo_create_data = cml_create_data
557 };
558
559 /* -------------------------------------------------------------------
560  * remote CMM object operations. cmr_...
561  */
562 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
563 {
564         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
565 }
566 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
567 {
568         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
569 }
570 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
571 {
572         return container_of0(co, struct cmr_object, cmm_obj);
573 }
574
575 /* get proper child device from MDCs */
576 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
577 {
578         struct lu_device *next = NULL;
579         struct mdc_device *mdc;
580
581         spin_lock(&d->cmm_tgt_guard);
582         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
583                 if (mdc->mc_num == num) {
584                         next = mdc2lu_dev(mdc);
585                         break;
586                 }
587         }
588         spin_unlock(&d->cmm_tgt_guard);
589         return next;
590 }
591
592 /* lu_object operations */
593 static void cmr_object_free(const struct lu_env *env,
594                             struct lu_object *lo)
595 {
596         struct cmr_object *cro = lu2cmr_obj(lo);
597         lu_object_fini(lo);
598         OBD_FREE_PTR(cro);
599 }
600
601 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
602 {
603         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
604         struct lu_device  *c_dev;
605         struct lu_object  *c_obj;
606         int rc;
607
608         ENTRY;
609
610         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
611         if (c_dev == NULL) {
612                 rc = -ENOENT;
613         } else {
614                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
615                                                         lo->lo_header, c_dev);
616                 if (c_obj != NULL) {
617                         lu_object_add(lo, c_obj);
618                         rc = 0;
619                 } else {
620                         rc = -ENOMEM;
621                 }
622         }
623
624         RETURN(rc);
625 }
626
627 static int cmr_object_print(const struct lu_env *env, void *cookie,
628                             lu_printer_t p, const struct lu_object *lo)
629 {
630         return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
631 }
632
633 static struct lu_object_operations cmr_obj_ops = {
634         .loo_object_init    = cmr_object_init,
635         .loo_object_free    = cmr_object_free,
636         .loo_object_print   = cmr_object_print
637 };
638
639 /* CMM remote md_object operations. All are invalid */
640 static int cmr_object_create(const struct lu_env *env,
641                              struct md_object *mo,
642                              const struct md_create_spec *spec,
643                              struct md_attr *ma)
644 {
645         RETURN(-EFAULT);
646 }
647
648 static int cmr_permission(const struct lu_env *env, struct md_object *mo,
649                           int mask)
650 {
651         RETURN(-EREMOTE);
652 }
653
654 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
655                         struct md_attr *attr)
656 {
657         RETURN(-EREMOTE);
658 }
659
660 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
661                         const struct md_attr *attr)
662 {
663         RETURN(-EFAULT);
664 }
665
666 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
667                          struct lu_buf *buf, const char *name)
668 {
669         RETURN(-EFAULT);
670 }
671
672 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
673                         struct lu_buf *buf)
674 {
675         RETURN(-EFAULT);
676 }
677
678 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
679                           struct lu_buf *buf)
680 {
681         RETURN(-EFAULT);
682 }
683
684 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
685                          const struct lu_buf *buf, const char *name, int fl)
686 {
687         RETURN(-EFAULT);
688 }
689
690 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
691                          const char *name)
692 {
693         RETURN(-EFAULT);
694 }
695
696 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo)
697 {
698         RETURN(-EFAULT);
699 }
700
701 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
702                        struct md_attr *ma)
703 {
704         RETURN(-EFAULT);
705 }
706
707 static int cmr_open(const struct lu_env *env, struct md_object *mo,
708                     int flags)
709 {
710         RETURN(-EREMOTE);
711 }
712
713 static int cmr_close(const struct lu_env *env, struct md_object *mo,
714                      struct md_attr *ma)
715 {
716         RETURN(-EFAULT);
717 }
718
719 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
720                         const struct lu_rdpg *rdpg)
721 {
722         RETURN(-EREMOTE);
723 }
724
725 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
726                         struct lustre_capa *capa)
727 {
728         RETURN(-EFAULT);
729 }
730
731 static struct md_object_operations cmr_mo_ops = {
732         .moo_permission    = cmr_permission,
733         .moo_attr_get      = cmr_attr_get,
734         .moo_attr_set      = cmr_attr_set,
735         .moo_xattr_get     = cmr_xattr_get,
736         .moo_xattr_set     = cmr_xattr_set,
737         .moo_xattr_list    = cmr_xattr_list,
738         .moo_xattr_del     = cmr_xattr_del,
739         .moo_object_create = cmr_object_create,
740         .moo_ref_add       = cmr_ref_add,
741         .moo_ref_del       = cmr_ref_del,
742         .moo_open          = cmr_open,
743         .moo_close         = cmr_close,
744         .moo_readpage      = cmr_readpage,
745         .moo_readlink      = cmr_readlink,
746         .moo_capa_get      = cmr_capa_get
747 };
748
749 /* remote part of md_dir operations */
750 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
751                       const char *name, struct lu_fid *lf)
752 {
753         /*
754          * This can happens while rename() If new parent is remote dir, lookup
755          * will happen here.
756          */
757
758         RETURN(-EREMOTE);
759 }
760
761 /*
762  * All methods below are cross-ref by nature. They consist of remote call and
763  * local operation. Due to future rollback functionality there are several
764  * limitations for such methods:
765  * 1) remote call should be done at first to do epoch negotiation between all
766  * MDS involved and to avoid the RPC inside transaction.
767  * 2) only one RPC can be sent - also due to epoch negotiation.
768  * For more details see rollback HLD/DLD.
769  *
770  */
771 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
772                       const char *child_name, struct md_object *mo_c,
773                       const struct md_create_spec *spec,
774                       struct md_attr *ma)
775 {
776         struct cmm_thread_info *cmi;
777         struct md_attr *tmp_ma;
778         int rc;
779
780         ENTRY;
781         /* check the SGID attr */
782         cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
783         LASSERT(cmi);
784         tmp_ma = &cmi->cmi_ma;
785         tmp_ma->ma_need = MA_INODE;
786         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
787         if (rc)
788                 RETURN(rc);
789
790         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
791                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
792                 if (S_ISDIR(ma->ma_attr.la_mode)) {
793                         ma->ma_attr.la_mode |= S_ISGID;
794                         ma->ma_attr.la_valid |= LA_MODE;
795                 }
796         }
797         /* remote object creation and local name insert */
798         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
799         if (rc == 0) {
800                 rc = mdo_name_insert(env, md_object_next(mo_p),
801                                      child_name, lu_object_fid(&mo_c->mo_lu),
802                                      S_ISDIR(ma->ma_attr.la_mode));
803         }
804
805         RETURN(rc);
806 }
807
808 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
809                     struct md_object *mo_s, const char *name,
810                     struct md_attr *ma)
811 {
812         int rc;
813         ENTRY;
814
815         //XXX: make sure that MDT checks name isn't exist
816
817         rc = mo_ref_add(env, md_object_next(mo_s));
818         if (rc == 0) {
819                 rc = mdo_name_insert(env, md_object_next(mo_p),
820                                      name, lu_object_fid(&mo_s->mo_lu), 0);
821         }
822
823         RETURN(rc);
824 }
825
826 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
827                       struct md_object *mo_c, const char *name,
828                       struct md_attr *ma)
829 {
830         int rc;
831         ENTRY;
832
833         rc = mo_ref_del(env, md_object_next(mo_c), ma);
834         if (rc == 0) {
835                 rc = mdo_name_remove(env, md_object_next(mo_p), name);
836         }
837
838         RETURN(rc);
839 }
840
841 static int cmr_rename(const struct lu_env *env,
842                       struct md_object *mo_po, struct md_object *mo_pn,
843                       const struct lu_fid *lf, const char *s_name,
844                       struct md_object *mo_t, const char *t_name,
845                       struct md_attr *ma)
846 {
847         int rc;
848         ENTRY;
849
850         /* get real type of src */
851         rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
852         if (rc != 0)
853                 RETURN(rc);
854
855         LASSERT(mo_t == NULL);
856         /* the mo_pn is remote directory, so we cannot even know if there is
857          * mo_t or not. Therefore mo_t is NULL here but remote server should do
858          * lookup and process this further */
859         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
860                             NULL/* mo_t */, lf, t_name, ma);
861         /* only old name is removed localy */
862         if (rc == 0)
863                 rc = mdo_name_remove(env, md_object_next(mo_po),
864                                      s_name);
865
866         RETURN(rc);
867 }
868
869 /* part of cross-ref rename(). Used to insert new name in new parent
870  * and unlink target with same name if it exists */
871 static int cmr_rename_tgt(const struct lu_env *env,
872                           struct md_object *mo_p, struct md_object *mo_t,
873                           const struct lu_fid *lf, const char *name,
874                           struct md_attr *ma)
875 {
876         int rc;
877         ENTRY;
878         /* target object is remote one */
879         rc = mo_ref_del(env, md_object_next(mo_t), ma);
880         /* continue locally with name handling only */
881         if (rc == 0)
882                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
883                                     NULL, lf, name, ma);
884         RETURN(rc);
885 }
886
887 static struct md_dir_operations cmr_dir_ops = {
888         .mdo_is_subdir   = cmm_is_subdir,
889         .mdo_lookup      = cmr_lookup,
890         .mdo_create      = cmr_create,
891         .mdo_link        = cmr_link,
892         .mdo_unlink      = cmr_unlink,
893         .mdo_rename      = cmr_rename,
894         .mdo_rename_tgt  = cmr_rename_tgt,
895 };