1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_object.c
5 * Lustre Cluster Metadata Manager (cmm)
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Mike Pershin <tappro@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
33 #define DEBUG_SUBSYSTEM S_MDS
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
/*
 * Find the MDS that serves the sequence of @fid.
 *
 * Asserts that @fid is sane, then asks the FLD client (cm->cmm_fld) to map
 * fid_seq(fid) to an MDS number which is stored in *@mds. Two problems are
 * reported through CERROR: the sequence cannot be resolved, or the resulting
 * MDS number is greater than cm->cmm_tgt_count. A successful lookup is
 * traced through CDEBUG.
 *
 * NOTE(review): the conditions around the first CERROR and all return
 * statements are not visible in this excerpt; presumably the result of
 * fld_client_lookup() (or an error for the invalid MDS number) is
 * returned - confirm against the full file.
 */
39 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
40 mdsno_t *mds, const struct lu_env *env)
45 LASSERT(fid_is_sane(fid));
47 rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
49 CERROR("Can't find mds by seq "LPX64", rc %d\n",
54 if (*mds > cm->cmm_tgt_count) {
55 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
56 *mds, cm->cmm_tgt_count);
59 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
60 LPU64"\n", *mds, fid_seq(fid));
/*
 * Forward declarations of the operation tables. cmm_object_alloc() installs
 * them on new objects before the tables themselves are defined further down:
 * cml_* tables are used for local objects, cmr_* tables for remote ones.
 */
66 static struct md_object_operations cml_mo_ops;
67 static struct md_dir_operations cml_dir_ops;
68 static struct lu_object_operations cml_obj_ops;
70 static struct md_object_operations cmr_mo_ops;
71 static struct md_dir_operations cmr_dir_ops;
72 static struct lu_object_operations cmr_obj_ops;
/*
 * Allocate the CMM layer lu_object for the object header @loh.
 *
 * If the device has CMM_INITIALIZED set, the MDS holding the object is
 * looked up with cmm_fld_lookup() using the FID taken from @loh. Otherwise
 * the object is treated as local (mds = cd->cmm_local_num), as explained in
 * the bootstrap comment below.
 *
 * An object whose MDS number equals cd->cmm_local_num is set up as a
 * cml_object with the cml_* operation tables; the second branch shown sets
 * up a cmr_object with the cmr_* tables. In both branches the embedded
 * lu_object is initialised with lu_object_init(lo, NULL, ld).
 *
 * NOTE(review): the allocation statements, the error handling after
 * cmm_fld_lookup() and the return statement are not visible in this
 * excerpt; lo is initialised to NULL.
 */
74 struct lu_object *cmm_object_alloc(const struct lu_env *env,
75 const struct lu_object_header *loh,
78 const struct lu_fid *fid = &loh->loh_fid;
79 struct lu_object *lo = NULL;
80 struct cmm_device *cd;
87 if (cd->cmm_flags & CMM_INITIALIZED) {
88 /* get object location */
89 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
94 * Device is not yet initialized, cmm_object is being created
95 * as part of early bootstrap procedure (it is /ROOT, or /fld,
96 * etc.). Such object *has* to be local.
98 mds = cd->cmm_local_num;
100 /* select the proper set of operations based on object location */
101 if (mds == cd->cmm_local_num) {
102 struct cml_object *clo;
106 lo = &clo->cmm_obj.cmo_obj.mo_lu;
107 lu_object_init(lo, NULL, ld);
108 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
109 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
110 lo->lo_ops = &cml_obj_ops;
113 struct cmr_object *cro;
117 lo = &cro->cmm_obj.cmo_obj.mo_lu;
118 lu_object_init(lo, NULL, ld);
119 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
120 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
121 lo->lo_ops = &cmr_obj_ops;
129 * CMM has two types of objects - local and remote. They have different sets
130 * of operations, so the code does not need to check the object type on each call.
133 /* get local child device */
/*
 * Return the lu_device embedded in the child md_device of @d
 * (d->cmm_child->md_lu_dev). Used by cml_object_init() to allocate the
 * next-layer object of a local object.
 */
134 static struct lu_device *cml_child_dev(struct cmm_device *d)
136 return &d->cmm_child->md_lu_dev;
139 /* lu_object operations */
/*
 * loo_object_free entry of cml_obj_ops. Converts @lo to its enclosing
 * cml_object with lu2cml_obj().
 *
 * NOTE(review): the statements that actually release the object are not
 * visible in this excerpt.
 */
140 static void cml_object_free(const struct lu_env *env,
141 struct lu_object *lo)
143 struct cml_object *clo = lu2cml_obj(lo);
/*
 * loo_object_init entry of cml_obj_ops.
 *
 * With HAVE_SPLIT_SUPPORT the split state of the object is preset:
 * CMM_SPLIT_DENIED when the device has no targets (cmm_tgt_count == 0);
 * the other assignment shown uses CMM_SPLIT_UNKNOWN (the branch keyword
 * between the two assignments is not visible in this excerpt).
 *
 * The next-layer object is then allocated through the ldo_object_alloc
 * method of the local child device (cml_child_dev()) and linked to @lo with
 * lu_object_add().
 *
 * NOTE(review): NULL checks on c_dev / c_obj and the return value are not
 * visible here.
 */
148 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
150 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
151 struct lu_device *c_dev;
152 struct lu_object *c_obj;
157 #ifdef HAVE_SPLIT_SUPPORT
158 if (cd->cmm_tgt_count == 0)
159 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
161 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
163 c_dev = cml_child_dev(cd);
167 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
168 lo->lo_header, c_dev);
170 lu_object_add(lo, c_obj);
/*
 * loo_object_print entry of cml_obj_ops. Emits the CMM name followed by
 * -local@ and the address of @lo through the printer @p, and returns the
 * value returned by @p.
 */
180 static int cml_object_print(const struct lu_env *env, void *cookie,
181 lu_printer_t p, const struct lu_object *lo)
183 return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
/* lu_object method table installed on local objects by cmm_object_alloc(). */
186 static struct lu_object_operations cml_obj_ops = {
187 .loo_object_init = cml_object_init,
188 .loo_object_free = cml_object_free,
189 .loo_object_print = cml_object_print
192 /* CMM local md_object operations */
/*
 * moo_object_create entry of cml_mo_ops: hands the request to the
 * next-layer object (md_object_next(mo)) through mo_object_create().
 * The return statement is not visible in this excerpt.
 */
193 static int cml_object_create(const struct lu_env *env,
194 struct md_object *mo,
195 const struct md_op_spec *spec,
196 struct md_attr *attr)
200 rc = mo_object_create(env, md_object_next(mo), spec, attr);
/*
 * moo_permission entry of cml_mo_ops: calls mo_permission() on the
 * next-layer objects of @p and @c. The remaining call arguments and the
 * return statement are not visible in this excerpt.
 */
204 static int cml_permission(const struct lu_env *env,
205 struct md_object *p, struct md_object *c,
206 struct md_attr *attr, int mask)
210 rc = mo_permission(env, md_object_next(p), md_object_next(c),
/*
 * moo_attr_get entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_attr_get(). The return statement is not visible in
 * this excerpt.
 */
215 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
216 struct md_attr *attr)
220 rc = mo_attr_get(env, md_object_next(mo), attr);
/*
 * moo_attr_set entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_attr_set(). The return statement is not visible in
 * this excerpt.
 */
224 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
225 const struct md_attr *attr)
229 rc = mo_attr_set(env, md_object_next(mo), attr);
/*
 * moo_xattr_get entry of cml_mo_ops: hands the request for attribute @name
 * to the next-layer object through mo_xattr_get(). The return statement is
 * not visible in this excerpt.
 */
233 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
234 struct lu_buf *buf, const char *name)
238 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
/*
 * moo_readlink entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_readlink(). The rest of the parameter list and the
 * return statement are not visible in this excerpt.
 */
242 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
247 rc = mo_readlink(env, md_object_next(mo), buf);
/*
 * moo_xattr_list entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_xattr_list(). The rest of the parameter list and the
 * return statement are not visible in this excerpt.
 */
251 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
256 rc = mo_xattr_list(env, md_object_next(mo), buf);
/*
 * moo_xattr_set entry of cml_mo_ops: hands the request (buffer, name and
 * flags unchanged) to the next-layer object through mo_xattr_set(). The
 * return statement is not visible in this excerpt.
 */
260 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
261 const struct lu_buf *buf,
262 const char *name, int fl)
266 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
/*
 * moo_xattr_del entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_xattr_del(). The rest of the parameter list and the
 * return statement are not visible in this excerpt.
 */
