4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/cmm/cmm_object.c
38 * Lustre Cluster Metadata Manager (cmm)
40 * Author: Mike Pershin <tappro@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <lustre_fid.h>
46 #include "cmm_internal.h"
47 #include "mdc_internal.h"
50 * Lookup MDS number \a mds by FID \a fid.
52 * \param fid FID of object to find MDS
53 * \param mds mds number to return.
/**
 * Map \a fid to the number of the MDS that serves it.
 *
 * Asserts that the FID is sane, then asks the FLD client (cm->cmm_fld) to
 * look up the sequence of the FID with LU_SEQ_RANGE_MDT, storing the answer
 * in \a mds. One CERROR reports a lookup failure together with rc; another
 * reports an MDS number greater than cm->cmm_tgt_count. A valid result is
 * traced at D_INFO.
 *
 * NOTE(review): the condition guarding the first CERROR and the return
 * statements are not visible in this excerpt; confirm the returned codes.
 */
55 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
56 mdsno_t *mds, const struct lu_env *env)
61 LASSERT(fid_is_sane(fid));
63 rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
64 LU_SEQ_RANGE_MDT, env);
66 CERROR("Can't find mds by seq "LPX64", rc %d\n",
71 if (*mds > cm->cmm_tgt_count) {
72 CERROR("Got invalid mdsno: %x (max: %x)\n",
73 *mds, cm->cmm_tgt_count);
76 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
77 LPX64"\n", *mds, fid_seq(fid));
/*
 * Forward declarations of the operation tables for local (cml) and remote
 * (cmr) objects. cmm_object_alloc() installs them before the tables are
 * defined later in this file.
 */
87 static const struct md_object_operations cml_mo_ops;
88 static const struct md_dir_operations cml_dir_ops;
89 static const struct lu_object_operations cml_obj_ops;
91 static const struct md_object_operations cmr_mo_ops;
92 static const struct md_dir_operations cmr_dir_ops;
93 static const struct lu_object_operations cmr_obj_ops;
97 * Allocate CMM object.
/**
 * Allocate the CMM slice of an object and pick its operation set.
 *
 * The owning MDS is determined first: once the device has CMM_INITIALIZED
 * set, cmm_fld_lookup() is asked for the location of the FID taken from
 * \a loh; before that the object is treated as local (cd->cmm_local_num).
 * When the MDS number equals cd->cmm_local_num a cml_object is set up with
 * the cml_* operation tables, otherwise a cmr_object is set up with the
 * cmr_* tables. In both cases lu_object_init() is called on the embedded
 * lu_object with device \a ld.
 *
 * NOTE(review): the assignment of cd, the allocation calls, the handling of
 * a failed lookup and the return statement are not visible in this excerpt.
 */
99 struct lu_object *cmm_object_alloc(const struct lu_env *env,
100 const struct lu_object_header *loh,
101 struct lu_device *ld)
103 const struct lu_fid *fid = &loh->loh_fid;
104 struct lu_object *lo = NULL;
105 struct cmm_device *cd;
112 if (cd->cmm_flags & CMM_INITIALIZED) {
113 /* get object location */
114 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
119 * Device is not yet initialized, cmm_object is being created
120 * as part of early bootstrap procedure (it is /ROOT, or /fld,
121 * etc.). Such object *has* to be local.
123 mds = cd->cmm_local_num;
125 /* select the proper set of operations based on object location */
126 if (mds == cd->cmm_local_num) {
127 struct cml_object *clo;
131 lo = &clo->cmm_obj.cmo_obj.mo_lu;
132 lu_object_init(lo, NULL, ld);
133 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
134 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
135 lo->lo_ops = &cml_obj_ops;
138 struct cmr_object *cro;
142 lo = &cro->cmm_obj.cmo_obj.mo_lu;
143 lu_object_init(lo, NULL, ld);
144 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
145 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
146 lo->lo_ops = &cmr_obj_ops;
154 * Get local child device.
/** Return the lu_device embedded in the child md device (d->cmm_child). */
156 static struct lu_device *cml_child_dev(struct cmm_device *d)
158 return &d->cmm_child->md_lu_dev;
/**
 * Free hook for local objects (installed as loo_object_free in cml_obj_ops).
 * Converts \a lo to its enclosing cml_object.
 * NOTE(review): the release statements are not visible in this excerpt.
 */
164 static void cml_object_free(const struct lu_env *env,
165 struct lu_object *lo)
167 struct cml_object *clo = lu2cml_obj(lo);
173 * Initialize cml_object.
/**
 * Init hook for local objects: stack the child layer object under \a lo.
 *
 * With HAVE_SPLIT_SUPPORT the split state is preset first: CMM_SPLIT_DENIED
 * when the device has no targets (cmm_tgt_count == 0), CMM_SPLIT_UNKNOWN in
 * the other visible assignment. The child device is obtained through
 * cml_child_dev(), its ldo_object_alloc() creates the lower object for the
 * same header, and lu_object_add() links it after \a lo.
 *
 * NOTE(review): the checks of c_dev and c_obj and the return value are not
 * visible in this excerpt.
 */
175 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
176 const struct lu_object_conf *unused)
178 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
179 struct lu_device *c_dev;
180 struct lu_object *c_obj;
185 #ifdef HAVE_SPLIT_SUPPORT
186 if (cd->cmm_tgt_count == 0)
187 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
189 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
191 c_dev = cml_child_dev(cd);
195 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
196 lo->lo_header, c_dev);
198 lu_object_add(lo, c_obj);
/** Print hook for local objects: emits the fixed tag [local] through \a p. */
208 static int cml_object_print(const struct lu_env *env, void *cookie,
209 lu_printer_t p, const struct lu_object *lo)
211 return (*p)(env, cookie, "[local]");
/** lu_object operations for local objects: init, free and print hooks. */
214 static const struct lu_object_operations cml_obj_ops = {
215 .loo_object_init = cml_object_init,
216 .loo_object_free = cml_object_free,
217 .loo_object_print = cml_object_print
221 * \name CMM local md_object operations.
222 * All of them call just corresponding operations on next layer.
/** Forward object creation to mo_object_create() on the next-layer object. */
225 static int cml_object_create(const struct lu_env *env,
226 struct md_object *mo,
227 const struct md_op_spec *spec,
228 struct md_attr *attr)
232 rc = mo_object_create(env, md_object_next(mo), spec, attr);
/**
 * Forward the permission check to mo_permission() using the next-layer
 * objects of both \a p and \a c.
 */
236 static int cml_permission(const struct lu_env *env,
237 struct md_object *p, struct md_object *c,
238 struct md_attr *attr, int mask)
242 rc = mo_permission(env, md_object_next(p), md_object_next(c),
/** Forward attribute retrieval to mo_attr_get() on the next-layer object. */
247 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
248 struct md_attr *attr)
252 rc = mo_attr_get(env, md_object_next(mo), attr);
/** Forward attribute update to mo_attr_set() on the next-layer object. */
256 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
257 const struct md_attr *attr)
261 rc = mo_attr_set(env, md_object_next(mo), attr);
/** Forward the xattr read of \a name to mo_xattr_get() on the next layer. */
265 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
266 struct lu_buf *buf, const char *name)
270 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
/** Forward symlink reading to mo_readlink() on the next-layer object. */
274 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
279 rc = mo_readlink(env, md_object_next(mo), buf);
/** Forward the changelog record request to mo_changelog() on the next layer. */
283 static int cml_changelog(const struct lu_env *env, enum changelog_rec_type type,
284 int flags, struct md_object *mo)
288 rc = mo_changelog(env, type, flags, md_object_next(mo));
/** Forward xattr listing to mo_xattr_list() on the next-layer object. */
292 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
297 rc = mo_xattr_list(env, md_object_next(mo), buf);
/** Forward the xattr write to mo_xattr_set() on the next-layer object. */
301 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
302 const struct lu_buf *buf, const char *name,
307 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
/** Forward xattr removal to mo_xattr_del() on the next-layer object. */
311 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
316 rc = mo_xattr_del(env, md_object_next(mo), name);
/** Forward the reference increment to mo_ref_add() on the next layer. */
320 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
321 const struct md_attr *ma)
325 rc = mo_ref_add(env, md_object_next(mo), ma);
/** Forward the reference decrement to mo_ref_del() on the next layer. */
329 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
334 rc = mo_ref_del(env, md_object_next(mo), ma);
/** Forward open with \a flags to mo_open() on the next-layer object. */
338 static int cml_open(const struct lu_env *env, struct md_object *mo,
343 rc = mo_open(env, md_object_next(mo), flags);
/** Forward close to mo_close() on the next-layer object. */
347 static int cml_close(const struct lu_env *env, struct md_object *mo,
348 struct md_attr *ma, int mode)
352 rc = mo_close(env, md_object_next(mo), ma, mode);
/** Forward directory page reading to mo_readpage() on the next layer. */
356 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
357 const struct lu_rdpg *rdpg)
361 rc = mo_readpage(env, md_object_next(mo), rdpg);
/** Forward the capability request to mo_capa_get() on the next layer. */
365 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
366 struct lustre_capa *capa, int renewal)
370 rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
/** Forward the path query to mo_path() on the next-layer object. */
374 static int cml_path(const struct lu_env *env, struct md_object *mo,
375 char *path, int pathlen, __u64 *recno, int *linkno)
379 rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
/** Forward the file lock request to mo_file_lock() on the next layer. */
383 static int cml_file_lock(const struct lu_env *env, struct md_object *mo,
384 struct lov_mds_md *lmm, struct ldlm_extent *extent,
385 struct lustre_handle *lockh)
389 rc = mo_file_lock(env, md_object_next(mo), lmm, extent, lockh);
/** Forward the file unlock request to mo_file_unlock() on the next layer. */
393 static int cml_file_unlock(const struct lu_env *env, struct md_object *mo,
394 struct lov_mds_md *lmm, struct lustre_handle *lockh)
398 rc = mo_file_unlock(env, md_object_next(mo), lmm, lockh);
/** Forward the sync request to mo_object_sync() on the next-layer object. */
402 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
406 rc = mo_object_sync(env, md_object_next(mo));
/**
 * md_object operations for local objects. Every slot points at the cml_*
 * wrapper of the same name defined earlier in this file.
 */
