Whamcloud - gitweb
A couple of trivial spelling fixes.
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
52
53 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
54                    mdsno_t *mds, const struct lu_env *env)
55 {
56         int rc = 0;
57         ENTRY;
58
59         LASSERT(fid_is_sane(fid));
60
61         rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
62         if (rc) {
63                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
64                        fid_seq(fid), rc);
65                 RETURN(rc);
66         }
67
68         if (*mds > cm->cmm_tgt_count) {
69                 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
70                        *mds, cm->cmm_tgt_count);
71                 rc = -EINVAL;
72         } else {
73                 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
74                        LPU64"\n", *mds, fid_seq(fid));
75         }
76
77         RETURN (rc);
78 }
79
80 static struct md_object_operations cml_mo_ops;
81 static struct md_dir_operations    cml_dir_ops;
82 static struct lu_object_operations cml_obj_ops;
83
84 static struct md_object_operations cmr_mo_ops;
85 static struct md_dir_operations    cmr_dir_ops;
86 static struct lu_object_operations cmr_obj_ops;
87
88 struct lu_object *cmm_object_alloc(const struct lu_env *env,
89                                    const struct lu_object_header *loh,
90                                    struct lu_device *ld)
91 {
92         const struct lu_fid *fid = &loh->loh_fid;
93         struct lu_object  *lo = NULL;
94         struct cmm_device *cd;
95         mdsno_t mds;
96         int rc = 0;
97
98         ENTRY;
99
100         cd = lu2cmm_dev(ld);
101         if (cd->cmm_flags & CMM_INITIALIZED) {
102                 /* get object location */
103                 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
104                 if (rc)
105                         RETURN(NULL);
106         } else
107                 /*
108                  * Device is not yet initialized, cmm_object is being created
109                  * as part of early bootstrap procedure (it is /ROOT, or /fld,
110                  * etc.). Such object *has* to be local.
111                  */
112                 mds = cd->cmm_local_num;
113
114         /* select the proper set of operations based on object location */
115         if (mds == cd->cmm_local_num) {
116                 struct cml_object *clo;
117
118                 OBD_ALLOC_PTR(clo);
119                 if (clo != NULL) {
120                         lo = &clo->cmm_obj.cmo_obj.mo_lu;
121                         lu_object_init(lo, NULL, ld);
122                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
123                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
124                         lo->lo_ops = &cml_obj_ops;
125                 }
126         } else {
127                 struct cmr_object *cro;
128
129                 OBD_ALLOC_PTR(cro);
130                 if (cro != NULL) {
131                         lo = &cro->cmm_obj.cmo_obj.mo_lu;
132                         lu_object_init(lo, NULL, ld);
133                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
134                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
135                         lo->lo_ops = &cmr_obj_ops;
136                         cro->cmo_num = mds;
137                 }
138         }
139         RETURN(lo);
140 }
141
142 /*
143  * CMM has two types of objects - local and remote. They have different set
144  * of operations so we are avoiding multiple checks in code.
145  */
146
147 /* get local child device */
148 static struct lu_device *cml_child_dev(struct cmm_device *d)
149 {
150         return &d->cmm_child->md_lu_dev;
151 }
152
153 /* lu_object operations */
154 static void cml_object_free(const struct lu_env *env,
155                             struct lu_object *lo)
156 {
157         struct cml_object *clo = lu2cml_obj(lo);
158         lu_object_fini(lo);
159         OBD_FREE_PTR(clo);
160 }
161
162 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
163 {
164         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
165         struct lu_device  *c_dev;
166         struct lu_object  *c_obj;
167         int rc;
168
169         ENTRY;
170
171 #ifdef HAVE_SPLIT_SUPPORT
172         if (cd->cmm_tgt_count == 0)
173                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
174         else
175                 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
176 #endif
177         c_dev = cml_child_dev(cd);
178         if (c_dev == NULL) {
179                 rc = -ENOENT;
180         } else {
181                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
182                                                         lo->lo_header, c_dev);
183                 if (c_obj != NULL) {
184                         lu_object_add(lo, c_obj);
185                         rc = 0;
186                 } else {
187                         rc = -ENOMEM;
188                 }
189         }
190
191         RETURN(rc);
192 }
193
194 static int cml_object_print(const struct lu_env *env, void *cookie,
195                             lu_printer_t p, const struct lu_object *lo)
196 {
197         return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
198 }
199
200 static struct lu_object_operations cml_obj_ops = {
201         .loo_object_init    = cml_object_init,
202         .loo_object_free    = cml_object_free,
203         .loo_object_print   = cml_object_print
204 };
205
206 /* CMM local md_object operations */
207 static int cml_object_create(const struct lu_env *env,
208                              struct md_object *mo,
209                              const struct md_op_spec *spec,
210                              struct md_attr *attr)
211 {
212         int rc;
213         ENTRY;
214         rc = mo_object_create(env, md_object_next(mo), spec, attr);
215         RETURN(rc);
216 }
217
218 static int cml_permission(const struct lu_env *env,
219                           struct md_object *p, struct md_object *c,
220                           struct md_attr *attr, int mask)
221 {
222         int rc;
223         ENTRY;
224         rc = mo_permission(env, md_object_next(p), md_object_next(c),
225                            attr, mask);
226         RETURN(rc);
227 }
228
229 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
230                         struct md_attr *attr)
231 {
232         int rc;
233         ENTRY;
234         rc = mo_attr_get(env, md_object_next(mo), attr);
235         RETURN(rc);
236 }
237
238 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
239                         const struct md_attr *attr)
240 {
241         int rc;
242         ENTRY;
243         rc = mo_attr_set(env, md_object_next(mo), attr);
244         RETURN(rc);
245 }
246
247 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
248                          struct lu_buf *buf, const char *name)
249 {
250         int rc;
251         ENTRY;
252         rc = mo_xattr_get(env, md_object_next(mo), buf, name);
253         RETURN(rc);
254 }
255
256 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
257                         struct lu_buf *buf)
258 {
259         int rc;
260         ENTRY;
261         rc = mo_readlink(env, md_object_next(mo), buf);
262         RETURN(rc);
263 }
264
265 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
266                           struct lu_buf *buf)
267 {
268         int rc;
269         ENTRY;
270         rc = mo_xattr_list(env, md_object_next(mo), buf);
271         RETURN(rc);
272 }
273
274 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
275                          const struct lu_buf *buf, const char *name,
276                          int fl)
277 {
278         int rc;
279         ENTRY;
280         rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
281         RETURN(rc);
282 }
283
284 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
285                          const char *name)
286 {
287         int rc;
288         ENTRY;
289         rc = mo_xattr_del(env, md_object_next(mo), name);
290         RETURN(rc);
291 }
292
293 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
294                        const struct md_attr *ma)
295 {
296         int rc;
297         ENTRY;
298         rc = mo_ref_add(env, md_object_next(mo), ma);
299         RETURN(rc);
300 }
301
302 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
303                        struct md_attr *ma)
304 {
305         int rc;
306         ENTRY;
307         rc = mo_ref_del(env, md_object_next(mo), ma);
308         RETURN(rc);
309 }
310
311 static int cml_open(const struct lu_env *env, struct md_object *mo,
312                     int flags)
313 {
314         int rc;
315         ENTRY;
316         rc = mo_open(env, md_object_next(mo), flags);
317         RETURN(rc);
318 }
319
320 static int cml_close(const struct lu_env *env, struct md_object *mo,
321                      struct md_attr *ma)
322 {
323         int rc;
324         ENTRY;
325         rc = mo_close(env, md_object_next(mo), ma);
326         RETURN(rc);
327 }
328
329 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
330                         const struct lu_rdpg *rdpg)
331 {
332         int rc;
333         ENTRY;
334         rc = mo_readpage(env, md_object_next(mo), rdpg);
335         RETURN(rc);
336 }
337
338 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
339                         struct lustre_capa *capa, int renewal)
340 {
341         int rc;
342         ENTRY;
343         rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
344         RETURN(rc);
345 }
346
347 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
348 {
349         int rc;
350         ENTRY;
351         rc = mo_object_sync(env, md_object_next(mo));
352         RETURN(rc);
353 }
354
355 static struct md_object_operations cml_mo_ops = {
356         .moo_permission    = cml_permission,
357         .moo_attr_get      = cml_attr_get,
358         .moo_attr_set      = cml_attr_set,
359         .moo_xattr_get     = cml_xattr_get,
360         .moo_xattr_list    = cml_xattr_list,
361         .moo_xattr_set     = cml_xattr_set,
362         .moo_xattr_del     = cml_xattr_del,
363         .moo_object_create = cml_object_create,
364         .moo_ref_add       = cml_ref_add,
365         .moo_ref_del       = cml_ref_del,
366         .moo_open          = cml_open,
367         .moo_close         = cml_close,
368         .moo_readpage      = cml_readpage,
369         .moo_readlink      = cml_readlink,
370         .moo_capa_get      = cml_capa_get,
371         .moo_object_sync   = cml_object_sync,
372 };
373
374 /* md_dir operations */
375 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
376                       const struct lu_name *lname, struct lu_fid *lf,
377                       struct md_op_spec *spec)
378 {
379         int rc;
380         ENTRY;
381
382 #ifdef HAVE_SPLIT_SUPPORT
383         if (spec != NULL && spec->sp_ck_split) {
384                 rc = cmm_split_check(env, mo_p, lname->ln_name);
385                 if (rc)
386                         RETURN(rc);
387         }
388 #endif
389         rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
390         RETURN(rc);
391
392 }
393
394 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
395                                 struct md_object *mo, mdl_mode_t lm)
396 {
397         int rc = MDL_MINMODE;
398         ENTRY;
399
400 #ifdef HAVE_SPLIT_SUPPORT
401         rc = cmm_split_access(env, mo, lm);
402 #endif
403
404         RETURN(rc);
405 }
406
407 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
408                       const struct lu_name *lname, struct md_object *mo_c,
409                       struct md_op_spec *spec, struct md_attr *ma)
410 {
411         int rc;
412         ENTRY;
413
414 #ifdef HAVE_SPLIT_SUPPORT
415         /* Lock mode always should be sane. */
416         LASSERT(spec->sp_cr_mode != MDL_MINMODE);
417
418         /*
419          * Sigh... This is long story. MDT may have race with detecting if split
420          * is possible in cmm. We know this race and let it live, because
421          * getting it rid (with some sem or spinlock) will also mean that
422          * PDIROPS for create will not work because we kill parallel work, what
423          * is really bad for performance and makes no sense having PDIROPS. So,
424          * we better allow the race to live, but split dir only if some of
425          * concurrent threads takes EX lock, not matter which one. So that, say,
426          * two concurrent threads may have different lock modes on directory (CW
427          * and EX) and not first one which comes here and see that split is
428          * possible should split the dir, but only that one which has EX
429          * lock. And we do not care that in this case, split may happen a bit
430          * later (when dir size will not be necessarily 64K, but may be a bit
431          * larger). So that, we allow concurrent creates and protect split by EX
432          * lock.
433          */
434         if (spec->sp_cr_mode == MDL_EX) {
435                 /*
436                  * Try to split @mo_p. If split is ok, -ERESTART is returned and
437                  * current thread will not peoceed with create. Instead it sends
438                  * -ERESTART to client to let it know that correct MDT should be
439                  * chosen.
440                  */
441                 rc = cmm_split_dir(env, mo_p);
442                 if (rc)
443                         /*
444                          * -ERESTART or some split error is returned, we can't
445                          * proceed with create.
446                          */
447                         GOTO(out, rc);
448         }
449
450         if (spec != NULL && spec->sp_ck_split) {
451                 /*
452                  * Check for possible split directory and let caller know that
453                  * it should tell client that directory is split and operation
454                  * should repeat to correct MDT.
455                  */
456                 rc = cmm_split_check(env, mo_p, lname->ln_name);
457                 if (rc)
458                         GOTO(out, rc);
459         }
460 #endif
461
462         rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
463                         spec, ma);
464
465         EXIT;
466 #ifdef HAVE_SPLIT_SUPPORT
467 out:
468 #endif
469         return rc;
470 }
471
472 static int cml_create_data(const struct lu_env *env, struct md_object *p,
473                            struct md_object *o,
474                            const struct md_op_spec *spec,
475                            struct md_attr *ma)
476 {
477         int rc;
478         ENTRY;
479         rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
480                              spec, ma);
481         RETURN(rc);
482 }
483
484 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
485                     struct md_object *mo_s, const struct lu_name *lname,
486                     struct md_attr *ma)
487 {
488         int rc;
489         ENTRY;
490         rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
491                       lname, ma);
492         RETURN(rc);
493 }
494
495 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
496                       struct md_object *mo_c, const struct lu_name *lname,
497                       struct md_attr *ma)
498 {
499         int rc;
500         ENTRY;
501         rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
502                         lname, ma);
503         RETURN(rc);
504 }
505
506 /* rename is split to local/remote by location of new parent dir */
507 struct md_object *md_object_find(const struct lu_env *env,
508                                  struct md_device *md,
509                                  const struct lu_fid *f)
510 {
511         struct lu_object *o;
512         struct md_object *m;
513         ENTRY;
514
515         o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
516         if (IS_ERR(o))
517                 m = (struct md_object *)o;
518         else {
519                 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
520                 m = o ? lu2md(o) : NULL;
521         }
522         RETURN(m);
523 }
524
525 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
526                         const struct lu_fid *lf, struct md_attr *ma,
527                         int *remote)
528 {
529         struct md_object *mo_s = md_object_find(env, md, lf);
530         struct cmm_thread_info *cmi;
531         struct md_attr *tmp_ma;
532         int rc;
533         ENTRY;
534
535         if (IS_ERR(mo_s))
536                 RETURN(PTR_ERR(mo_s));
537
538         if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
539                 *remote = 1;
540
541         cmi = cmm_env_info(env);
542         tmp_ma = &cmi->cmi_ma;
543         tmp_ma->ma_need = MA_INODE;
544         tmp_ma->ma_valid = 0;
545         /* get type from src, can be remote req */
546         rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
547         if (rc == 0) {
548                 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
549                 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
550                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
551                 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
552                 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
553         }
554         lu_object_put(env, &mo_s->mo_lu);
555         RETURN(rc);
556 }
557
558 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
559                             const struct lu_fid *lf, struct md_attr *ma)
560 {
561         struct md_object *mo_s = md_object_find(env, md, lf);
562         int rc;
563         ENTRY;
564
565         if (IS_ERR(mo_s))
566                 RETURN(PTR_ERR(mo_s));
567
568         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
569         /* set ctime to obj, can be remote req */
570         rc = mo_attr_set(env, md_object_next(mo_s), ma);
571         lu_object_put(env, &mo_s->mo_lu);
572         RETURN(rc);
573 }
574
575 static inline void cml_rename_warn(const char *fname,
576                                   struct md_object *mo_po,
577                                   struct md_object *mo_pn,
578                                   const struct lu_fid *lf,
579                                   const char *s_name,
580                                   struct md_object *mo_t,
581                                   const char *t_name,
582                                   int err)
583 {
584         if (mo_t)
585                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
586                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
587                       "[tname %s] [err %d]\n", fname,
588                       PFID(lu_object_fid(&mo_po->mo_lu)),
589                       PFID(lu_object_fid(&mo_pn->mo_lu)),
590                       PFID(lf), s_name,
591                       PFID(lu_object_fid(&mo_t->mo_lu)),
592                       t_name, err);
593         else
594                 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
595                       "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
596                       "[tname %s] [err %d]\n", fname,
597                       PFID(lu_object_fid(&mo_po->mo_lu)),
598                       PFID(lu_object_fid(&mo_pn->mo_lu)),
599                       PFID(lf), s_name,
600                       t_name, err);
601 }
602
603 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
604                       struct md_object *mo_pn, const struct lu_fid *lf,
605                       const struct lu_name *ls_name, struct md_object *mo_t,
606                       const struct lu_name *lt_name, struct md_attr *ma)
607 {
608         struct cmm_thread_info *cmi;
609         struct md_attr *tmp_ma = NULL;
610         struct md_object *tmp_t = mo_t;
611         int remote = 0, rc;
612         ENTRY;
613
614         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
615         if (rc)
616                 RETURN(rc);
617
618         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
619                 /* XXX: mo_t is remote object and there is RPC to unlink it.
620                  * before that, do local sanity check for rename first. */
621                 if (!remote) {
622                         struct md_object *mo_s = md_object_find(env,
623                                                         md_obj2dev(mo_po), lf);
624                         if (IS_ERR(mo_s))
625                                 RETURN(PTR_ERR(mo_s));
626
627                         LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
628                         rc = mo_permission(env, md_object_next(mo_po),
629                                            md_object_next(mo_s),
630                                            ma, MAY_RENAME_SRC);
631                         lu_object_put(env, &mo_s->mo_lu);
632                         if (rc)
633                                 RETURN(rc);
634                 } else {
635                         rc = mo_permission(env, NULL, md_object_next(mo_po),
636                                            ma, MAY_UNLINK | MAY_VTX_FULL);
637                         if (rc)
638                                 RETURN(rc);
639                 }
640
641                 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
642                                    MAY_UNLINK | MAY_VTX_PART);
643                 if (rc)
644                         RETURN(rc);
645
646                 /*
647                  * XXX: @ma will be changed after mo_ref_del, but we will use
648                  * it for mdo_rename later, so save it before mo_ref_del.
649                  */
650                 cmi = cmm_env_info(env);
651                 tmp_ma = &cmi->cmi_ma;
652                 *tmp_ma = *ma;
653                 rc = mo_ref_del(env, md_object_next(mo_t), ma);
654                 if (rc)
655                         RETURN(rc);
656
657                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
658                 mo_t = NULL;
659         }
660
661         /* XXX: for src on remote MDS case, change its ctime before local
662          * rename. Firstly, do local sanity check for rename if necessary. */
663         if (remote) {
664                 if (!tmp_ma) {
665                         rc = mo_permission(env, NULL, md_object_next(mo_po),
666                                            ma, MAY_UNLINK | MAY_VTX_FULL);
667                         if (rc)
668                                 RETURN(rc);
669
670                         if (mo_t) {
671                                 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
672                                 rc = mo_permission(env, md_object_next(mo_pn),
673                                                    md_object_next(mo_t),
674                                                    ma, MAY_RENAME_TAR);
675                                 if (rc)
676                                         RETURN(rc);
677                         } else {
678                                 int mask;
679
680                                 if (mo_po != mo_pn)
681                                         mask = (S_ISDIR(ma->ma_attr.la_mode) ?
682                                                 MAY_LINK : MAY_CREATE);
683                                 else
684                                         mask = MAY_CREATE;
685                                 rc = mo_permission(env, NULL,
686                                                    md_object_next(mo_pn),
687                                                    NULL, mask);
688                                 if (rc)
689                                         RETURN(rc);
690                         }
691
692                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
693                 } else {
694                         LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
695                 }
696
697                 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
698                                       tmp_ma ? tmp_ma : ma);
699                 if (rc) {
700                         /* TODO: revoke mo_t if necessary. */
701                         cml_rename_warn("cmm_rename_ctime", mo_po,
702                                         mo_pn, lf, ls_name->ln_name,
703                                         tmp_t, lt_name->ln_name, rc);
704                         RETURN(rc);
705                 }
706         }
707
708         /* local rename, mo_t can be NULL */
709         rc = mdo_rename(env, md_object_next(mo_po),
710                         md_object_next(mo_pn), lf, ls_name,
711                         md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
712         if (rc)
713                 /* TODO: revoke all cml_rename */
714                 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
715                                 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
716
717         RETURN(rc);
718 }
719
720 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
721                           struct md_object *mo_t, const struct lu_fid *lf,
722                           const struct lu_name *lname, struct md_attr *ma)
723 {
724         int rc;
725         ENTRY;
726
727         rc = mdo_rename_tgt(env, md_object_next(mo_p),
728                             md_object_next(mo_t), lf, lname, ma);
729         RETURN(rc);
730 }
731 /* used only in case of rename_tgt() when target is not exist */
732 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
733                            const struct lu_name *lname, const struct lu_fid *lf,
734                            const struct md_attr *ma)
735 {
736         int rc;
737         ENTRY;
738
739         rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
740
741         RETURN(rc);
742 }
743
744 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
745                          const struct lu_fid *fid, struct lu_fid *sfid)
746 {
747         struct cmm_thread_info *cmi;
748         int rc;
749         ENTRY;
750
751         cmi = cmm_env_info(env);
752         rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
753         if (rc)
754                 RETURN(rc);
755
756         if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
757                 RETURN(0);
758
759         rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
760         RETURN(rc);
761 }
762
763 static struct md_dir_operations cml_dir_ops = {
764         .mdo_is_subdir   = cmm_is_subdir,
765         .mdo_lookup      = cml_lookup,
766         .mdo_lock_mode   = cml_lock_mode,
767         .mdo_create      = cml_create,
768         .mdo_link        = cml_link,
769         .mdo_unlink      = cml_unlink,
770         .mdo_name_insert = cml_name_insert,
771         .mdo_rename      = cml_rename,
772         .mdo_rename_tgt  = cml_rename_tgt,
773         .mdo_create_data = cml_create_data
774 };
775
776 /* -------------------------------------------------------------------
777  * remote CMM object operations. cmr_...
778  */
779 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
780 {
781         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
782 }
783 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
784 {
785         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
786 }
787 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
788 {
789         return container_of0(co, struct cmr_object, cmm_obj);
790 }
791
792 /* get proper child device from MDCs */
793 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
794 {
795         struct lu_device *next = NULL;
796         struct mdc_device *mdc;
797
798         spin_lock(&d->cmm_tgt_guard);
799         list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
800                 if (mdc->mc_num == num) {
801                         next = mdc2lu_dev(mdc);
802                         break;
803                 }
804         }
805         spin_unlock(&d->cmm_tgt_guard);
806         return next;
807 }
808
809 /* lu_object operations */
810 static void cmr_object_free(const struct lu_env *env,
811                             struct lu_object *lo)
812 {
813         struct cmr_object *cro = lu2cmr_obj(lo);
814         lu_object_fini(lo);
815         OBD_FREE_PTR(cro);
816 }
817
818 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
819 {
820         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
821         struct lu_device  *c_dev;
822         struct lu_object  *c_obj;
823         int rc;
824
825         ENTRY;
826
827         c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
828         if (c_dev == NULL) {
829                 rc = -ENOENT;
830         } else {
831                 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
832                                                         lo->lo_header, c_dev);
833                 if (c_obj != NULL) {
834                         lu_object_add(lo, c_obj);
835                         rc = 0;
836                 } else {
837                         rc = -ENOMEM;
838                 }
839         }
840
841         RETURN(rc);
842 }
843
844 static int cmr_object_print(const struct lu_env *env, void *cookie,
845                             lu_printer_t p, const struct lu_object *lo)
846 {
847         return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
848 }
849
850 static struct lu_object_operations cmr_obj_ops = {
851         .loo_object_init    = cmr_object_init,
852         .loo_object_free    = cmr_object_free,
853         .loo_object_print   = cmr_object_print
854 };
855
856 /* CMM remote md_object operations. All are invalid */
857 static int cmr_object_create(const struct lu_env *env,
858                              struct md_object *mo,
859                              const struct md_op_spec *spec,
860                              struct md_attr *ma)
861 {
862         return -EFAULT;
863 }
864
865 static int cmr_permission(const struct lu_env *env,
866                           struct md_object *p, struct md_object *c,
867                           struct md_attr *attr, int mask)
868 {
869         return -EREMOTE;
870 }
871
872 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
873                         struct md_attr *attr)
874 {
875         return -EREMOTE;
876 }
877
878 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
879                         const struct md_attr *attr)
880 {
881         return -EFAULT;
882 }
883
884 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
885                          struct lu_buf *buf, const char *name)
886 {
887         return -EFAULT;
888 }
889
890 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
891                         struct lu_buf *buf)
892 {
893         return -EFAULT;
894 }
895
896 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
897                           struct lu_buf *buf)
898 {
899         return -EFAULT;
900 }
901
902 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
903                          const struct lu_buf *buf, const char *name,
904                          int fl)
905 {
906         return -EFAULT;
907 }
908
909 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
910                          const char *name)
911 {
912         return -EFAULT;
913 }
914
915 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
916                        const struct md_attr *ma)
917 {
918         return -EFAULT;
919 }
920
921 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
922                        struct md_attr *ma)
923 {
924         return -EFAULT;
925 }
926
927 static int cmr_open(const struct lu_env *env, struct md_object *mo,
928                     int flags)
929 {
930         return -EREMOTE;
931 }
932
933 static int cmr_close(const struct lu_env *env, struct md_object *mo,
934                      struct md_attr *ma)
935 {
936         return -EFAULT;
937 }
938
939 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
940                         const struct lu_rdpg *rdpg)
941 {
942         return -EREMOTE;
943 }
944
945 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
946                         struct lustre_capa *capa, int renewal)
947 {
948         return -EFAULT;
949 }
950
951 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
952 {
953         return -EFAULT;
954 }
955
956 static struct md_object_operations cmr_mo_ops = {
957         .moo_permission    = cmr_permission,
958         .moo_attr_get      = cmr_attr_get,
959         .moo_attr_set      = cmr_attr_set,
960         .moo_xattr_get     = cmr_xattr_get,
961         .moo_xattr_set     = cmr_xattr_set,
962         .moo_xattr_list    = cmr_xattr_list,
963         .moo_xattr_del     = cmr_xattr_del,
964         .moo_object_create = cmr_object_create,
965         .moo_ref_add       = cmr_ref_add,
966         .moo_ref_del       = cmr_ref_del,
967         .moo_open          = cmr_open,
968         .moo_close         = cmr_close,
969         .moo_readpage      = cmr_readpage,
970         .moo_readlink      = cmr_readlink,
971         .moo_capa_get      = cmr_capa_get,
972         .moo_object_sync   = cmr_object_sync,
973 };
974
975 /* remote part of md_dir operations */
976 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
977                       const struct lu_name *lname, struct lu_fid *lf,
978                       struct md_op_spec *spec)
979 {
980         /*
981          * This can happens while rename() If new parent is remote dir, lookup
982          * will happen here.
983          */
984
985         return -EREMOTE;
986 }
987
988 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
989                                 struct md_object *mo, mdl_mode_t lm)
990 {
991         return MDL_MINMODE;
992 }
993
994 /*
995  * All methods below are cross-ref by nature. They consist of remote call and
996  * local operation. Due to future rollback functionality there are several
997  * limitations for such methods:
998  * 1) remote call should be done at first to do epoch negotiation between all
999  * MDS involved and to avoid the RPC inside transaction.
1000  * 2) only one RPC can be sent - also due to epoch negotiation.
1001  * For more details see rollback HLD/DLD.
1002  */
1003 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1004                       const struct lu_name *lchild_name, struct md_object *mo_c,
1005                       struct md_op_spec *spec,
1006                       struct md_attr *ma)
1007 {
1008         struct cmm_thread_info *cmi;
1009         struct md_attr *tmp_ma;
1010         int rc;
1011         ENTRY;
1012
1013         /* Make sure that name isn't exist before doing remote call. */
1014         rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1015                         &cmm_env_info(env)->cmi_fid, NULL);
1016         if (rc == 0)
1017                 RETURN(-EEXIST);
1018         else if (rc != -ENOENT)
1019                 RETURN(rc);
1020
1021         /* check the SGID attr */
1022         cmi = cmm_env_info(env);
1023         LASSERT(cmi);
1024         tmp_ma = &cmi->cmi_ma;
1025         tmp_ma->ma_valid = 0;
1026         tmp_ma->ma_need = MA_INODE;
1027
1028 #ifdef CONFIG_FS_POSIX_ACL
1029         if (!S_ISLNK(ma->ma_attr.la_mode)) {
1030                 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1031                 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1032                 tmp_ma->ma_need |= MA_ACL_DEF;
1033         }
1034 #endif
1035         rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1036         if (rc)
1037                 RETURN(rc);
1038
1039         if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1040                 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1041                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1042                         ma->ma_attr.la_mode |= S_ISGID;
1043                         ma->ma_attr.la_valid |= LA_MODE;
1044                 }
1045         }
1046
1047 #ifdef CONFIG_FS_POSIX_ACL
1048         if (tmp_ma->ma_valid & MA_ACL_DEF) {
1049                 spec->u.sp_ea.fid = spec->u.sp_pfid;
1050                 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1051                 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1052                 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1053         }
1054 #endif
1055
1056         /* Local permission check for name_insert before remote ops. */
1057         rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1058                            (S_ISDIR(ma->ma_attr.la_mode) ?
1059                            MAY_LINK : MAY_CREATE));
1060         if (rc)
1061                 RETURN(rc);
1062
1063         /* Remote object creation and local name insert. */
1064         /*
1065          * XXX: @ma will be changed after mo_object_create, but we will use
1066          * it for mdo_name_insert later, so save it before mo_object_create.
1067          */
1068         *tmp_ma = *ma;
1069         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1070         if (rc == 0) {
1071                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1072                 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1073                                      lu_object_fid(&mo_c->mo_lu), tmp_ma);
1074                 if (unlikely(rc)) {
1075                         /* TODO: remove object mo_c on remote MDS */
1076                         CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1077                               " [name %s] [mo_c "DFID"] [err %d]\n",
1078                               PFID(lu_object_fid(&mo_p->mo_lu)),
1079                               lchild_name->ln_name,
1080                               PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1081                 }
1082         }
1083
1084         RETURN(rc);
1085 }
1086
1087 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1088                     struct md_object *mo_s, const struct lu_name *lname,
1089                     struct md_attr *ma)
1090 {
1091         int rc;
1092         ENTRY;
1093
1094         /* Make sure that name isn't exist before doing remote call. */
1095         rc = mdo_lookup(env, md_object_next(mo_p), lname,
1096                         &cmm_env_info(env)->cmi_fid, NULL);
1097         if (rc == 0) {
1098                 rc = -EEXIST;
1099         } else if (rc == -ENOENT) {
1100                 /* Local permission check for name_insert before remote ops. */
1101                 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1102                                    MAY_CREATE);
1103                 if (rc)
1104                         RETURN(rc);
1105
1106                 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1107                 if (rc == 0) {
1108                         ma->ma_attr_flags |= MDS_PERM_BYPASS;
1109                         rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1110                                              lu_object_fid(&mo_s->mo_lu), ma);
1111                         if (unlikely(rc)) {
1112                                 /* TODO: ref_del from mo_s on remote MDS */
1113                                 CWARN("cmr_link failed, should revoke: "
1114                                       "[mo_p "DFID"] [mo_s "DFID"] "
1115                                       "[name %s] [err %d]\n",
1116                                       PFID(lu_object_fid(&mo_p->mo_lu)),
1117                                       PFID(lu_object_fid(&mo_s->mo_lu)),
1118                                       lname->ln_name, rc);
1119                         }
1120                 }
1121         }
1122         RETURN(rc);
1123 }
1124
1125 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1126                       struct md_object *mo_c, const struct lu_name *lname,
1127                       struct md_attr *ma)
1128 {
1129         struct cmm_thread_info *cmi;
1130         struct md_attr *tmp_ma;
1131         int rc;
1132         ENTRY;
1133
1134         /* Local permission check for name_remove before remote ops. */
1135         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1136                            MAY_UNLINK | MAY_VTX_PART);
1137         if (rc)
1138                 RETURN(rc);
1139
1140         /*
1141          * XXX: @ma will be changed after mo_ref_del, but we will use
1142          * it for mdo_name_remove later, so save it before mo_ref_del.
1143          */
1144         cmi = cmm_env_info(env);
1145         tmp_ma = &cmi->cmi_ma;
1146         *tmp_ma = *ma;
1147         rc = mo_ref_del(env, md_object_next(mo_c), ma);
1148         if (rc == 0) {
1149                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1150                 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1151                 if (unlikely(rc)) {
1152                         /* TODO: ref_add to mo_c on remote MDS */
1153                         CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1154                               " [mo_c "DFID"] [name %s] [err %d]\n",
1155                               PFID(lu_object_fid(&mo_p->mo_lu)),
1156                               PFID(lu_object_fid(&mo_c->mo_lu)),
1157                               lname->ln_name, rc);
1158                 }
1159         }
1160
1161         RETURN(rc);
1162 }
1163
1164 static inline void cmr_rename_warn(const char *fname,
1165                                   struct md_object *mo_po,
1166                                   struct md_object *mo_pn,
1167                                   const struct lu_fid *lf,
1168                                   const char *s_name,
1169                                   const char *t_name,
1170                                   int err)
1171 {
1172         CWARN("cmr_rename failed for %s, should revoke: "
1173               "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1174               "[sname %s] [tname %s] [err %d]\n", fname,
1175               PFID(lu_object_fid(&mo_po->mo_lu)),
1176               PFID(lu_object_fid(&mo_pn->mo_lu)),
1177               PFID(lf), s_name, t_name, err);
1178 }
1179
1180 static int cmr_rename(const struct lu_env *env,
1181                       struct md_object *mo_po, struct md_object *mo_pn,
1182                       const struct lu_fid *lf, const struct lu_name *ls_name,
1183                       struct md_object *mo_t, const struct lu_name *lt_name,
1184                       struct md_attr *ma)
1185 {
1186         struct cmm_thread_info *cmi;
1187         struct md_attr *tmp_ma;
1188         int rc;
1189         ENTRY;
1190
1191         LASSERT(mo_t == NULL);
1192
1193         /* get real type of src */
1194         rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1195         if (rc)
1196                 RETURN(rc);
1197
1198         /* Local permission check for name_remove before remote ops. */
1199         rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1200                            MAY_UNLINK | MAY_VTX_FULL);
1201         if (rc)
1202                 RETURN(rc);
1203
1204         /*
1205          * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
1206          * for mdo_name_remove later, so save it before mdo_rename_tgt.
1207          */
1208         cmi = cmm_env_info(env);
1209         tmp_ma = &cmi->cmi_ma;
1210         *tmp_ma = *ma;
1211         /* the mo_pn is remote directory, so we cannot even know if there is
1212          * mo_t or not. Therefore mo_t is NULL here but remote server should do
1213          * lookup and process this further */
1214         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1215                             NULL/* mo_t */, lf, lt_name, ma);
1216         if (rc)
1217                 RETURN(rc);
1218
1219         tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1220
1221         /* src object maybe on remote MDS, do remote ops first. */
1222         rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1223         if (unlikely(rc)) {
1224                 /* TODO: revoke mdo_rename_tgt */
1225                 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1226                                 ls_name->ln_name, lt_name->ln_name, rc);
1227                 RETURN(rc);
1228         }
1229
1230         /* only old name is removed localy */
1231         rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1232         if (unlikely(rc))
1233                 /* TODO: revoke all cmr_rename */
1234                 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1235                                 ls_name->ln_name, lt_name->ln_name, rc);
1236
1237         RETURN(rc);
1238 }
1239
1240 /* part of cross-ref rename(). Used to insert new name in new parent
1241  * and unlink target */
1242 static int cmr_rename_tgt(const struct lu_env *env,
1243                           struct md_object *mo_p, struct md_object *mo_t,
1244                           const struct lu_fid *lf, const struct lu_name *lname,
1245                           struct md_attr *ma)
1246 {
1247         struct cmm_thread_info *cmi;
1248         struct md_attr *tmp_ma;
1249         int rc;
1250         ENTRY;
1251
1252         /* target object is remote one */
1253         /* Local permission check for rename_tgt before remote ops. */
1254         rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1255                            MAY_UNLINK | MAY_VTX_PART);
1256         if (rc)
1257                 RETURN(rc);
1258
1259         /*
1260          * XXX: @ma maybe changed after mo_ref_del, but we will use
1261          * it for mdo_rename_tgt later, so save it before mo_ref_del.
1262          */
1263         cmi = cmm_env_info(env);
1264         tmp_ma = &cmi->cmi_ma;
1265         *tmp_ma = *ma;
1266         rc = mo_ref_del(env, md_object_next(mo_t), ma);
1267         /* continue locally with name handling only */
1268         if (rc == 0) {
1269                 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1270                 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1271                                     NULL, lf, lname, tmp_ma);
1272                 if (unlikely(rc)) {
1273                         /* TODO: ref_add to mo_t on remote MDS */
1274                         CWARN("cmr_rename_tgt failed, should revoke: "
1275                               "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1276                               "[name %s] [err %d]\n",
1277                               PFID(lu_object_fid(&mo_p->mo_lu)),
1278                               PFID(lu_object_fid(&mo_t->mo_lu)),
1279                               PFID(lf),
1280                               lname->ln_name, rc);
1281                 }
1282         }
1283         RETURN(rc);
1284 }
1285
1286 static struct md_dir_operations cmr_dir_ops = {
1287         .mdo_is_subdir   = cmm_is_subdir,
1288         .mdo_lookup      = cmr_lookup,
1289         .mdo_lock_mode   = cmr_lock_mode,
1290         .mdo_create      = cmr_create,
1291         .mdo_link        = cmr_link,
1292         .mdo_unlink      = cmr_unlink,
1293         .mdo_rename      = cmr_rename,
1294         .mdo_rename_tgt  = cmr_rename_tgt,
1295 };