270 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
275 rc = mo_xattr_del(env, md_object_next(mo), name);
/*
 * moo_ref_add entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_ref_add(). The return statement is not visible in this
 * excerpt.
 */
279 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
280 const struct md_attr *ma)
284 rc = mo_ref_add(env, md_object_next(mo), ma);
/*
 * moo_ref_del entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_ref_del(). The rest of the parameter list and the
 * return statement are not visible in this excerpt.
 */
288 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
293 rc = mo_ref_del(env, md_object_next(mo), ma);
/*
 * moo_open entry of cml_mo_ops: hands the request to the next-layer object
 * through mo_open(). The rest of the parameter list and the return
 * statement are not visible in this excerpt.
 */
297 static int cml_open(const struct lu_env *env, struct md_object *mo,
302 rc = mo_open(env, md_object_next(mo), flags);
/*
 * moo_close entry of cml_mo_ops: hands the request to the next-layer object
 * through mo_close(). The rest of the parameter list and the return
 * statement are not visible in this excerpt.
 */
306 static int cml_close(const struct lu_env *env, struct md_object *mo,
311 rc = mo_close(env, md_object_next(mo), ma);
/*
 * moo_readpage entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_readpage(). The return statement is not visible in
 * this excerpt.
 */
315 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
316 const struct lu_rdpg *rdpg)
320 rc = mo_readpage(env, md_object_next(mo), rdpg);
/*
 * moo_capa_get entry of cml_mo_ops: hands the request to the next-layer
 * object through mo_capa_get(). The return statement is not visible in
 * this excerpt.
 */
324 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
325 struct lustre_capa *capa, int renewal)
329 rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
/*
 * md_object method table installed on local objects by cmm_object_alloc().
 * Every entry is one of the cml_* handlers above, each of which calls the
 * matching mo_*() operation on the next-layer object.
 */