410 static const struct md_object_operations cml_mo_ops = {
411 .moo_permission = cml_permission,
412 .moo_attr_get = cml_attr_get,
413 .moo_attr_set = cml_attr_set,
414 .moo_xattr_get = cml_xattr_get,
415 .moo_xattr_list = cml_xattr_list,
416 .moo_xattr_set = cml_xattr_set,
417 .moo_xattr_del = cml_xattr_del,
418 .moo_object_create = cml_object_create,
419 .moo_ref_add = cml_ref_add,
420 .moo_ref_del = cml_ref_del,
421 .moo_open = cml_open,
422 .moo_close = cml_close,
423 .moo_readpage = cml_readpage,
424 .moo_readlink = cml_readlink,
425 .moo_changelog = cml_changelog,
426 .moo_capa_get = cml_capa_get,
427 .moo_object_sync = cml_object_sync,
428 .moo_path = cml_path,
429 .moo_file_lock = cml_file_lock,
430 .moo_file_unlock = cml_file_unlock,
435 * \name CMM local md_dir_operations.
439 * cml lookup object fid by name.
440 * This returns only FID by name.
/**
 * Look up \a lname in the local parent \a mo_p and return its FID in \a lf.
 *
 * With HAVE_SPLIT_SUPPORT, when \a spec is non-NULL and sp_ck_split is set,
 * cmm_split_check() is run on the name first. The lookup itself is done by
 * mdo_lookup() on the next-layer object.
 *
 * NOTE(review): the handling of the cmm_split_check() result is not visible
 * in this excerpt.
 */
442 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
443 const struct lu_name *lname, struct lu_fid *lf,
444 struct md_op_spec *spec)
449 #ifdef HAVE_SPLIT_SUPPORT
450 if (spec != NULL && spec->sp_ck_split) {
451 rc = cmm_split_check(env, mo_p, lname->ln_name);
456 rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
462 * Helper to return lock mode. Used in split cases only.
/**
 * Report the lock mode for \a mo. The result starts as MDL_MINMODE and,
 * with HAVE_SPLIT_SUPPORT, is replaced by cmm_split_access(env, mo, lm).
 * NOTE(review): the return statement is not visible in this excerpt.
 */
464 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
465 struct md_object *mo, mdl_mode_t lm)
467 int rc = MDL_MINMODE;
470 #ifdef HAVE_SPLIT_SUPPORT
471 rc = cmm_split_access(env, mo, lm);
478 * Create operation for cml.
479 * Objects are local, but split can happen.
480 * If split is not needed this will call next layer mdo_create().
482 * \param mo_p Parent directory. Local object.
483 * \param lname name of file to create.
484 * \param mo_c Child object. It has no real inode yet.
485 * \param spec creation specification.
486 * \param ma child object attributes.
/**
 * Create \a mo_c under the local parent \a mo_p.
 *
 * With HAVE_SPLIT_SUPPORT the function:
 *  - asserts that spec->sp_cr_mode is not MDL_MINMODE;
 *  - calls cmm_split_dir() on the parent when the create holds MDL_EX;
 *  - calls cmm_split_check() on the name when spec->sp_ck_split is set.
 * The create is then passed to mdo_create() on the next layer.
 *
 * NOTE(review): \a spec is dereferenced by the LASSERT and the MDL_EX test
 * before the later check spec != NULL; one of the two looks redundant or
 * unsafe - confirm whether callers may pass NULL.
 * NOTE(review): the rc checks after the split calls and the trailing
 * arguments of mdo_create() are not visible in this excerpt.
 */
