1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/cmm/cmm_object.c
38 * Lustre Cluster Metadata Manager (cmm)
40 * Author: Mike Pershin <tappro@clusterfs.com>
44 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_MDS
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
53 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
54 mdsno_t *mds, const struct lu_env *env)
59 LASSERT(fid_is_sane(fid));
61 rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
63 CERROR("Can't find mds by seq "LPX64", rc %d\n",
68 if (*mds > cm->cmm_tgt_count) {
69 CERROR("Got invalid mdsno: %x (max: %x)\n",
70 *mds, cm->cmm_tgt_count);
73 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
74 LPX64"\n", *mds, fid_seq(fid));
80 static const struct md_object_operations cml_mo_ops;
81 static const struct md_dir_operations cml_dir_ops;
82 static const struct lu_object_operations cml_obj_ops;
84 static const struct md_object_operations cmr_mo_ops;
85 static const struct md_dir_operations cmr_dir_ops;
86 static const struct lu_object_operations cmr_obj_ops;
88 struct lu_object *cmm_object_alloc(const struct lu_env *env,
89 const struct lu_object_header *loh,
92 const struct lu_fid *fid = &loh->loh_fid;
93 struct lu_object *lo = NULL;
94 struct cmm_device *cd;
101 if (cd->cmm_flags & CMM_INITIALIZED) {
102 /* get object location */
103 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
108 * Device is not yet initialized, cmm_object is being created
109 * as part of early bootstrap procedure (it is /ROOT, or /fld,
110 * etc.). Such object *has* to be local.
112 mds = cd->cmm_local_num;
114 /* select the proper set of operations based on object location */
115 if (mds == cd->cmm_local_num) {
116 struct cml_object *clo;
120 lo = &clo->cmm_obj.cmo_obj.mo_lu;
121 lu_object_init(lo, NULL, ld);
122 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
123 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
124 lo->lo_ops = &cml_obj_ops;
127 struct cmr_object *cro;
131 lo = &cro->cmm_obj.cmo_obj.mo_lu;
132 lu_object_init(lo, NULL, ld);
133 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
134 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
135 lo->lo_ops = &cmr_obj_ops;
143 * CMM has two types of objects - local and remote. They have different set
144 * of operations so we are avoiding multiple checks in code.
147 /* get local child device */
148 static struct lu_device *cml_child_dev(struct cmm_device *d)
150 return &d->cmm_child->md_lu_dev;
153 /* lu_object operations */
154 static void cml_object_free(const struct lu_env *env,
155 struct lu_object *lo)
157 struct cml_object *clo = lu2cml_obj(lo);
162 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
163 const struct lu_object_conf *unused)
165 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
166 struct lu_device *c_dev;
167 struct lu_object *c_obj;
172 #ifdef HAVE_SPLIT_SUPPORT
173 if (cd->cmm_tgt_count == 0)
174 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
176 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
178 c_dev = cml_child_dev(cd);
182 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
183 lo->lo_header, c_dev);
185 lu_object_add(lo, c_obj);
195 static int cml_object_print(const struct lu_env *env, void *cookie,
196 lu_printer_t p, const struct lu_object *lo)
198 return (*p)(env, cookie, "[local]");
201 static const struct lu_object_operations cml_obj_ops = {
202 .loo_object_init = cml_object_init,
203 .loo_object_free = cml_object_free,
204 .loo_object_print = cml_object_print
207 /* CMM local md_object operations */
208 static int cml_object_create(const struct lu_env *env,
209 struct md_object *mo,
210 const struct md_op_spec *spec,
211 struct md_attr *attr)
215 rc = mo_object_create(env, md_object_next(mo), spec, attr);
219 static int cml_permission(const struct lu_env *env,
220 struct md_object *p, struct md_object *c,
221 struct md_attr *attr, int mask)
225 rc = mo_permission(env, md_object_next(p), md_object_next(c),
230 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
231 struct md_attr *attr)
235 rc = mo_attr_get(env, md_object_next(mo), attr);
239 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
240 const struct md_attr *attr)
244 rc = mo_attr_set(env, md_object_next(mo), attr);
248 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
249 struct lu_buf *buf, const char *name)
253 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
257 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
262 rc = mo_readlink(env, md_object_next(mo), buf);
266 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
271 rc = mo_xattr_list(env, md_object_next(mo), buf);
275 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
276 const struct lu_buf *buf, const char *name,
281 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
285 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
290 rc = mo_xattr_del(env, md_object_next(mo), name);
294 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
295 const struct md_attr *ma)
299 rc = mo_ref_add(env, md_object_next(mo), ma);
303 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
308 rc = mo_ref_del(env, md_object_next(mo), ma);
312 static int cml_open(const struct lu_env *env, struct md_object *mo,
317 rc = mo_open(env, md_object_next(mo), flags);
321 static int cml_close(const struct lu_env *env, struct md_object *mo,
326 rc = mo_close(env, md_object_next(mo), ma);
330 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
331 const struct lu_rdpg *rdpg)
335 rc = mo_readpage(env, md_object_next(mo), rdpg);
339 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
340 struct lustre_capa *capa, int renewal)
344 rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
348 static int cml_path(const struct lu_env *env, struct md_object *mo,
349 char *path, int pathlen, __u64 *recno, int *linkno)
353 rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
357 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
361 rc = mo_object_sync(env, md_object_next(mo));
365 static dt_obj_version_t cml_version_get(const struct lu_env *env,
366 struct md_object *mo)
368 return mo_version_get(env, md_object_next(mo));
371 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
372 dt_obj_version_t version)
374 return mo_version_set(env, md_object_next(mo), version);
377 static const struct md_object_operations cml_mo_ops = {
378 .moo_permission = cml_permission,
379 .moo_attr_get = cml_attr_get,
380 .moo_attr_set = cml_attr_set,
381 .moo_xattr_get = cml_xattr_get,
382 .moo_xattr_list = cml_xattr_list,
383 .moo_xattr_set = cml_xattr_set,
384 .moo_xattr_del = cml_xattr_del,
385 .moo_object_create = cml_object_create,
386 .moo_ref_add = cml_ref_add,
387 .moo_ref_del = cml_ref_del,
388 .moo_open = cml_open,
389 .moo_close = cml_close,
390 .moo_readpage = cml_readpage,
391 .moo_readlink = cml_readlink,
392 .moo_capa_get = cml_capa_get,
393 .moo_object_sync = cml_object_sync,
394 .moo_version_get = cml_version_get,
395 .moo_version_set = cml_version_set,
396 .moo_path = cml_path,
399 /* md_dir operations */
400 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
401 const struct lu_name *lname, struct lu_fid *lf,
402 struct md_op_spec *spec)
407 #ifdef HAVE_SPLIT_SUPPORT
408 if (spec != NULL && spec->sp_ck_split) {
409 rc = cmm_split_check(env, mo_p, lname->ln_name);
414 rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
419 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
420 struct md_object *mo, mdl_mode_t lm)
422 int rc = MDL_MINMODE;
425 #ifdef HAVE_SPLIT_SUPPORT
426 rc = cmm_split_access(env, mo, lm);
432 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
433 const struct lu_name *lname, struct md_object *mo_c,
434 struct md_op_spec *spec, struct md_attr *ma)
439 #ifdef HAVE_SPLIT_SUPPORT
440 /* Lock mode always should be sane. */
441 LASSERT(spec->sp_cr_mode != MDL_MINMODE);
444 * Sigh... This is long story. MDT may have race with detecting if split
445 * is possible in cmm. We know this race and let it live, because
446 * getting it rid (with some sem or spinlock) will also mean that
447 * PDIROPS for create will not work because we kill parallel work, what
448 * is really bad for performance and makes no sense having PDIROPS. So,
449 * we better allow the race to live, but split dir only if some of
450 * concurrent threads takes EX lock, not matter which one. So that, say,
451 * two concurrent threads may have different lock modes on directory (CW
452 * and EX) and not first one which comes here and see that split is
453 * possible should split the dir, but only that one which has EX
454 * lock. And we do not care that in this case, split may happen a bit
455 * later (when dir size will not be necessarily 64K, but may be a bit
456 * larger). So that, we allow concurrent creates and protect split by EX
459 if (spec->sp_cr_mode == MDL_EX) {
461 * Try to split @mo_p. If split is ok, -ERESTART is returned and
462 * current thread will not peoceed with create. Instead it sends
463 * -ERESTART to client to let it know that correct MDT should be
466 rc = cmm_split_dir(env, mo_p);
469 * -ERESTART or some split error is returned, we can't
470 * proceed with create.
475 if (spec != NULL && spec->sp_ck_split) {
477 * Check for possible split directory and let caller know that
478 * it should tell client that directory is split and operation
479 * should repeat to correct MDT.
481 rc = cmm_split_check(env, mo_p, lname->ln_name);
487 rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
491 #ifdef HAVE_SPLIT_SUPPORT
497 static int cml_create_data(const struct lu_env *env, struct md_object *p,
499 const struct md_op_spec *spec,
504 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
509 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
510 struct md_object *mo_s, const struct lu_name *lname,
515 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
520 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
521 struct md_object *mo_c, const struct lu_name *lname,
526 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
531 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
532 const struct lu_fid *lf, struct md_attr *ma,
535 struct md_object *mo_s = md_object_find_slice(env, md, lf);
536 struct cmm_thread_info *cmi;
537 struct md_attr *tmp_ma;
542 RETURN(PTR_ERR(mo_s));
544 if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
547 cmi = cmm_env_info(env);
548 tmp_ma = &cmi->cmi_ma;
549 tmp_ma->ma_need = MA_INODE;
550 tmp_ma->ma_valid = 0;
551 /* get type from src, can be remote req */
552 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
554 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
555 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
556 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
557 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
558 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
560 lu_object_put(env, &mo_s->mo_lu);
564 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
565 const struct lu_fid *lf, struct md_attr *ma)
567 struct md_object *mo_s = md_object_find_slice(env, md, lf);
572 RETURN(PTR_ERR(mo_s));
574 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
575 /* set ctime to obj, can be remote req */
576 rc = mo_attr_set(env, md_object_next(mo_s), ma);
577 lu_object_put(env, &mo_s->mo_lu);
581 static inline void cml_rename_warn(const char *fname,
582 struct md_object *mo_po,
583 struct md_object *mo_pn,
584 const struct lu_fid *lf,
586 struct md_object *mo_t,
591 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
592 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
593 "[tname %s] [err %d]\n", fname,
594 PFID(lu_object_fid(&mo_po->mo_lu)),
595 PFID(lu_object_fid(&mo_pn->mo_lu)),
597 PFID(lu_object_fid(&mo_t->mo_lu)),
600 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
601 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
602 "[tname %s] [err %d]\n", fname,
603 PFID(lu_object_fid(&mo_po->mo_lu)),
604 PFID(lu_object_fid(&mo_pn->mo_lu)),
609 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
610 struct md_object *mo_pn, const struct lu_fid *lf,
611 const struct lu_name *ls_name, struct md_object *mo_t,
612 const struct lu_name *lt_name, struct md_attr *ma)
614 struct cmm_thread_info *cmi;
615 struct md_attr *tmp_ma = NULL;
616 struct md_object *tmp_t = mo_t;
620 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
624 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
625 /* XXX: mo_t is remote object and there is RPC to unlink it.
626 * before that, do local sanity check for rename first. */
628 struct md_object *mo_s = md_object_find_slice(env,
629 md_obj2dev(mo_po), lf);
631 RETURN(PTR_ERR(mo_s));
633 LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
634 rc = mo_permission(env, md_object_next(mo_po),
635 md_object_next(mo_s),
637 lu_object_put(env, &mo_s->mo_lu);
641 rc = mo_permission(env, NULL, md_object_next(mo_po),
642 ma, MAY_UNLINK | MAY_VTX_FULL);
647 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
648 MAY_UNLINK | MAY_VTX_PART);
653 * XXX: @ma will be changed after mo_ref_del, but we will use
654 * it for mdo_rename later, so save it before mo_ref_del.
656 cmi = cmm_env_info(env);
657 tmp_ma = &cmi->cmi_ma;
659 rc = mo_ref_del(env, md_object_next(mo_t), ma);
663 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
667 /* XXX: for src on remote MDS case, change its ctime before local
668 * rename. Firstly, do local sanity check for rename if necessary. */
671 rc = mo_permission(env, NULL, md_object_next(mo_po),
672 ma, MAY_UNLINK | MAY_VTX_FULL);
677 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
678 rc = mo_permission(env, md_object_next(mo_pn),
679 md_object_next(mo_t),
687 mask = (S_ISDIR(ma->ma_attr.la_mode) ?
688 MAY_LINK : MAY_CREATE);
691 rc = mo_permission(env, NULL,
692 md_object_next(mo_pn),
698 ma->ma_attr_flags |= MDS_PERM_BYPASS;
700 LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
703 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
704 tmp_ma ? tmp_ma : ma);
706 /* TODO: revoke mo_t if necessary. */
707 cml_rename_warn("cmm_rename_ctime", mo_po,
708 mo_pn, lf, ls_name->ln_name,
709 tmp_t, lt_name->ln_name, rc);
714 /* local rename, mo_t can be NULL */
715 rc = mdo_rename(env, md_object_next(mo_po),
716 md_object_next(mo_pn), lf, ls_name,
717 md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
719 /* TODO: revoke all cml_rename */
720 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
721 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
726 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
727 struct md_object *mo_t, const struct lu_fid *lf,
728 const struct lu_name *lname, struct md_attr *ma)
733 rc = mdo_rename_tgt(env, md_object_next(mo_p),
734 md_object_next(mo_t), lf, lname, ma);
737 /* used only in case of rename_tgt() when target is not exist */
738 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
739 const struct lu_name *lname, const struct lu_fid *lf,
740 const struct md_attr *ma)
745 rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
750 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
751 const struct lu_fid *fid, struct lu_fid *sfid)
753 struct cmm_thread_info *cmi;
757 cmi = cmm_env_info(env);
758 rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
762 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
765 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
769 static const struct md_dir_operations cml_dir_ops = {
770 .mdo_is_subdir = cmm_is_subdir,
771 .mdo_lookup = cml_lookup,
772 .mdo_lock_mode = cml_lock_mode,
773 .mdo_create = cml_create,
774 .mdo_link = cml_link,
775 .mdo_unlink = cml_unlink,
776 .mdo_name_insert = cml_name_insert,
777 .mdo_rename = cml_rename,
778 .mdo_rename_tgt = cml_rename_tgt,
779 .mdo_create_data = cml_create_data
782 /* -------------------------------------------------------------------
783 * remote CMM object operations. cmr_...
785 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
787 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
789 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
791 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
793 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
795 return container_of0(co, struct cmr_object, cmm_obj);
798 /* get proper child device from MDCs */
799 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
801 struct lu_device *next = NULL;
802 struct mdc_device *mdc;
804 cfs_spin_lock(&d->cmm_tgt_guard);
805 cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
806 if (mdc->mc_num == num) {
807 next = mdc2lu_dev(mdc);
811 cfs_spin_unlock(&d->cmm_tgt_guard);
815 /* lu_object operations */
816 static void cmr_object_free(const struct lu_env *env,
817 struct lu_object *lo)
819 struct cmr_object *cro = lu2cmr_obj(lo);
824 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
825 const struct lu_object_conf *unused)
827 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
828 struct lu_device *c_dev;
829 struct lu_object *c_obj;
834 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
838 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
839 lo->lo_header, c_dev);
841 lu_object_add(lo, c_obj);
851 static int cmr_object_print(const struct lu_env *env, void *cookie,
852 lu_printer_t p, const struct lu_object *lo)
854 const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
855 return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
858 static const struct lu_object_operations cmr_obj_ops = {
859 .loo_object_init = cmr_object_init,
860 .loo_object_free = cmr_object_free,
861 .loo_object_print = cmr_object_print
864 /* CMM remote md_object operations. All are invalid */
865 static int cmr_object_create(const struct lu_env *env,
866 struct md_object *mo,
867 const struct md_op_spec *spec,
873 static int cmr_permission(const struct lu_env *env,
874 struct md_object *p, struct md_object *c,
875 struct md_attr *attr, int mask)
880 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
881 struct md_attr *attr)
886 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
887 const struct md_attr *attr)
892 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
893 struct lu_buf *buf, const char *name)
898 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
904 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
910 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
911 const struct lu_buf *buf, const char *name,
917 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
923 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
924 const struct md_attr *ma)
929 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
935 static int cmr_open(const struct lu_env *env, struct md_object *mo,
941 static int cmr_close(const struct lu_env *env, struct md_object *mo,
947 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
948 const struct lu_rdpg *rdpg)
953 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
954 struct lustre_capa *capa, int renewal)
959 static int cmr_path(const struct lu_env *env, struct md_object *obj,
960 char *path, int pathlen, __u64 *recno, int *linkno)
965 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
970 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
971 struct md_object *mo)
973 /* Don't check remote object version */
977 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
978 dt_obj_version_t version)
980 /* No need to update remote object version here, it is done as a part
981 * of reintegration of partial operation on the remote server */
984 static const struct md_object_operations cmr_mo_ops = {
985 .moo_permission = cmr_permission,
986 .moo_attr_get = cmr_attr_get,
987 .moo_attr_set = cmr_attr_set,
988 .moo_xattr_get = cmr_xattr_get,
989 .moo_xattr_set = cmr_xattr_set,
990 .moo_xattr_list = cmr_xattr_list,
991 .moo_xattr_del = cmr_xattr_del,
992 .moo_object_create = cmr_object_create,
993 .moo_ref_add = cmr_ref_add,
994 .moo_ref_del = cmr_ref_del,
995 .moo_open = cmr_open,
996 .moo_close = cmr_close,
997 .moo_readpage = cmr_readpage,
998 .moo_readlink = cmr_readlink,
999 .moo_capa_get = cmr_capa_get,
1000 .moo_object_sync = cmr_object_sync,
1001 .moo_version_get = cmr_version_get,
1002 .moo_version_set = cmr_version_set,
1003 .moo_path = cmr_path,
1006 /* remote part of md_dir operations */
1007 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1008 const struct lu_name *lname, struct lu_fid *lf,
1009 struct md_op_spec *spec)
1012 * This can happens while rename() If new parent is remote dir, lookup
1019 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1020 struct md_object *mo, mdl_mode_t lm)
1026 * All methods below are cross-ref by nature. They consist of remote call and
1027 * local operation. Due to future rollback functionality there are several
1028 * limitations for such methods:
1029 * 1) remote call should be done at first to do epoch negotiation between all
1030 * MDS involved and to avoid the RPC inside transaction.
1031 * 2) only one RPC can be sent - also due to epoch negotiation.
1032 * For more details see rollback HLD/DLD.
1034 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1035 const struct lu_name *lchild_name, struct md_object *mo_c,
1036 struct md_op_spec *spec,
1039 struct cmm_thread_info *cmi;
1040 struct md_attr *tmp_ma;
1044 /* Make sure that name isn't exist before doing remote call. */
1045 rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1046 &cmm_env_info(env)->cmi_fid, NULL);
1049 else if (rc != -ENOENT)
1052 /* check the SGID attr */
1053 cmi = cmm_env_info(env);
1055 tmp_ma = &cmi->cmi_ma;
1056 tmp_ma->ma_valid = 0;
1057 tmp_ma->ma_need = MA_INODE;
1059 #ifdef CONFIG_FS_POSIX_ACL
1060 if (!S_ISLNK(ma->ma_attr.la_mode)) {
1061 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1062 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1063 tmp_ma->ma_need |= MA_ACL_DEF;
1066 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1070 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1071 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1072 if (S_ISDIR(ma->ma_attr.la_mode)) {
1073 ma->ma_attr.la_mode |= S_ISGID;
1074 ma->ma_attr.la_valid |= LA_MODE;
1078 #ifdef CONFIG_FS_POSIX_ACL
1079 if (tmp_ma->ma_valid & MA_ACL_DEF) {
1080 spec->u.sp_ea.fid = spec->u.sp_pfid;
1081 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1082 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1083 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1087 /* Local permission check for name_insert before remote ops. */
1088 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1089 (S_ISDIR(ma->ma_attr.la_mode) ?
1090 MAY_LINK : MAY_CREATE));
1094 /* Remote object creation and local name insert. */
1096 * XXX: @ma will be changed after mo_object_create, but we will use
1097 * it for mdo_name_insert later, so save it before mo_object_create.
1100 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1102 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1103 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1104 lu_object_fid(&mo_c->mo_lu), tmp_ma);
1106 /* TODO: remove object mo_c on remote MDS */
1107 CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1108 " [name %s] [mo_c "DFID"] [err %d]\n",
1109 PFID(lu_object_fid(&mo_p->mo_lu)),
1110 lchild_name->ln_name,
1111 PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1118 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1119 struct md_object *mo_s, const struct lu_name *lname,
1125 /* Make sure that name isn't exist before doing remote call. */
1126 rc = mdo_lookup(env, md_object_next(mo_p), lname,
1127 &cmm_env_info(env)->cmi_fid, NULL);
1130 } else if (rc == -ENOENT) {
1131 /* Local permission check for name_insert before remote ops. */
1132 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1137 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1139 ma->ma_attr_flags |= MDS_PERM_BYPASS;
1140 rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1141 lu_object_fid(&mo_s->mo_lu), ma);
1143 /* TODO: ref_del from mo_s on remote MDS */
1144 CWARN("cmr_link failed, should revoke: "
1145 "[mo_p "DFID"] [mo_s "DFID"] "
1146 "[name %s] [err %d]\n",
1147 PFID(lu_object_fid(&mo_p->mo_lu)),
1148 PFID(lu_object_fid(&mo_s->mo_lu)),
1149 lname->ln_name, rc);
1156 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1157 struct md_object *mo_c, const struct lu_name *lname,
1160 struct cmm_thread_info *cmi;
1161 struct md_attr *tmp_ma;
1165 /* Local permission check for name_remove before remote ops. */
1166 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1167 MAY_UNLINK | MAY_VTX_PART);
1172 * XXX: @ma will be changed after mo_ref_del, but we will use
1173 * it for mdo_name_remove later, so save it before mo_ref_del.
1175 cmi = cmm_env_info(env);
1176 tmp_ma = &cmi->cmi_ma;
1178 rc = mo_ref_del(env, md_object_next(mo_c), ma);
1180 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1181 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1183 /* TODO: ref_add to mo_c on remote MDS */
1184 CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1185 " [mo_c "DFID"] [name %s] [err %d]\n",
1186 PFID(lu_object_fid(&mo_p->mo_lu)),
1187 PFID(lu_object_fid(&mo_c->mo_lu)),
1188 lname->ln_name, rc);
1195 static inline void cmr_rename_warn(const char *fname,
1196 struct md_object *mo_po,
1197 struct md_object *mo_pn,
1198 const struct lu_fid *lf,
1203 CWARN("cmr_rename failed for %s, should revoke: "
1204 "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1205 "[sname %s] [tname %s] [err %d]\n", fname,
1206 PFID(lu_object_fid(&mo_po->mo_lu)),
1207 PFID(lu_object_fid(&mo_pn->mo_lu)),
1208 PFID(lf), s_name, t_name, err);
1211 static int cmr_rename(const struct lu_env *env,
1212 struct md_object *mo_po, struct md_object *mo_pn,
1213 const struct lu_fid *lf, const struct lu_name *ls_name,
1214 struct md_object *mo_t, const struct lu_name *lt_name,
1217 struct cmm_thread_info *cmi;
1218 struct md_attr *tmp_ma;
1222 LASSERT(mo_t == NULL);
1224 /* get real type of src */
1225 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1229 /* Local permission check for name_remove before remote ops. */
1230 rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1231 MAY_UNLINK | MAY_VTX_FULL);
1236 * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
1237 * for mdo_name_remove later, so save it before mdo_rename_tgt.
1239 cmi = cmm_env_info(env);
1240 tmp_ma = &cmi->cmi_ma;
1242 /* the mo_pn is remote directory, so we cannot even know if there is
1243 * mo_t or not. Therefore mo_t is NULL here but remote server should do
1244 * lookup and process this further */
1245 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1246 NULL/* mo_t */, lf, lt_name, ma);
1250 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1252 /* src object maybe on remote MDS, do remote ops first. */
1253 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1255 /* TODO: revoke mdo_rename_tgt */
1256 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1257 ls_name->ln_name, lt_name->ln_name, rc);
1261 /* only old name is removed localy */
1262 rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1264 /* TODO: revoke all cmr_rename */
1265 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1266 ls_name->ln_name, lt_name->ln_name, rc);
1271 /* part of cross-ref rename(). Used to insert new name in new parent
1272 * and unlink target */
1273 static int cmr_rename_tgt(const struct lu_env *env,
1274 struct md_object *mo_p, struct md_object *mo_t,
1275 const struct lu_fid *lf, const struct lu_name *lname,
1278 struct cmm_thread_info *cmi;
1279 struct md_attr *tmp_ma;
1283 /* target object is remote one */
1284 /* Local permission check for rename_tgt before remote ops. */
1285 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1286 MAY_UNLINK | MAY_VTX_PART);
1291 * XXX: @ma maybe changed after mo_ref_del, but we will use
1292 * it for mdo_rename_tgt later, so save it before mo_ref_del.
1294 cmi = cmm_env_info(env);
1295 tmp_ma = &cmi->cmi_ma;
1297 rc = mo_ref_del(env, md_object_next(mo_t), ma);
1298 /* continue locally with name handling only */
1300 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1301 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1302 NULL, lf, lname, tmp_ma);
1304 /* TODO: ref_add to mo_t on remote MDS */
1305 CWARN("cmr_rename_tgt failed, should revoke: "
1306 "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1307 "[name %s] [err %d]\n",
1308 PFID(lu_object_fid(&mo_p->mo_lu)),
1309 PFID(lu_object_fid(&mo_t->mo_lu)),
1311 lname->ln_name, rc);
1317 static const struct md_dir_operations cmr_dir_ops = {
1318 .mdo_is_subdir = cmm_is_subdir,
1319 .mdo_lookup = cmr_lookup,
1320 .mdo_lock_mode = cmr_lock_mode,
1321 .mdo_create = cmr_create,
1322 .mdo_link = cmr_link,
1323 .mdo_unlink = cmr_unlink,
1324 .mdo_rename = cmr_rename,
1325 .mdo_rename_tgt = cmr_rename_tgt