333 static struct md_object_operations cml_mo_ops = {
334 .moo_permission = cml_permission,
335 .moo_attr_get = cml_attr_get,
336 .moo_attr_set = cml_attr_set,
337 .moo_xattr_get = cml_xattr_get,
338 .moo_xattr_list = cml_xattr_list,
339 .moo_xattr_set = cml_xattr_set,
340 .moo_xattr_del = cml_xattr_del,
341 .moo_object_create = cml_object_create,
342 .moo_ref_add = cml_ref_add,
343 .moo_ref_del = cml_ref_del,
344 .moo_open = cml_open,
345 .moo_close = cml_close,
346 .moo_readpage = cml_readpage,
347 .moo_readlink = cml_readlink,
348 .moo_capa_get = cml_capa_get
351 /* md_dir operations */
/*
 * mdo_lookup entry of cml_dir_ops.
 *
 * With HAVE_SPLIT_SUPPORT, and only when @spec is non-NULL and asks for it
 * (spec->sp_ck_split), cmm_split_check() is run first on the parent and the
 * name. The lookup itself is done by mdo_lookup() on the next-layer object
 * of @mo_p.
 *
 * NOTE(review): how the cmm_split_check() result is handled and the return
 * statement are not visible in this excerpt.
 */
352 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
353 const struct lu_name *lname, struct lu_fid *lf,
354 struct md_op_spec *spec)
359 #ifdef HAVE_SPLIT_SUPPORT
360 if (spec != NULL && spec->sp_ck_split) {
361 rc = cmm_split_check(env, mo_p, lname->ln_name);
366 rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
/*
 * mdo_lock_mode entry of cml_dir_ops. The result starts as MDL_MINMODE;
 * with HAVE_SPLIT_SUPPORT it is replaced by the value of
 * cmm_split_access(env, mo, lm). The return statement is not visible in
 * this excerpt.
 */
371 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
372 struct md_object *mo, mdl_mode_t lm)
374 int rc = MDL_MINMODE;
377 #ifdef HAVE_SPLIT_SUPPORT
378 rc = cmm_split_access(env, mo, lm);
/*
 * mdo_create entry of cml_dir_ops.
 *
 * With HAVE_SPLIT_SUPPORT the following happens before the create:
 *  - the requested lock mode (spec->sp_cr_mode) is asserted not to be
 *    MDL_MINMODE;
 *  - if the mode is MDL_EX, cmm_split_dir() is tried on the parent (see the
 *    long explanation below for why only the EX holder splits);
 *  - if @spec asks for it (sp_ck_split), cmm_split_check() is run on the
 *    parent and the name.
 * The create itself is done by mdo_create() on the next-layer objects of
 * the parent and the child.
 *
 * NOTE(review): spec is dereferenced by the LASSERT before the later
 * spec != NULL test, so one of the two looks redundant or unsafe - confirm
 * whether callers may pass a NULL spec.
 * NOTE(review): result handling after each step and the return statement
 * are not visible in this excerpt.
 */