488 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
489 const struct lu_name *lname, struct md_object *mo_c,
490 struct md_op_spec *spec, struct md_attr *ma)
495 #ifdef HAVE_SPLIT_SUPPORT
496 /* Lock mode always should be sane. */
497 LASSERT(spec->sp_cr_mode != MDL_MINMODE);
500 * Sigh... This is long story. MDT may have race with detecting if split
501 * is possible in cmm. We know this race and let it live, because
502 * getting it rid (with some sem or spinlock) will also mean that
503 * PDIROPS for create will not work because we kill parallel work, what
504 * is really bad for performance and makes no sense having PDIROPS. So,
505 * we better allow the race to live, but split dir only if some of
506 * concurrent threads takes EX lock, not matter which one. So that, say,
507 * two concurrent threads may have different lock modes on directory (CW
508 * and EX) and not first one which comes here and see that split is
509 * possible should split the dir, but only that one which has EX
510 * lock. And we do not care that in this case, split may happen a bit
511 * later (when dir size will not be necessarily 64K, but may be a bit
512 * larger). So that, we allow concurrent creates and protect split by EX
515 if (spec->sp_cr_mode == MDL_EX) {
518 * - Try to split \a mo_p upon each create operation.
519 * If split is ok, -ERESTART is returned and current thread
520 * will not peoceed with create. Instead it sends -ERESTART
521 * to client to let it know that correct MDT must be chosen.
522 * \see cmm_split_dir()
524 rc = cmm_split_dir(env, mo_p);
527 * -ERESTART or some split error is returned, we can't
528 * proceed with create.
533 if (spec != NULL && spec->sp_ck_split) {
535 * - Directory is split already. Let the caller know that
536 * it should tell client that directory is split and operation
537 * should repeat to correct MDT.
538 * \see cmm_split_check()
540 rc = cmm_split_check(env, mo_p, lname->ln_name);
546 rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
550 #ifdef HAVE_SPLIT_SUPPORT
556 /** Call mdo_create_data() on next layer. All objects are local. */
/**
 * Forward to mdo_create_data() using the next-layer objects of parent \a p
 * and object \a o.
 */
557 static int cml_create_data(const struct lu_env *env, struct md_object *p,
559 const struct md_op_spec *spec,
564 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
569 /** Call mdo_link() on next layer. All objects are local. */
/**
 * Forward to mdo_link() using the next-layer objects of parent \a mo_p and
 * source \a mo_s.
 */
570 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
571 struct md_object *mo_s, const struct lu_name *lname,
576 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
581 /** Call mdo_unlink() on next layer. All objects are local. */
/**
 * Forward to mdo_unlink() using the next-layer objects of parent \a mo_p
 * and child \a mo_c.
 */
582 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
583 struct md_object *mo_c, const struct lu_name *lname,
588 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
593 /** Call mdo_lum_lmm_cmp() on next layer */
/** Forward to mdo_lum_lmm_cmp() on the next-layer object of \a mo_c. */
594 static int cml_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
595 const struct md_op_spec *spec, struct md_attr *ma)
600 rc = mdo_lum_lmm_cmp(env, md_object_next(mo_c), spec, ma);
606 * Get mode of object.
607 * Used in both cml and cmr hence can produce RPC to another server.
/**
 * Fetch mode, uid, gid and flags of the object identified by \a lf.
 *
 * The object slice is located with md_object_find_slice(); a failure is
 * returned as PTR_ERR(). The per-thread scratch attributes (cmi_ma of
 * cmm_env_info()) are prepared with ma_need = MA_INODE and filled by
 * mo_attr_get() on the next layer. The four fields are then copied into
 * \a ma and the matching LA_* bits are OR-ed into la_valid. The object
 * reference is dropped with lu_object_put().
 *
 * A variable named remote is tested for non-NULL together with
 * lu_object_exists() < 0; callers in this file pass either NULL or the
 * address of a local.
 * NOTE(review): its declaration, the body of that test and the rc check
 * before the copy are not visible in this excerpt.
 */
609 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
610 const struct lu_fid *lf, struct md_attr *ma,
613 struct md_object *mo_s = md_object_find_slice(env, md, lf);
614 struct cmm_thread_info *cmi;
615 struct md_attr *tmp_ma;
620 RETURN(PTR_ERR(mo_s));
622 if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
625 cmi = cmm_env_info(env);
626 tmp_ma = &cmi->cmi_ma;
627 tmp_ma->ma_need = MA_INODE;
628 tmp_ma->ma_valid = 0;
629 /* get type from src, can be remote req */
630 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
632 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
633 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
634 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
635 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
636 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
638 lu_object_put(env, &mo_s->mo_lu);
644 * Set ctime for object.
645 * Used in both cml and cmr hence can produce RPC to another server.
/**
 * Apply the attributes in \a ma (which must carry LA_CTIME) to the object
 * identified by \a lf.
 *
 * The slice is located with md_object_find_slice(); a failure is returned
 * as PTR_ERR(). After asserting LA_CTIME is valid, mo_attr_set() is called
 * on the next layer and the object reference is dropped.
 */
647 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
648 const struct lu_fid *lf, struct md_attr *ma)
650 struct md_object *mo_s = md_object_find_slice(env, md, lf);
655 RETURN(PTR_ERR(mo_s));
657 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
658 /* set ctime to obj, can be remote req */
659 rc = mo_attr_set(env, md_object_next(mo_s), ma);
660 lu_object_put(env, &mo_s->mo_lu);
664 /** Helper to output debug information about rename operation. */
/**
 * Emit a CWARN describing a failed step of cml_rename().
 *
 * Two message variants are visible: one prints the FID of \a mo_t, the
 * other prints the literal text mo_t NULL in its place. Both include the
 * failing step name \a fname, the parent FIDs, the source FID, both names
 * and the error code.
 * NOTE(review): the condition selecting the variant is not visible here;
 * presumably it tests \a mo_t for NULL.
 */
665 static inline void cml_rename_warn(const char *fname,
666 struct md_object *mo_po,
667 struct md_object *mo_pn,
668 const struct lu_fid *lf,
670 struct md_object *mo_t,
675 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
676 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
677 "[tname %s] [err %d]\n", fname,
678 PFID(lu_object_fid(&mo_po->mo_lu)),
679 PFID(lu_object_fid(&mo_pn->mo_lu)),
681 PFID(lu_object_fid(&mo_t->mo_lu)),
684 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
685 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
686 "[tname %s] [err %d]\n", fname,
687 PFID(lu_object_fid(&mo_po->mo_lu)),
688 PFID(lu_object_fid(&mo_pn->mo_lu)),
694 * Rename operation for cml.
696 * This is the most complex cross-reference operation. It may involve up to 4
697 * MDS servers and require several RPCs to be sent.
699 * \param mo_po Old parent object.
700 * \param mo_pn New parent object.
701 * \param lf FID of object to rename.
702 * \param ls_name Source file name.
703 * \param mo_t target object. May be NULL; may also be a remote object.
704 * \param lt_name Name of target file.
705 * \param ma object attributes.
/**
 * Rename with local parents; the source and the target may be remote.
 *
 * Visible sequence:
 *  -# cmm_mode_get() loads the attributes of the source \a lf into \a ma
 *     and receives the address of the local variable remote.
 *  -# When \a mo_t is set and lu_object_exists() on it is negative (a
 *     remote target, per the note below), local permission checks are run
 *     on the old and new parents, the per-thread cmi_ma is selected as
 *     tmp_ma, mo_ref_del() is called on the target and MDS_PERM_BYPASS is
 *     set on tmp_ma.
 *  -# For a source on a remote MDS, further permission checks are run,
 *     MDS_PERM_BYPASS is set and cmm_rename_ctime() updates the source;
 *     a failure is reported through cml_rename_warn().
 *  -# mdo_rename() performs the local rename using tmp_ma when it is set
 *     and \a ma otherwise; a failure is reported as well.
 *
 * NOTE(review): most conditions, the copy of \a ma into tmp_ma and the
 * return statements are not visible in this excerpt.
 */
707 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
708 struct md_object *mo_pn, const struct lu_fid *lf,
709 const struct lu_name *ls_name, struct md_object *mo_t,
710 const struct lu_name *lt_name, struct md_attr *ma)
712 struct cmm_thread_info *cmi;
713 struct md_attr *tmp_ma = NULL;
714 struct md_object *tmp_t = mo_t;
718 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
722 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
724 * \note \a mo_t is remote object and there is RPC to unlink it.
725 * Before that, do local sanity check for rename first.
728 struct md_object *mo_s = md_object_find_slice(env,
729 md_obj2dev(mo_po), lf);
731 RETURN(PTR_ERR(mo_s));
733 LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
734 rc = mo_permission(env, md_object_next(mo_po),
735 md_object_next(mo_s),
737 lu_object_put(env, &mo_s->mo_lu);
741 rc = mo_permission(env, NULL, md_object_next(mo_po),
742 ma, MAY_UNLINK | MAY_VTX_FULL);
747 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
748 MAY_UNLINK | MAY_VTX_PART);
753 * /note \a ma will be changed after mo_ref_del(), but we will use
754 * it for mdo_rename() later, so save it before mo_ref_del().
756 cmi = cmm_env_info(env);
757 tmp_ma = &cmi->cmi_ma;
759 rc = mo_ref_del(env, md_object_next(mo_t), ma);
763 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
768 * \note for src on remote MDS case, change its ctime before local
769 * rename. Firstly, do local sanity check for rename if necessary.
773 rc = mo_permission(env, NULL, md_object_next(mo_po),
774 ma, MAY_UNLINK | MAY_VTX_FULL);
779 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
780 rc = mo_permission(env, md_object_next(mo_pn),
781 md_object_next(mo_t),
789 mask = (S_ISDIR(ma->ma_attr.la_mode) ?
790 MAY_LINK : MAY_CREATE);
793 rc = mo_permission(env, NULL,
794 md_object_next(mo_pn),
800 ma->ma_attr_flags |= MDS_PERM_BYPASS;
802 LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
805 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
806 tmp_ma ? tmp_ma : ma);
808 /* TODO: revoke mo_t if necessary. */
809 cml_rename_warn("cmm_rename_ctime", mo_po,
810 mo_pn, lf, ls_name->ln_name,
811 tmp_t, lt_name->ln_name, rc);
816 /* local rename, mo_t can be NULL */
817 rc = mdo_rename(env, md_object_next(mo_po),
818 md_object_next(mo_pn), lf, ls_name,
819 md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
821 /* TODO: revoke all cml_rename */
822 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
823 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
829 * Rename target partial operation.
830 * Used for cross-ref rename.
/**
 * Forward the target half of a cross-ref rename to mdo_rename_tgt() using
 * the next-layer objects of \a mo_p and \a mo_t.
 */
832 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
833 struct md_object *mo_t, const struct lu_fid *lf,
834 const struct lu_name *lname, struct md_attr *ma)
839 rc = mdo_rename_tgt(env, md_object_next(mo_p),
840 md_object_next(mo_t), lf, lname, ma);
845 * Name insert only operation.
846 * used only in case of rename_tgt() when target doesn't exist.
/** Forward to mdo_name_insert() on the next-layer object of parent \a p. */
848 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
849 const struct lu_name *lname, const struct lu_fid *lf,
850 const struct md_attr *ma)
855 rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
862 * Check two fids are not subdirectories.
/**
 * Subdirectory check shared by the cml and cmr directory operation tables.
 *
 * The mode of \a fid is fetched with cmm_mode_get() into the per-thread
 * cmi_ma (passing NULL for the last argument) and tested with S_ISDIR; the
 * actual check is delegated to mdo_is_subdir() on the next layer.
 *
 * NOTE(review): the rc check and the result for a non-directory are not
 * visible in this excerpt.
 */
864 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
865 const struct lu_fid *fid, struct lu_fid *sfid)
867 struct cmm_thread_info *cmi;
871 cmi = cmm_env_info(env);
872 rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
876 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
879 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
/**
 * Directory operations for local objects. mdo_is_subdir uses the shared
 * cmm_is_subdir(); every other slot points at a cml_* function.
 */
883 static const struct md_dir_operations cml_dir_ops = {
884 .mdo_is_subdir = cmm_is_subdir,
885 .mdo_lookup = cml_lookup,
886 .mdo_lock_mode = cml_lock_mode,
887 .mdo_create = cml_create,
888 .mdo_link = cml_link,
889 .mdo_unlink = cml_unlink,
890 .mdo_lum_lmm_cmp = cml_lum_lmm_cmp,
891 .mdo_name_insert = cml_name_insert,
892 .mdo_rename = cml_rename,
893 .mdo_rename_tgt = cml_rename_tgt,
894 .mdo_create_data = cml_create_data,
907 /** Get cmr_object from lu_object. */
/**
 * Convert a lu_object pointer to the cmr_object that embeds it at
 * cmm_obj.cmo_obj.mo_lu, via container_of0().
 */
908 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
910 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
912 /** Get cmr_object from md_object. */
/**
 * Convert a md_object pointer to the cmr_object that embeds it at
 * cmm_obj.cmo_obj, via container_of0().
 */
913 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
915 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
917 /** Get cmr_object from cmm_object. */
/**
 * Convert a cmm_object pointer to the cmr_object that embeds it at cmm_obj,
 * via container_of0().
 */
918 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
920 return container_of0(co, struct cmr_object, cmm_obj);
925 * Get proper child device from MDCs.
/**
 * Find the lu_device of the MDC whose mc_num equals \a num.
 *
 * The list d->cmm_targets is walked while holding d->cmm_tgt_guard; a
 * matching entry is converted with mdc2lu_dev(). The result variable starts
 * as NULL.
 * NOTE(review): the loop exit and the return statement are not visible in
 * this excerpt; presumably NULL is returned when no target matches.
 */
927 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
929 struct lu_device *next = NULL;
930 struct mdc_device *mdc;
932 cfs_spin_lock(&d->cmm_tgt_guard);
933 cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
934 if (mdc->mc_num == num) {
935 next = mdc2lu_dev(mdc);
939 cfs_spin_unlock(&d->cmm_tgt_guard);
/**
 * Free hook for remote objects (installed as loo_object_free in
 * cmr_obj_ops). Converts \a lo to its enclosing cmr_object.
 * NOTE(review): the release statements are not visible in this excerpt.
 */
946 static void cmr_object_free(const struct lu_env *env,
947 struct lu_object *lo)
949 struct cmr_object *cro = lu2cmr_obj(lo);
955 * Initialize cmr object.
/**
 * Init hook for remote objects: stack the MDC layer object under \a lo.
 *
 * The child device is selected with cmr_child_dev() using the cmo_num of
 * the object; its ldo_object_alloc() creates the lower object for the same
 * header and lu_object_add() links it after \a lo.
 *
 * NOTE(review): the checks of c_dev and c_obj and the return value are not
 * visible in this excerpt.
 */
957 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
958 const struct lu_object_conf *unused)
960 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
961 struct lu_device *c_dev;
962 struct lu_object *c_obj;
967 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
971 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
972 lo->lo_header, c_dev);
974 lu_object_add(lo, c_obj);
985 * Output lu_object data.
/**
 * Print hook for remote objects: emits the tag [remote] followed by the
 * cmo_num of the object through \a p. The const qualifier of \a lo is cast
 * away only to reuse lu2cmr_obj(); the object is not modified here.
 */