384 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
385 const struct lu_name *lname, struct md_object *mo_c,
386 struct md_op_spec *spec, struct md_attr *ma)
391 #ifdef HAVE_SPLIT_SUPPORT
392 /* Lock mode always should be sane. */
393 LASSERT(spec->sp_cr_mode != MDL_MINMODE);
396 * Sigh... This is long story. MDT may have race with detecting if split
397 * is possible in cmm. We know this race and let it live, because
398 * getting it rid (with some sem or spinlock) will also mean that
399 * PDIROPS for create will not work because we kill parallel work, what
400 * is really bad for performance and makes no sense having PDIROPS. So,
401 * we better allow the race to live, but split dir only if some of
402 * concurrent threads takes EX lock, not matter which one. So that, say,
403 * two concurrent threads may have different lock modes on directory (CW
404 * and EX) and not first one which comes here and see that split is
405 * possible should split the dir, but only that one which has EX
406 * lock. And we do not care that in this case, split may happen a bit
407 * later (when dir size will not be necessarily 64K, but may be a bit
408 * larger). So that, we allow concurrent creates and protect split by EX
411 if (spec->sp_cr_mode == MDL_EX) {
413 * Try to split @mo_p. If split is ok, -ERESTART is returned and
414 * current thread will not peoceed with create. Instead it sends
415 * -ERESTART to client to let it know that correct MDT should be
418 rc = cmm_split_dir(env, mo_p);
421 * -ERESTART or some split error is returned, we can't
422 * proceed with create.
427 if (spec != NULL && spec->sp_ck_split) {
429 * Check for possible split directory and let caller know that
430 * it should tell client that directory is split and operation
431 * should repeat to correct MDT.
433 rc = cmm_split_check(env, mo_p, lname->ln_name);
439 rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
443 #ifdef HAVE_SPLIT_SUPPORT
/*
 * mdo_create_data entry of cml_dir_ops: calls mdo_create_data() on the
 * next-layer objects of @p and o. Part of the parameter list, the remaining
 * call arguments and the return statement are not visible in this excerpt.
 */
449 static int cml_create_data(const struct lu_env *env, struct md_object *p,
451 const struct md_op_spec *spec,
456 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
/*
 * mdo_link entry of cml_dir_ops: calls mdo_link() on the next-layer objects
 * of @mo_p and @mo_s. The remaining call arguments and the return statement
 * are not visible in this excerpt.
 */
461 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
462 struct md_object *mo_s, const struct lu_name *lname,
467 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
/*
 * mdo_unlink entry of cml_dir_ops: calls mdo_unlink() on the next-layer
 * objects of @mo_p and @mo_c. The remaining call arguments and the return
 * statement are not visible in this excerpt.
 */
472 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
473 struct md_object *mo_c, const struct lu_name *lname,
478 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
483 /* rename is split to local/remote by location of new parent dir */
/*
 * Find the object with FID @f in the site of @md and return the slice of it
 * that belongs to the device type of @md.
 *
 * lu_object_find() locates the object in md2lu_dev(md)->ld_site. In one
 * branch its result is passed back cast to struct md_object pointer; since
 * callers test the result with PTR_ERR, this is presumably the error
 * branch. In the other branch lu_object_locate() picks the layer matching
 * md2lu_dev(md)->ld_type, and the result is converted with lu2md(), or NULL
 * when no such layer is found.
 *
 * NOTE(review): the branch condition and the return statement are not
 * visible in this excerpt. Callers in this file release the object with
 * lu_object_put() once they are done with it.
 */
484 struct md_object *md_object_find(const struct lu_env *env,
485 struct md_device *md,
486 const struct lu_fid *f)
492 o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
494 m = (struct md_object *)o;
496 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
497 m = o ? lu2md(o) : NULL;
/*
 * Fetch mode, uid, gid and flags of the object with FID @lf into @ma.
 *
 * The object is found with md_object_find(); a PTR_ERR result is returned
 * when that fails. The attributes are read with mo_attr_get() into the
 * per-thread md_attr (cmm_env_info(env)->cmi_ma, with ma_need = MA_INODE
 * and ma_valid = 0), then the four fields are copied into @ma and
 * LA_MODE | LA_UID | LA_GID | LA_FLAGS is OR-ed into ma->ma_attr.la_valid.
 * The object reference is dropped with lu_object_put().
 *
 * When the remote pointer is non-NULL and lu_object_exists() is negative
 * for the object, the caller is presumably told that the object is remote
 * (the statement in that branch is not visible in this excerpt).
 *
 * NOTE(review): cmm_is_subdir() passes &cmi->cmi_ma as @ma, so tmp_ma and
 * @ma are then the same structure and the copy is a self-assignment.
 * NOTE(review): the check of rc before the copy and the return statement
 * are not visible here.
 */
502 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
503 const struct lu_fid *lf, struct md_attr *ma,
506 struct md_object *mo_s = md_object_find(env, md, lf);
507 struct cmm_thread_info *cmi;
508 struct md_attr *tmp_ma;
513 RETURN(PTR_ERR(mo_s));
515 if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
518 cmi = cmm_env_info(env);
519 tmp_ma = &cmi->cmi_ma;
520 tmp_ma->ma_need = MA_INODE;
521 tmp_ma->ma_valid = 0;
522 /* get type from src, can be remote req */
523 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
525 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
526 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
527 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
528 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
529 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
531 lu_object_put(env, &mo_s->mo_lu);
/*
 * Apply the attributes in @ma (which must have LA_CTIME valid) to the
 * object with FID @lf.
 *
 * The object is found with md_object_find(); a PTR_ERR result is returned
 * when that fails. The attributes are applied with mo_attr_set() on the
 * next-layer object (which, per the comment below, can be a remote request)
 * and the reference is dropped with lu_object_put(). The final return
 * statement is not visible in this excerpt.
 */
535 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
536 const struct lu_fid *lf, struct md_attr *ma)
538 struct md_object *mo_s = md_object_find(env, md, lf);
543 RETURN(PTR_ERR(mo_s));
545 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
546 /* set ctime to obj, can be remote req */
547 rc = mo_attr_set(env, md_object_next(mo_s), ma);
548 lu_object_put(env, &mo_s->mo_lu);
/*
 * Log a CWARN for a failed step of cml_rename(), naming the failed step
 * (@fname) and the FIDs and names involved, so that the partially applied
 * rename can be identified. Two message variants exist: one prints the FID
 * of @mo_t, the other prints mo_t NULL (the test that selects between them
 * is not visible in this excerpt).
 */
552 static inline void cml_rename_warn(const char *fname,
553 struct md_object *mo_po,
554 struct md_object *mo_pn,
555 const struct lu_fid *lf,
557 struct md_object *mo_t,
562 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
563 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
564 "[tname %s] [err %d]\n", fname,
565 PFID(lu_object_fid(&mo_po->mo_lu)),
566 PFID(lu_object_fid(&mo_pn->mo_lu)),
568 PFID(lu_object_fid(&mo_t->mo_lu)),
571 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
572 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
573 "[tname %s] [err %d]\n", fname,
574 PFID(lu_object_fid(&mo_po->mo_lu)),
575 PFID(lu_object_fid(&mo_pn->mo_lu)),
/*
 * mdo_rename entry of cml_dir_ops.
 *
 * Steps visible in this excerpt, in order:
 *  1. cmm_mode_get() loads mode/uid/gid/flags of the source (@lf) into @ma
 *     and reports through the local remote flag whether the source is
 *     remote.
 *  2. If @mo_t is given and lu_object_exists() is negative for it (a remote
 *     target, per the XXX comment): local mo_permission() checks are made
 *     on the source, the old parent and the new parent, then mo_ref_del()
 *     is issued on the target. tmp_ma is pointed at the per-thread md_attr
 *     and gets MDS_PERM_BYPASS.
 *  3. For a source on a remote MDS (per the second XXX comment): further
 *     mo_permission() checks, MDS_PERM_BYPASS is set, and the ctime is
 *     pushed to the source with cmm_rename_ctime().
 *  4. The rename itself is done by mdo_rename() on the next-layer objects,
 *     using tmp_ma when it was set and @ma otherwise.
 * cml_rename_warn() is called for failures of steps 3 and 4; per the TODO
 * comments, earlier steps are not revoked.
 *
 * NOTE(review): most branch conditions, the statement that saves @ma into
 * tmp_ma and all return statements are not visible in this excerpt, so the
 * exact nesting of the steps above must be confirmed against the full file.
 */
580 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
581 struct md_object *mo_pn, const struct lu_fid *lf,
582 const struct lu_name *ls_name, struct md_object *mo_t,
583 const struct lu_name *lt_name, struct md_attr *ma)
585 struct cmm_thread_info *cmi;
586 struct md_attr *tmp_ma = NULL;
587 struct md_object *tmp_t = mo_t;
591 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
595 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
596 /* XXX: mo_t is remote object and there is RPC to unlink it.
597 * before that, do local sanity check for rename first. */
599 struct md_object *mo_s = md_object_find(env,
600 md_obj2dev(mo_po), lf);
602 RETURN(PTR_ERR(mo_s));
604 LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
605 rc = mo_permission(env, md_object_next(mo_po),
606 md_object_next(mo_s),
608 lu_object_put(env, &mo_s->mo_lu);
612 rc = mo_permission(env, NULL, md_object_next(mo_po),
613 ma, MAY_UNLINK | MAY_VTX_FULL);
618 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
619 MAY_UNLINK | MAY_VTX_PART);
624 * XXX: @ma will be changed after mo_ref_del, but we will use
625 * it for mdo_rename later, so save it before mo_ref_del.
627 cmi = cmm_env_info(env);
628 tmp_ma = &cmi->cmi_ma;
630 rc = mo_ref_del(env, md_object_next(mo_t), ma);
634 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
638 /* XXX: for src on remote MDS case, change its ctime before local
639 * rename. Firstly, do local sanity check for rename if necessary. */
642 rc = mo_permission(env, NULL, md_object_next(mo_po),
643 ma, MAY_UNLINK | MAY_VTX_FULL);
648 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
649 rc = mo_permission(env, md_object_next(mo_pn),
650 md_object_next(mo_t),
658 mask = (S_ISDIR(ma->ma_attr.la_mode) ?
659 MAY_LINK : MAY_CREATE);
662 rc = mo_permission(env, NULL,
663 md_object_next(mo_pn),
669 ma->ma_attr_flags |= MDS_PERM_BYPASS;
671 LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
674 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
675 tmp_ma ? tmp_ma : ma);
677 /* TODO: revoke mo_t if necessary. */
678 cml_rename_warn("cmm_rename_ctime", mo_po,
679 mo_pn, lf, ls_name->ln_name,
680 tmp_t, lt_name->ln_name, rc);
685 /* local rename, mo_t can be NULL */
686 rc = mdo_rename(env, md_object_next(mo_po),
687 md_object_next(mo_pn), lf, ls_name,
688 md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
690 /* TODO: revoke all cml_rename */
691 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
692 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
/*
 * mdo_rename_tgt entry of cml_dir_ops: calls mdo_rename_tgt() on the
 * next-layer objects of @mo_p and @mo_t with the remaining arguments
 * unchanged. The return statement is not visible in this excerpt.
 */
697 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
698 struct md_object *mo_t, const struct lu_fid *lf,
699 const struct lu_name *lname, struct md_attr *ma)
704 rc = mdo_rename_tgt(env, md_object_next(mo_p),
705 md_object_next(mo_t), lf, lname, ma);
708 /* used only in case of rename_tgt() when the target does not exist */
/*
 * mdo_name_insert entry of cml_dir_ops: calls mdo_name_insert() on the
 * next-layer object of @p to insert @lname with FID @lf. The return
 * statement is not visible in this excerpt.
 */
709 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
710 const struct lu_name *lname, const struct lu_fid *lf,
711 const struct md_attr *ma)
716 rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
/*
 * mdo_is_subdir entry shared by cml_dir_ops and cmr_dir_ops.
 *
 * The mode of the object with FID @fid is first fetched with
 * cmm_mode_get() into the per-thread md_attr (no remote flag requested).
 * A test on whether that mode is a directory follows; the action taken for
 * a non-directory is not visible in this excerpt. The check proper is done
 * by mdo_is_subdir() on the next-layer object of @mo.
 */
721 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
722 const struct lu_fid *fid, struct lu_fid *sfid)
724 struct cmm_thread_info *cmi;
728 cmi = cmm_env_info(env);
729 rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
733 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
736 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
/*
 * Directory method table installed on local objects by cmm_object_alloc().
 * mdo_is_subdir uses the handler shared with the remote table; all other
 * entries are cml_* handlers.
 */
740 static struct md_dir_operations cml_dir_ops = {
741 .mdo_is_subdir = cmm_is_subdir,
742 .mdo_lookup = cml_lookup,
743 .mdo_lock_mode = cml_lock_mode,
744 .mdo_create = cml_create,
745 .mdo_link = cml_link,
746 .mdo_unlink = cml_unlink,
747 .mdo_name_insert = cml_name_insert,
748 .mdo_rename = cml_rename,
749 .mdo_rename_tgt = cml_rename_tgt,
750 .mdo_create_data = cml_create_data
753 /* -------------------------------------------------------------------
754 * remote CMM object operations. cmr_...
/* Get the cmr_object that embeds the lu_object @o (via cmm_obj.cmo_obj.mo_lu). */
756 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
758 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
/* Get the cmr_object that embeds the md_object @mo (via cmm_obj.cmo_obj). */
760 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
762 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
/* Get the cmr_object that embeds the cmm_object @co (via cmm_obj). */
764 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
766 return container_of0(co, struct cmr_object, cmm_obj);
769 /* get proper child device from MDCs */
/*
 * Find the lu_device of the MDC target whose mc_num equals @num.
 *
 * The d->cmm_targets list is walked under the d->cmm_tgt_guard spinlock;
 * on a match next is set to mdc2lu_dev(mdc). next starts as NULL, so NULL
 * is presumably the result when no target matches (the return statement is
 * not visible in this excerpt).
 */
770 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
772 struct lu_device *next = NULL;
773 struct mdc_device *mdc;
775 spin_lock(&d->cmm_tgt_guard);
776 list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
777 if (mdc->mc_num == num) {
778 next = mdc2lu_dev(mdc);
782 spin_unlock(&d->cmm_tgt_guard);
786 /* lu_object operations */
/*
 * loo_object_free entry of cmr_obj_ops. Converts @lo to its enclosing
 * cmr_object with lu2cmr_obj().
 *
 * NOTE(review): the statements that actually release the object are not
 * visible in this excerpt.
 */
787 static void cmr_object_free(const struct lu_env *env,
788 struct lu_object *lo)
790 struct cmr_object *cro = lu2cmr_obj(lo);
/*
 * loo_object_init entry of cmr_obj_ops.
 *
 * The child device is chosen with cmr_child_dev() from the MDS number
 * stored in the object (cmo_num). The next-layer object is allocated
 * through the ldo_object_alloc method of that device and linked to @lo
 * with lu_object_add().
 *
 * NOTE(review): cmr_child_dev() can leave its result NULL; the NULL checks
 * on c_dev / c_obj and the return value are not visible in this excerpt.
 */
795 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
797 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
798 struct lu_device *c_dev;
799 struct lu_object *c_obj;
804 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
808 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
809 lo->lo_header, c_dev);
811 lu_object_add(lo, c_obj);
/*
 * loo_object_print entry of cmr_obj_ops. Emits the CMM name followed by
 * -remote@ and the address of @lo through the printer @p, and returns the
 * value returned by @p.
 */
821 static int cmr_object_print(const struct lu_env *env, void *cookie,
822 lu_printer_t p, const struct lu_object *lo)
824 return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
/* lu_object method table installed on remote objects by cmm_object_alloc(). */
827 static struct lu_object_operations cmr_obj_ops = {
828 .loo_object_init = cmr_object_init,
829 .loo_object_free = cmr_object_free,
830 .loo_object_print = cmr_object_print
833 /* CMM remote md_object operations. All are invalid */
/*
 * moo_object_create entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
834 static int cmr_object_create(const struct lu_env *env,
835 struct md_object *mo,
836 const struct md_op_spec *spec,
/*
 * moo_permission entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
842 static int cmr_permission(const struct lu_env *env,
843 struct md_object *p, struct md_object *c,
844 struct md_attr *attr, int mask)
/*
 * moo_attr_get entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
849 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
850 struct md_attr *attr)
/*
 * moo_attr_set entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
855 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
856 const struct md_attr *attr)
/*
 * moo_xattr_get entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
861 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
862 struct lu_buf *buf, const char *name)
/*
 * moo_readlink entry of cmr_mo_ops. The rest of the signature and the body
 * are not visible in this excerpt; the section comment for these handlers
 * says all of them are invalid for remote objects.
 */
867 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
/*
 * moo_xattr_list entry of cmr_mo_ops. The rest of the signature and the
 * body are not visible in this excerpt; the section comment for these
 * handlers says all of them are invalid for remote objects.
 */
873 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
/*
 * moo_xattr_set entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
879 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
880 const struct lu_buf *buf, const char *name, int fl)
/*
 * moo_xattr_del entry of cmr_mo_ops. The rest of the signature and the body
 * are not visible in this excerpt; the section comment for these handlers
 * says all of them are invalid for remote objects.
 */
885 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
/*
 * moo_ref_add entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
891 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
892 const struct md_attr *ma)
/*
 * moo_ref_del entry of cmr_mo_ops. The rest of the signature and the body
 * are not visible in this excerpt; the section comment for these handlers
 * says all of them are invalid for remote objects.
 */
897 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
/*
 * moo_open entry of cmr_mo_ops. The rest of the signature and the body are
 * not visible in this excerpt; the section comment for these handlers says
 * all of them are invalid for remote objects.
 */
903 static int cmr_open(const struct lu_env *env, struct md_object *mo,
/*
 * moo_close entry of cmr_mo_ops. The rest of the signature and the body are
 * not visible in this excerpt; the section comment for these handlers says
 * all of them are invalid for remote objects.
 */
909 static int cmr_close(const struct lu_env *env, struct md_object *mo,
/*
 * moo_readpage entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
915 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
916 const struct lu_rdpg *rdpg)
/*
 * moo_capa_get entry of cmr_mo_ops. The body is not visible in this
 * excerpt; the section comment for these handlers says all of them are
 * invalid for remote objects.
 */
921 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
922 struct lustre_capa *capa, int renewal)
/*
 * md_object method table installed on remote objects by cmm_object_alloc().
 * It provides the same set of entries as the local table, each bound to the
 * corresponding cmr_* handler.
 */
927 static struct md_object_operations cmr_mo_ops = {
928 .moo_permission = cmr_permission,
929 .moo_attr_get = cmr_attr_get,
930 .moo_attr_set = cmr_attr_set,
931 .moo_xattr_get = cmr_xattr_get,
932 .moo_xattr_set = cmr_xattr_set,
933 .moo_xattr_list = cmr_xattr_list,
934 .moo_xattr_del = cmr_xattr_del,
935 .moo_object_create = cmr_object_create,
936 .moo_ref_add = cmr_ref_add,
937 .moo_ref_del = cmr_ref_del,
938 .moo_open = cmr_open,
939 .moo_close = cmr_close,
940 .moo_readpage = cmr_readpage,
941 .moo_readlink = cmr_readlink,
942 .moo_capa_get = cmr_capa_get
945 /* remote part of md_dir operations */
/*
 * mdo_lookup entry of cmr_dir_ops. Only the first line of the explanatory
 * comment survives in this excerpt (the case arises during rename() when
 * the new parent is a remote directory); the statements of the body are
 * not visible, so the result cannot be documented from here.
 */
946 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
947 const struct lu_name *lname, struct lu_fid *lf,
948 struct md_op_spec *spec)
951 * This can happens while rename() If new parent is remote dir, lookup
/*
 * mdo_lock_mode entry of cmr_dir_ops. The body is not visible in this
 * excerpt, so the returned mode cannot be documented from here.
 */
958 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
959 struct md_object *mo, mdl_mode_t lm)
965 * All methods below are cross-ref by nature. They consist of a remote call
966 * and a local operation. Because of the future rollback functionality there
967 * are several limitations for such methods:
968 * 1) the remote call must be done first, to do epoch negotiation between all
969 * MDSes involved and to avoid an RPC inside a transaction.
970 * 2) only one RPC can be sent - also because of epoch negotiation.
971 * For more details see the rollback HLD/DLD.
/*
 * mdo_create entry of cmr_dir_ops: remote object creation followed by a
 * local name insert (see the in-body comments).
 *
 * Steps visible in this excerpt, in order:
 *  - mdo_lookup() of @lchild_name in the parent; a result other than
 *    -ENOENT is handled before continuing (handling lines not shown);
 *  - mo_attr_get() on the parent into the per-thread md_attr with MA_INODE,
 *    plus MA_ACL_DEF under CONFIG_FS_POSIX_ACL when the new object is not a
 *    symlink (the ACL buffer is cmi->cmi_xattr_buf);
 *  - if the parent mode has S_ISGID, the new object takes the parent gid
 *    and, when it is a directory, S_ISGID as well;
 *  - under CONFIG_FS_POSIX_ACL, when a default ACL was returned it is
 *    handed over through spec->u.sp_ea and MDS_CREATE_RMT_ACL is set in
 *    spec->sp_cr_flags;
 *  - local mo_permission() check on the parent (MAY_LINK for a directory,
 *    MAY_CREATE otherwise);
 *  - mo_object_create() on the next-layer object of @mo_c, then
 *    mdo_name_insert() in the parent using the per-thread md_attr with
 *    MDS_PERM_BYPASS set;
 *  - a CWARN with parent FID, name, child FID and error; per the TODO the
 *    remote object is not removed at this point.
 *
 * NOTE(review): the statement that saves @ma into tmp_ma (announced by the
 * XXX comment), the branch conditions and all return statements are not
 * visible in this excerpt.
 */
973 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
974 const struct lu_name *lchild_name, struct md_object *mo_c,
975 struct md_op_spec *spec,
978 struct cmm_thread_info *cmi;
979 struct md_attr *tmp_ma;
983 /* Make sure that the name does not exist before doing the remote call. */
984 rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
985 &cmm_env_info(env)->cmi_fid, NULL);
988 else if (rc != -ENOENT)
991 /* check the SGID attr */
992 cmi = cmm_env_info(env);
994 tmp_ma = &cmi->cmi_ma;
995 tmp_ma->ma_valid = 0;
996 tmp_ma->ma_need = MA_INODE;
998 #ifdef CONFIG_FS_POSIX_ACL
999 if (!S_ISLNK(ma->ma_attr.la_mode)) {
1000 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1001 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1002 tmp_ma->ma_need |= MA_ACL_DEF;
1005 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1009 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1010 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1011 if (S_ISDIR(ma->ma_attr.la_mode)) {
1012 ma->ma_attr.la_mode |= S_ISGID;
1013 ma->ma_attr.la_valid |= LA_MODE;
1017 #ifdef CONFIG_FS_POSIX_ACL
1018 if (tmp_ma->ma_valid & MA_ACL_DEF) {
1019 spec->u.sp_ea.fid = spec->u.sp_pfid;
1020 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1021 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1022 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1026 /* Local permission check for name_insert before remote ops. */
1027 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1028 (S_ISDIR(ma->ma_attr.la_mode) ?
1029 MAY_LINK : MAY_CREATE));
1033 /* Remote object creation and local name insert. */
1035 * XXX: @ma will be changed after mo_object_create, but we will use
1036 * it for mdo_name_insert later, so save it before mo_object_create.
1039 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1041 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1042 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1043 lu_object_fid(&mo_c->mo_lu), tmp_ma);
1045 /* TODO: remove object mo_c on remote MDS */
1046 CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1047 " [name %s] [mo_c "DFID"] [err %d]\n",
1048 PFID(lu_object_fid(&mo_p->mo_lu)),
1049 lchild_name->ln_name,
1050 PFID(lu_object_fid(&mo_c->mo_lu)), rc);
/*
 * mdo_link entry of cmr_dir_ops.
 *
 * Steps visible in this excerpt, in order:
 *  - mdo_lookup() of @lname in the parent to see whether the name exists;
 *  - only when the lookup gives -ENOENT: local mo_permission() check on the
 *    parent, then mo_ref_add() on the next-layer object of @mo_s;
 *  - MDS_PERM_BYPASS is set in ma and the name is inserted in the parent
 *    with mdo_name_insert();
 *  - a CWARN with both FIDs, the name and the error; per the TODO the
 *    reference added on the remote MDS is not dropped at this point.
 *
 * NOTE(review): the handling of other lookup results, the conditions
 * between the steps and the return statement are not visible in this
 * excerpt.
 */