987 static int cmr_object_print(const struct lu_env *env, void *cookie,
988 lu_printer_t p, const struct lu_object *lo)
990 const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
991 return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
995 * Cmr instance of lu_object_operations.
/** lu_object operations for remote objects: init, free and print hooks. */
997 static const struct lu_object_operations cmr_obj_ops = {
998 .loo_object_init = cmr_object_init,
999 .loo_object_free = cmr_object_free,
1000 .loo_object_print = cmr_object_print
1004 * \name cmr remote md_object operations.
1005 * All operations here are invalid and return errors. There is no local object
1006 * so these operations return two kinds of error:
1007 * -# -EFAULT if operation is prohibited.
1008 * -# -EREMOTE if operation can be done just to notify upper level about remote
/*
 * Signatures of the remote-object (cmr) stubs, from cmr_object_create()
 * through cmr_lum_lmm_cmp(). The section comment preceding these functions
 * states that the operations are invalid for remote objects and return
 * either -EFAULT or -EREMOTE.
 *
 * NOTE(review): only signature lines are visible in this excerpt; the
 * bodies, and therefore which error each stub returns, are not shown.
 */
1013 static int cmr_object_create(const struct lu_env *env,
1014 struct md_object *mo,
1015 const struct md_op_spec *spec,
1021 static int cmr_permission(const struct lu_env *env,
1022 struct md_object *p, struct md_object *c,
1023 struct md_attr *attr, int mask)
1028 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1029 struct md_attr *attr)
1034 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1035 const struct md_attr *attr)
1040 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1041 struct lu_buf *buf, const char *name)
1046 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1052 static int cmr_changelog(const struct lu_env *env, enum changelog_rec_type type,
1053 int flags, struct md_object *mo)
1058 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1064 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1065 const struct lu_buf *buf, const char *name,
1071 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1077 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1078 const struct md_attr *ma)
1083 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1089 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1095 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1096 struct md_attr *ma, int mode)
1101 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1102 const struct lu_rdpg *rdpg)
1107 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1108 struct lustre_capa *capa, int renewal)
1113 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1114 char *path, int pathlen, __u64 *recno, int *linkno)
1119 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1124 static int cmr_file_lock(const struct lu_env *env, struct md_object *mo,
1125 struct lov_mds_md *lmm, struct ldlm_extent *extent,
1126 struct lustre_handle *lockh)
1131 static int cmr_file_unlock(const struct lu_env *env, struct md_object *mo,
1132 struct lov_mds_md *lmm, struct lustre_handle *lockh)
1137 static int cmr_lum_lmm_cmp(const struct lu_env *env, struct md_object *mo_c,
1138 const struct md_op_spec *spec, struct md_attr *ma)
1143 /** Set of md_object_operations for cmr. */
/**
 * md_object operations for remote objects. Every slot points at the cmr_*
 * stub of the same name.
 */
1144 static const struct md_object_operations cmr_mo_ops = {
1145 .moo_permission = cmr_permission,
1146 .moo_attr_get = cmr_attr_get,
1147 .moo_attr_set = cmr_attr_set,
1148 .moo_xattr_get = cmr_xattr_get,
1149 .moo_xattr_set = cmr_xattr_set,
1150 .moo_xattr_list = cmr_xattr_list,
1151 .moo_xattr_del = cmr_xattr_del,
1152 .moo_object_create = cmr_object_create,
1153 .moo_ref_add = cmr_ref_add,
1154 .moo_ref_del = cmr_ref_del,
1155 .moo_open = cmr_open,
1156 .moo_close = cmr_close,
1157 .moo_readpage = cmr_readpage,
1158 .moo_readlink = cmr_readlink,
1159 .moo_changelog = cmr_changelog,
1160 .moo_capa_get = cmr_capa_get,
1161 .moo_object_sync = cmr_object_sync,
1162 .moo_path = cmr_path,
1163 .moo_file_lock = cmr_file_lock,
1164 .moo_file_unlock = cmr_file_unlock,
1169 * \name cmr md_dir operations.
1171 * All methods below are cross-ref by nature. They consist of remote call and
1172 * local operation. Due to future rollback functionality there are several
1173 * limitations for such methods:
1174 * -# remote call should be done at first to do epoch negotiation between all
1175 * MDS involved and to avoid the RPC inside transaction.
1176 * -# only one RPC can be sent - also due to epoch negotiation.
1177 * For more details see rollback HLD/DLD.
/**
 * Lookup entry for a remote parent directory (the mdo_lookup slot of
 * cmr_dir_ops).
 * NOTE(review): only the signature and the first line of an explanatory
 * comment are visible in this excerpt; the returned value is not shown.
 */
1180 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1181 const struct lu_name *lname, struct lu_fid *lf,
1182 struct md_op_spec *spec)
1185 * This can happens while rename() If new parent is remote dir, lookup
1192 /** Return lock mode. */
/**
 * Lock mode entry for remote objects (the mdo_lock_mode slot of
 * cmr_dir_ops).
 * NOTE(review): the body is not visible in this excerpt.
 */
1193 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1194 struct md_object *mo, mdl_mode_t lm)
1200 * Create operation for cmr.
1201 * Remote object creation and local name insert.
1203 * \param mo_p Parent directory. Local object.
1204 * \param lchild_name name of file to create.
1205 * \param mo_c Child object. It has no real inode yet.
1206 * \param spec creation specification.
1207 * \param ma child object attributes.
/**
 * Cross-ref create: make the child object on a remote MDS, then insert its
 * name into the local parent \a mo_p.
 *
 * Visible sequence:
 *  -# mdo_lookup() checks the name in the parent; the result FID goes to
 *     the per-thread cmi_fid. A result other than -ENOENT is handled
 *     separately.
 *  -# The parent attributes are read into the per-thread cmi_ma with
 *     MA_INODE; with CONFIG_FS_POSIX_ACL and a non-symlink child the
 *     default ACL is requested as well (MA_ACL_DEF) into cmi_xattr_buf.
 *  -# If the parent has S_ISGID, the child inherits the gid, and a
 *     directory child also gets S_ISGID with LA_MODE marked valid.
 *  -# A returned default ACL is passed on through spec->u.sp_ea and the
 *     flag MDS_CREATE_RMT_ACL.
 *  -# mo_permission() checks the parent locally with MAY_LINK for a
 *     directory child and MAY_CREATE otherwise.
 *  -# mo_object_create() creates the child; afterwards tmp_ma gets
 *     MDS_PERM_BYPASS and mdo_name_insert() adds the name. A failure of
 *     the insert is only logged with CWARN (see the TODO).
 *
 * NOTE(review): the rc checks between the steps, the copy of \a ma into
 * tmp_ma mentioned by the note below, and the return statements are not
 * visible in this excerpt.
 */