1057 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1058 struct md_object *mo_s, const struct lu_name *lname,
1064 /* Make sure that the name does not exist before doing the remote call. */
1065 rc = mdo_lookup(env, md_object_next(mo_p), lname,
1066 &cmm_env_info(env)->cmi_fid, NULL);
1069 } else if (rc == -ENOENT) {
1070 /* Local permission check for name_insert before remote ops. */
1071 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1076 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1078 ma->ma_attr_flags |= MDS_PERM_BYPASS;
1079 rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1080 lu_object_fid(&mo_s->mo_lu), ma);
1082 /* TODO: ref_del from mo_s on remote MDS */
1083 CWARN("cmr_link failed, should revoke: "
1084 "[mo_p "DFID"] [mo_s "DFID"] "
1085 "[name %s] [err %d]\n",
1086 PFID(lu_object_fid(&mo_p->mo_lu)),
1087 PFID(lu_object_fid(&mo_s->mo_lu)),
1088 lname->ln_name, rc);
/*
 * mdo_unlink entry of cmr_dir_ops.
 *
 * Steps visible in this excerpt, in order:
 *  - local mo_permission() check on the parent with
 *    MAY_UNLINK | MAY_VTX_PART;
 *  - tmp_ma is pointed at the per-thread md_attr (the XXX comment says @ma
 *    is saved there before mo_ref_del);
 *  - mo_ref_del() on the next-layer object of @mo_c;
 *  - MDS_PERM_BYPASS is set in tmp_ma and the name is removed from the
 *    parent with mdo_name_remove();
 *  - a CWARN with both FIDs, the name and the error; per the TODO the
 *    reference dropped on the remote MDS is not restored at this point.
 *
 * NOTE(review): the statement that copies @ma into tmp_ma, the conditions
 * between the steps and the return statement are not visible in this
 * excerpt.
 */