1209 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1210 const struct lu_name *lchild_name, struct md_object *mo_c,
1211 struct md_op_spec *spec,
1214 struct cmm_thread_info *cmi;
1215 struct md_attr *tmp_ma;
1219 /* Make sure the name does not exist before doing the remote call. */
1220 rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1221 &cmm_env_info(env)->cmi_fid, NULL);
1224 else if (rc != -ENOENT)
1227 /* check the SGID attr */
1228 cmi = cmm_env_info(env);
1230 tmp_ma = &cmi->cmi_ma;
1231 tmp_ma->ma_valid = 0;
1232 tmp_ma->ma_need = MA_INODE;
1234 #ifdef CONFIG_FS_POSIX_ACL
1235 if (!S_ISLNK(ma->ma_attr.la_mode)) {
1236 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1237 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1238 tmp_ma->ma_need |= MA_ACL_DEF;
1241 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1245 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1246 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1247 if (S_ISDIR(ma->ma_attr.la_mode)) {
1248 ma->ma_attr.la_mode |= S_ISGID;
1249 ma->ma_attr.la_valid |= LA_MODE;
1253 #ifdef CONFIG_FS_POSIX_ACL
1254 if (tmp_ma->ma_valid & MA_ACL_DEF) {
1255 spec->u.sp_ea.fid = spec->u.sp_pfid;
1256 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1257 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1258 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1262 /* Local permission check for name_insert before remote ops. */
1263 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1264 (S_ISDIR(ma->ma_attr.la_mode) ?
1265 MAY_LINK : MAY_CREATE));
1270 * \note \a ma will be changed after mo_object_create(), but we will use
1271 * it for mdo_name_insert() later, so save it before mo_object_create().
1274 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1276 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1277 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1278 lu_object_fid(&mo_c->mo_lu), tmp_ma);
1280 /* TODO: remove object mo_c on remote MDS */
1281 CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1282 " [name %s] [mo_c "DFID"] [err %d]\n",
1283 PFID(lu_object_fid(&mo_p->mo_lu)),
1284 lchild_name->ln_name,
1285 PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1293 * Link operations for cmr.
1295 * The link RPC is always issued to the server where source parent is living.
1296 * The first operation to do is object nlink increment on remote server.
1297 * Second one is local mdo_name_insert().
1299 * \param mo_p parent directory. It is local.
1300 * \param mo_s source object to link. It is remote.
1301 * \param lname Name of link file.
1302 * \param ma object attributes.
/**
 * Cross-ref link: bump the link count of the remote source \a mo_s, then
 * insert \a lname into the local parent \a mo_p.
 *
 * Visible sequence: mdo_lookup() checks the name (result FID in the
 * per-thread cmi_fid); on -ENOENT a local mo_permission() check is made on
 * the parent; mo_ref_add() is issued on the source; MDS_PERM_BYPASS is set
 * on \a ma and mdo_name_insert() adds the name. A failed insert is only
 * logged with CWARN (see the TODO).
 *
 * NOTE(review): the remaining branches of the lookup result, the rc checks
 * and the return statements are not visible in this excerpt.
 */
1304 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1305 struct md_object *mo_s, const struct lu_name *lname,
1311 /* Make sure the name does not exist before doing the remote call. */
1312 rc = mdo_lookup(env, md_object_next(mo_p), lname,
1313 &cmm_env_info(env)->cmi_fid, NULL);
1316 } else if (rc == -ENOENT) {
1317 /* Local permission check for name_insert before remote ops. */
1318 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1323 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1325 ma->ma_attr_flags |= MDS_PERM_BYPASS;
1326 rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1327 lu_object_fid(&mo_s->mo_lu), ma);
1329 /* TODO: ref_del from mo_s on remote MDS */
1330 CWARN("cmr_link failed, should revoke: "
1331 "[mo_p "DFID"] [mo_s "DFID"] "
1332 "[name %s] [err %d]\n",
1333 PFID(lu_object_fid(&mo_p->mo_lu)),
1334 PFID(lu_object_fid(&mo_s->mo_lu)),
1335 lname->ln_name, rc);
1343 * Unlink operations for cmr.
1345 * The unlink RPC is always issued to the server where parent is living. Hence
1346 * the first operation to do is object unlink on remote server. Second one is
1347 * local mdo_name_remove().
1349 * \param mo_p parent md_object. It is local.
1350 * \param mo_c child object to be unlinked. It is remote.
1351 * \param lname Name of file to unlink.
1352 * \param ma object attributes.
/**
 * Cross-ref unlink: drop a reference on the remote child \a mo_c, then
 * remove \a lname from the local parent \a mo_p.
 *
 * Visible sequence: mo_permission() checks the parent with MAY_UNLINK and
 * MAY_VTX_PART; the per-thread cmi_ma is selected as tmp_ma; mo_ref_del()
 * is issued on the child; tmp_ma gets MDS_PERM_BYPASS and is passed to
 * mdo_name_remove(). A failed removal is only logged with CWARN (see the
 * TODO).
 *
 * NOTE(review): the copy of \a ma into tmp_ma mentioned by the note below,
 * the rc checks and the return statements are not visible in this excerpt.
 */
1354 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1355 struct md_object *mo_c, const struct lu_name *lname,
1358 struct cmm_thread_info *cmi;
1359 struct md_attr *tmp_ma;
1363 /* Local permission check for name_remove before remote ops. */
1364 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1365 MAY_UNLINK | MAY_VTX_PART);
1370 * \note \a ma will be changed after mo_ref_del, but we will use
1371 * it for mdo_name_remove() later, so save it before mo_ref_del().
1373 cmi = cmm_env_info(env);
1374 tmp_ma = &cmi->cmi_ma;
1376 rc = mo_ref_del(env, md_object_next(mo_c), ma);
1378 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1379 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1381 /* TODO: ref_add to mo_c on remote MDS */
1382 CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1383 " [mo_c "DFID"] [name %s] [err %d]\n",
1384 PFID(lu_object_fid(&mo_p->mo_lu)),
1385 PFID(lu_object_fid(&mo_c->mo_lu)),
1386 lname->ln_name, rc);
1393 /** Helper which outputs error message during cmr_rename() */
/**
 * Emit a CWARN describing a failed step of cmr_rename(): the failing step
 * name \a fname, the FIDs of both parents and of the source, the source
 * and target names and the error code.
 */
1394 static inline void cmr_rename_warn(const char *fname,
1395 struct md_object *mo_po,
1396 struct md_object *mo_pn,
1397 const struct lu_fid *lf,
1402 CWARN("cmr_rename failed for %s, should revoke: "
1403 "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1404 "[sname %s] [tname %s] [err %d]\n", fname,
1405 PFID(lu_object_fid(&mo_po->mo_lu)),
1406 PFID(lu_object_fid(&mo_pn->mo_lu)),
1407 PFID(lf), s_name, t_name, err);
1411 * Rename operation for cmr.
1413 * This is the most complex cross-reference operation. It may involve up to 4
1414 * MDS servers and require several RPCs to be sent.
1416 * \param mo_po Old parent object.
1417 * \param mo_pn New parent object.
1418 * \param lf FID of object to rename.
1419 * \param ls_name Source file name.
1420 * \param mo_t target object. Should be NULL here.
1421 * \param lt_name Name of target file.
1422 * \param ma object attributes.
/**
 * Cross-ref rename where the new parent \a mo_pn is remote.
 *
 * Visible sequence:
 *  -# asserts that \a mo_t is NULL;
 *  -# cmm_mode_get() loads the attributes of the source \a lf into \a ma;
 *  -# mo_permission() checks the old parent with MAY_UNLINK and
 *     MAY_VTX_FULL;
 *  -# the per-thread cmi_ma is selected as tmp_ma;
 *  -# mdo_rename_tgt() is issued on the new parent with a NULL target;
 *  -# tmp_ma gets MDS_PERM_BYPASS and cmm_rename_ctime() updates the
 *     source object;
 *  -# mdo_name_remove() removes the old name from \a mo_po.
 * Failures of the last two steps are reported through cmr_rename_warn().
 *
 * NOTE(review): the copy of \a ma into tmp_ma mentioned by the todo below,
 * the rc checks and the return statements are not visible in this excerpt.
 */
1424 static int cmr_rename(const struct lu_env *env,
1425 struct md_object *mo_po, struct md_object *mo_pn,
1426 const struct lu_fid *lf, const struct lu_name *ls_name,
1427 struct md_object *mo_t, const struct lu_name *lt_name,
1430 struct cmm_thread_info *cmi;
1431 struct md_attr *tmp_ma;
1435 LASSERT(mo_t == NULL);
1437 /* get real type of src */
1438 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1442 /* Local permission check for name_remove before remote ops. */
1443 rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1444 MAY_UNLINK | MAY_VTX_FULL);
1449 * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
1450 * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1452 cmi = cmm_env_info(env);
1453 tmp_ma = &cmi->cmi_ma;
1456 * \note The \a mo_pn is remote directory, so we cannot even know if there is
1457 * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1458 * lookup and process this further.
1460 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1461 NULL/* mo_t */, lf, lt_name, ma);
1465 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1467 /* src object may be on a remote MDS, do remote ops first. */
1468 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1470 /* TODO: revoke mdo_rename_tgt */
1471 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1472 ls_name->ln_name, lt_name->ln_name, rc);
1476 /* only the old name is removed locally */
1477 rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1479 /* TODO: revoke all cmr_rename */
1480 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1481 ls_name->ln_name, lt_name->ln_name, rc);
1487 * Part of cross-ref rename().
1488 * Used to insert new name in new parent and unlink target.
/**
 * Target half of a cross-ref rename when the target object \a mo_t is
 * remote.
 *
 * Visible sequence: mo_permission() checks the parent \a mo_p with
 * MAY_UNLINK and MAY_VTX_PART; the per-thread cmi_ma is selected as tmp_ma;
 * mo_ref_del() is issued on the target; tmp_ma gets MDS_PERM_BYPASS and
 * mdo_rename_tgt() is called on the parent with a NULL target so that only
 * the name is handled locally. A failure of that call is only logged with
 * CWARN (see the TODO).
 *
 * NOTE(review): the copy of \a ma into tmp_ma mentioned by the comment
 * below, the rc checks and the return statements are not visible in this
 * excerpt.
 */
1490 static int cmr_rename_tgt(const struct lu_env *env,
1491 struct md_object *mo_p, struct md_object *mo_t,
1492 const struct lu_fid *lf, const struct lu_name *lname,
1495 struct cmm_thread_info *cmi;
1496 struct md_attr *tmp_ma;
1500 /* target object is remote one */
1501 /* Local permission check for rename_tgt before remote ops. */
1502 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1503 MAY_UNLINK | MAY_VTX_PART);
1508 * XXX: @ma maybe changed after mo_ref_del, but we will use
1509 * it for mdo_rename_tgt later, so save it before mo_ref_del.
1511 cmi = cmm_env_info(env);
1512 tmp_ma = &cmi->cmi_ma;
1514 rc = mo_ref_del(env, md_object_next(mo_t), ma);
1515 /* continue locally with name handling only */
1517 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1518 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1519 NULL, lf, lname, tmp_ma);
1521 /* TODO: ref_add to mo_t on remote MDS */
1522 CWARN("cmr_rename_tgt failed, should revoke: "
1523 "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1524 "[name %s] [err %d]\n",
1525 PFID(lu_object_fid(&mo_p->mo_lu)),
1526 PFID(lu_object_fid(&mo_t->mo_lu)),
1528 lname->ln_name, rc);
1535 * The md_dir_operations for cmr.
/*
 * Directory operations for remote objects. mdo_is_subdir uses the shared
 * cmm_is_subdir(); the other slots point at cmr_* functions. Unlike
 * cml_dir_ops, the visible initializer has no mdo_name_insert or
 * mdo_create_data entries.
 */
1537 static const struct md_dir_operations cmr_dir_ops = {
1538 .mdo_is_subdir = cmm_is_subdir,
1539 .mdo_lookup = cmr_lookup,
1540 .mdo_lock_mode = cmr_lock_mode,
1541 .mdo_create = cmr_create,
1542 .mdo_link = cmr_link,
1543 .mdo_unlink = cmr_unlink,
1544 .mdo_lum_lmm_cmp = cmr_lum_lmm_cmp,
1545 .mdo_rename = cmr_rename,
1546 .mdo_rename_tgt = cmr_rename_tgt