1095 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1096 struct md_object *mo_c, const struct lu_name *lname,
1099 struct cmm_thread_info *cmi;
1100 struct md_attr *tmp_ma;
1104 /* Local permission check for name_remove before remote ops. */
1105 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1106 MAY_UNLINK | MAY_VTX_PART);
1111 * XXX: @ma will be changed after mo_ref_del, but we will use
1112 * it for mdo_name_remove later, so save it before mo_ref_del.
1114 cmi = cmm_env_info(env);
1115 tmp_ma = &cmi->cmi_ma;
1117 rc = mo_ref_del(env, md_object_next(mo_c), ma);
1119 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1120 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1122 /* TODO: ref_add to mo_c on remote MDS */
1123 CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1124 " [mo_c "DFID"] [name %s] [err %d]\n",
1125 PFID(lu_object_fid(&mo_p->mo_lu)),
1126 PFID(lu_object_fid(&mo_c->mo_lu)),
1127 lname->ln_name, rc);
/*
 * Log a CWARN for a failed step of cmr_rename(): the name of the failed
 * step (@fname), the FIDs of both parents and of the source, the source and
 * target names, and the error code.
 */
1134 static inline void cmr_rename_warn(const char *fname,
1135 struct md_object *mo_po,
1136 struct md_object *mo_pn,
1137 const struct lu_fid *lf,
1142 CWARN("cmr_rename failed for %s, should revoke: "
1143 "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1144 "[sname %s] [tname %s] [err %d]\n", fname,
1145 PFID(lu_object_fid(&mo_po->mo_lu)),
1146 PFID(lu_object_fid(&mo_pn->mo_lu)),
1147 PFID(lf), s_name, t_name, err);
/*
 * mdo_rename entry of cmr_dir_ops, used when the new parent is a remote
 * directory; @mo_t must be NULL (asserted).
 *
 * Steps visible in this excerpt, in order:
 *  - cmm_mode_get() loads mode/uid/gid/flags of the source (@lf) into @ma;
 *  - local mo_permission() check on the old parent with
 *    MAY_UNLINK | MAY_VTX_FULL;
 *  - tmp_ma is pointed at the per-thread md_attr (the XXX comment says @ma
 *    is saved there before mdo_rename_tgt);
 *  - mdo_rename_tgt() on the new parent with a NULL target, so the remote
 *    server does the target lookup itself;
 *  - MDS_PERM_BYPASS is set in tmp_ma and the source ctime is updated with
 *    cmm_rename_ctime();
 *  - the old name is removed locally with mdo_name_remove().
 * cmr_rename_warn() is called for failures of the last two steps; per the
 * TODO comments earlier steps are not revoked.
 *
 * NOTE(review): the statement that copies @ma into tmp_ma, the conditions
 * between the steps and the return statements are not visible in this
 * excerpt.
 */
1150 static int cmr_rename(const struct lu_env *env,
1151 struct md_object *mo_po, struct md_object *mo_pn,
1152 const struct lu_fid *lf, const struct lu_name *ls_name,
1153 struct md_object *mo_t, const struct lu_name *lt_name,
1156 struct cmm_thread_info *cmi;
1157 struct md_attr *tmp_ma;
1161 LASSERT(mo_t == NULL);
1163 /* get real type of src */
1164 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1168 /* Local permission check for name_remove before remote ops. */
1169 rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1170 MAY_UNLINK | MAY_VTX_FULL);
1175 * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
1176 * for mdo_name_remove later, so save it before mdo_rename_tgt.
1178 cmi = cmm_env_info(env);
1179 tmp_ma = &cmi->cmi_ma;
1181 /* the mo_pn is remote directory, so we cannot even know if there is
1182 * mo_t or not. Therefore mo_t is NULL here but remote server should do
1183 * lookup and process this further */
1184 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1185 NULL/* mo_t */, lf, lt_name, ma);
1189 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1191 /* src object maybe on remote MDS, do remote ops first. */
1192 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1194 /* TODO: revoke mdo_rename_tgt */
1195 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1196 ls_name->ln_name, lt_name->ln_name, rc);
1200 /* only the old name is removed locally */
1201 rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1203 /* TODO: revoke all cmr_rename */
1204 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1205 ls_name->ln_name, lt_name->ln_name, rc);
1210 /* part of cross-ref rename(). Used to insert new name in new parent
1211 * and unlink target */
/*
 * mdo_rename_tgt entry of cmr_dir_ops; the target object @mo_t is remote.
 *
 * Steps visible in this excerpt, in order:
 *  - local mo_permission() check on the parent with
 *    MAY_UNLINK | MAY_VTX_PART;
 *  - tmp_ma is pointed at the per-thread md_attr (the XXX comment says @ma
 *    is saved there before mo_ref_del);
 *  - mo_ref_del() on the next-layer object of @mo_t;
 *  - MDS_PERM_BYPASS is set in tmp_ma and the name handling is done locally
 *    by mdo_rename_tgt() with a NULL target;
 *  - a CWARN with the FIDs, the name and the error; per the TODO the
 *    reference dropped on the remote MDS is not restored at this point.
 *
 * NOTE(review): the statement that copies @ma into tmp_ma, the conditions
 * between the steps and the return statement are not visible in this
 * excerpt.
 */
1212 static int cmr_rename_tgt(const struct lu_env *env,
1213 struct md_object *mo_p, struct md_object *mo_t,
1214 const struct lu_fid *lf, const struct lu_name *lname,
1217 struct cmm_thread_info *cmi;
1218 struct md_attr *tmp_ma;
1222 /* target object is remote one */
1223 /* Local permission check for rename_tgt before remote ops. */
1224 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1225 MAY_UNLINK | MAY_VTX_PART);
1230 * XXX: @ma maybe changed after mo_ref_del, but we will use
1231 * it for mdo_rename_tgt later, so save it before mo_ref_del.
1233 cmi = cmm_env_info(env);
1234 tmp_ma = &cmi->cmi_ma;
1236 rc = mo_ref_del(env, md_object_next(mo_t), ma);
1237 /* continue locally with name handling only */
1239 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1240 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1241 NULL, lf, lname, tmp_ma);
1243 /* TODO: ref_add to mo_t on remote MDS */
1244 CWARN("cmr_rename_tgt failed, should revoke: "
1245 "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1246 "[name %s] [err %d]\n",
1247 PFID(lu_object_fid(&mo_p->mo_lu)),
1248 PFID(lu_object_fid(&mo_t->mo_lu)),
1250 lname->ln_name, rc);
1256 static struct md_dir_operations cmr_dir_ops = {
1257 .mdo_is_subdir = cmm_is_subdir,
1258 .mdo_lookup = cmr_lookup,
1259 .mdo_lock_mode = cmr_lock_mode,
1260 .mdo_create = cmr_create,
1261 .mdo_link = cmr_link,
1262 .mdo_unlink = cmr_unlink,
1263 .mdo_rename = cmr_rename,
1264 .mdo_rename_tgt = cmr_rename_tgt,