1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/cmm/cmm_object.c
38 * Lustre Cluster Metadata Manager (cmm)
40 * Author: Mike Pershin <tappro@clusterfs.com>
44 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_MDS
49 #include <lustre_fid.h>
50 #include "cmm_internal.h"
51 #include "mdc_internal.h"
54 * Lookup MDS number \a mds by FID \a fid.
56 * \param fid FID of object to find MDS
57 * \param mds mds number to return.
59 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
60 mdsno_t *mds, const struct lu_env *env)
65 LASSERT(fid_is_sane(fid));
67 rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
69 CERROR("Can't find mds by seq "LPX64", rc %d\n",
74 if (*mds > cm->cmm_tgt_count) {
75 CERROR("Got invalid mdsno: %x (max: %x)\n",
76 *mds, cm->cmm_tgt_count);
79 CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
80 LPX64"\n", *mds, fid_seq(fid));
90 static const struct md_object_operations cml_mo_ops;
91 static const struct md_dir_operations cml_dir_ops;
92 static const struct lu_object_operations cml_obj_ops;
94 static const struct md_object_operations cmr_mo_ops;
95 static const struct md_dir_operations cmr_dir_ops;
96 static const struct lu_object_operations cmr_obj_ops;
100 * Allocate CMM object.
102 struct lu_object *cmm_object_alloc(const struct lu_env *env,
103 const struct lu_object_header *loh,
104 struct lu_device *ld)
106 const struct lu_fid *fid = &loh->loh_fid;
107 struct lu_object *lo = NULL;
108 struct cmm_device *cd;
115 if (cd->cmm_flags & CMM_INITIALIZED) {
116 /* get object location */
117 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
122 * Device is not yet initialized, cmm_object is being created
123 * as part of early bootstrap procedure (it is /ROOT, or /fld,
124 * etc.). Such object *has* to be local.
126 mds = cd->cmm_local_num;
128 /* select the proper set of operations based on object location */
129 if (mds == cd->cmm_local_num) {
130 struct cml_object *clo;
134 lo = &clo->cmm_obj.cmo_obj.mo_lu;
135 lu_object_init(lo, NULL, ld);
136 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
137 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
138 lo->lo_ops = &cml_obj_ops;
141 struct cmr_object *cro;
145 lo = &cro->cmm_obj.cmo_obj.mo_lu;
146 lu_object_init(lo, NULL, ld);
147 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
148 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
149 lo->lo_ops = &cmr_obj_ops;
157 * Get local child device.
159 static struct lu_device *cml_child_dev(struct cmm_device *d)
161 return &d->cmm_child->md_lu_dev;
167 static void cml_object_free(const struct lu_env *env,
168 struct lu_object *lo)
170 struct cml_object *clo = lu2cml_obj(lo);
176 * Initialize cml_object.
178 static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
179 const struct lu_object_conf *unused)
181 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
182 struct lu_device *c_dev;
183 struct lu_object *c_obj;
188 #ifdef HAVE_SPLIT_SUPPORT
189 if (cd->cmm_tgt_count == 0)
190 lu2cml_obj(lo)->clo_split = CMM_SPLIT_DENIED;
192 lu2cml_obj(lo)->clo_split = CMM_SPLIT_UNKNOWN;
194 c_dev = cml_child_dev(cd);
198 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
199 lo->lo_header, c_dev);
201 lu_object_add(lo, c_obj);
211 static int cml_object_print(const struct lu_env *env, void *cookie,
212 lu_printer_t p, const struct lu_object *lo)
214 return (*p)(env, cookie, "[local]");
217 static const struct lu_object_operations cml_obj_ops = {
218 .loo_object_init = cml_object_init,
219 .loo_object_free = cml_object_free,
220 .loo_object_print = cml_object_print
224 * \name CMM local md_object operations.
225 * All of them call just corresponding operations on next layer.
228 static int cml_object_create(const struct lu_env *env,
229 struct md_object *mo,
230 const struct md_op_spec *spec,
231 struct md_attr *attr)
235 rc = mo_object_create(env, md_object_next(mo), spec, attr);
239 static int cml_permission(const struct lu_env *env,
240 struct md_object *p, struct md_object *c,
241 struct md_attr *attr, int mask)
245 rc = mo_permission(env, md_object_next(p), md_object_next(c),
250 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
251 struct md_attr *attr)
255 rc = mo_attr_get(env, md_object_next(mo), attr);
259 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
260 const struct md_attr *attr)
264 rc = mo_attr_set(env, md_object_next(mo), attr);
268 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
269 struct lu_buf *buf, const char *name)
273 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
277 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
282 rc = mo_readlink(env, md_object_next(mo), buf);
286 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
291 rc = mo_xattr_list(env, md_object_next(mo), buf);
295 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
296 const struct lu_buf *buf, const char *name,
301 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
305 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
310 rc = mo_xattr_del(env, md_object_next(mo), name);
314 static int cml_ref_add(const struct lu_env *env, struct md_object *mo,
315 const struct md_attr *ma)
319 rc = mo_ref_add(env, md_object_next(mo), ma);
323 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
328 rc = mo_ref_del(env, md_object_next(mo), ma);
332 static int cml_open(const struct lu_env *env, struct md_object *mo,
337 rc = mo_open(env, md_object_next(mo), flags);
341 static int cml_close(const struct lu_env *env, struct md_object *mo,
346 rc = mo_close(env, md_object_next(mo), ma);
350 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
351 const struct lu_rdpg *rdpg)
355 rc = mo_readpage(env, md_object_next(mo), rdpg);
359 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
360 struct lustre_capa *capa, int renewal)
364 rc = mo_capa_get(env, md_object_next(mo), capa, renewal);
368 static int cml_path(const struct lu_env *env, struct md_object *mo,
369 char *path, int pathlen, __u64 *recno, int *linkno)
373 rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
377 static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
381 rc = mo_object_sync(env, md_object_next(mo));
385 static dt_obj_version_t cml_version_get(const struct lu_env *env,
386 struct md_object *mo)
388 return mo_version_get(env, md_object_next(mo));
391 static void cml_version_set(const struct lu_env *env, struct md_object *mo,
392 dt_obj_version_t version)
394 return mo_version_set(env, md_object_next(mo), version);
397 static const struct md_object_operations cml_mo_ops = {
398 .moo_permission = cml_permission,
399 .moo_attr_get = cml_attr_get,
400 .moo_attr_set = cml_attr_set,
401 .moo_xattr_get = cml_xattr_get,
402 .moo_xattr_list = cml_xattr_list,
403 .moo_xattr_set = cml_xattr_set,
404 .moo_xattr_del = cml_xattr_del,
405 .moo_object_create = cml_object_create,
406 .moo_ref_add = cml_ref_add,
407 .moo_ref_del = cml_ref_del,
408 .moo_open = cml_open,
409 .moo_close = cml_close,
410 .moo_readpage = cml_readpage,
411 .moo_readlink = cml_readlink,
412 .moo_capa_get = cml_capa_get,
413 .moo_object_sync = cml_object_sync,
414 .moo_version_get = cml_version_get,
415 .moo_version_set = cml_version_set,
416 .moo_path = cml_path,
421 * \name CMM local md_dir_operations.
425 * cml lookup object fid by name.
426 * This returns only FID by name.
428 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
429 const struct lu_name *lname, struct lu_fid *lf,
430 struct md_op_spec *spec)
435 #ifdef HAVE_SPLIT_SUPPORT
436 if (spec != NULL && spec->sp_ck_split) {
437 rc = cmm_split_check(env, mo_p, lname->ln_name);
442 rc = mdo_lookup(env, md_object_next(mo_p), lname, lf, spec);
448 * Helper to return lock mode. Used in split cases only.
450 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
451 struct md_object *mo, mdl_mode_t lm)
453 int rc = MDL_MINMODE;
456 #ifdef HAVE_SPLIT_SUPPORT
457 rc = cmm_split_access(env, mo, lm);
464 * Create operation for cml.
465 * Objects are local, but split can happen.
466 * If split is not needed this will call next layer mdo_create().
468 * \param mo_p Parent directory. Local object.
469 * \param lname name of file to create.
470 * \param mo_c Child object. It has no real inode yet.
471 * \param spec creation specification.
472 * \param ma child object attributes.
474 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
475 const struct lu_name *lname, struct md_object *mo_c,
476 struct md_op_spec *spec, struct md_attr *ma)
481 #ifdef HAVE_SPLIT_SUPPORT
482 /* Lock mode always should be sane. */
483 LASSERT(spec->sp_cr_mode != MDL_MINMODE);
486 * Sigh... This is long story. MDT may have race with detecting if split
487 * is possible in cmm. We know this race and let it live, because
488 * getting rid of it (with some sem or spinlock) will also mean that
489 * PDIROPS for create will not work because we kill parallel work, what
490 * is really bad for performance and makes no sense having PDIROPS. So,
491 * we better allow the race to live, but split dir only if some of
492 * concurrent threads takes an EX lock, no matter which one. So that, say,
493 * two concurrent threads may have different lock modes on directory (CW
494 * and EX) and not first one which comes here and see that split is
495 * possible should split the dir, but only that one which has EX
496 * lock. And we do not care that in this case, split may happen a bit
497 * later (when dir size will not be necessarily 64K, but may be a bit
498 * larger). So that, we allow concurrent creates and protect split by EX
501 if (spec->sp_cr_mode == MDL_EX) {
504 * - Try to split \a mo_p upon each create operation.
505 * If split is ok, -ERESTART is returned and current thread
506 * will not proceed with create. Instead it sends -ERESTART
507 * to client to let it know that correct MDT must be chosen.
508 * \see cmm_split_dir()
510 rc = cmm_split_dir(env, mo_p);
513 * -ERESTART or some split error is returned, we can't
514 * proceed with create.
519 if (spec != NULL && spec->sp_ck_split) {
521 * - Directory is split already. Let the caller know that
522 * it should tell client that directory is split and operation
523 * should repeat to correct MDT.
524 * \see cmm_split_check()
526 rc = cmm_split_check(env, mo_p, lname->ln_name);
532 rc = mdo_create(env, md_object_next(mo_p), lname, md_object_next(mo_c),
536 #ifdef HAVE_SPLIT_SUPPORT
542 /** Call mdo_create_data() on next layer. All objects are local. */
543 static int cml_create_data(const struct lu_env *env, struct md_object *p,
545 const struct md_op_spec *spec,
550 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
555 /** Call mdo_link() on next layer. All objects are local. */
556 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
557 struct md_object *mo_s, const struct lu_name *lname,
562 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
567 /** Call mdo_unlink() on next layer. All objects are local. */
568 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
569 struct md_object *mo_c, const struct lu_name *lname,
574 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
581 * Get mode of object.
582 * Used in both cml and cmr hence can produce RPC to another server.
584 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
585 const struct lu_fid *lf, struct md_attr *ma,
588 struct md_object *mo_s = md_object_find_slice(env, md, lf);
589 struct cmm_thread_info *cmi;
590 struct md_attr *tmp_ma;
595 RETURN(PTR_ERR(mo_s));
597 if (remote && (lu_object_exists(&mo_s->mo_lu) < 0))
600 cmi = cmm_env_info(env);
601 tmp_ma = &cmi->cmi_ma;
602 tmp_ma->ma_need = MA_INODE;
603 tmp_ma->ma_valid = 0;
604 /* get type from src, can be remote req */
605 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
607 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
608 ma->ma_attr.la_uid = tmp_ma->ma_attr.la_uid;
609 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
610 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
611 ma->ma_attr.la_valid |= LA_MODE | LA_UID | LA_GID | LA_FLAGS;
613 lu_object_put(env, &mo_s->mo_lu);
619 * Set ctime for object.
620 * Used in both cml and cmr hence can produce RPC to another server.
622 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
623 const struct lu_fid *lf, struct md_attr *ma)
625 struct md_object *mo_s = md_object_find_slice(env, md, lf);
630 RETURN(PTR_ERR(mo_s));
632 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
633 /* set ctime to obj, can be remote req */
634 rc = mo_attr_set(env, md_object_next(mo_s), ma);
635 lu_object_put(env, &mo_s->mo_lu);
639 /** Helper to output debug information about rename operation. */
640 static inline void cml_rename_warn(const char *fname,
641 struct md_object *mo_po,
642 struct md_object *mo_pn,
643 const struct lu_fid *lf,
645 struct md_object *mo_t,
650 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
651 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t "DFID"] "
652 "[tname %s] [err %d]\n", fname,
653 PFID(lu_object_fid(&mo_po->mo_lu)),
654 PFID(lu_object_fid(&mo_pn->mo_lu)),
656 PFID(lu_object_fid(&mo_t->mo_lu)),
659 CWARN("cml_rename failed for %s, should revoke: [mo_po "DFID"] "
660 "[mo_pn "DFID"] [lf "DFID"] [sname %s] [mo_t NULL] "
661 "[tname %s] [err %d]\n", fname,
662 PFID(lu_object_fid(&mo_po->mo_lu)),
663 PFID(lu_object_fid(&mo_pn->mo_lu)),
669 * Rename operation for cml.
671 * This is the most complex cross-reference operation. It may involve up to 4
672 * MDS servers and require several RPCs to be sent.
674 * \param mo_po Old parent object.
675 * \param mo_pn New parent object.
676 * \param lf FID of object to rename.
677 * \param ls_name Source file name.
678 * \param mo_t target object. May be NULL.
679 * \param lt_name Name of target file.
680 * \param ma object attributes.
682 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
683 struct md_object *mo_pn, const struct lu_fid *lf,
684 const struct lu_name *ls_name, struct md_object *mo_t,
685 const struct lu_name *lt_name, struct md_attr *ma)
687 struct cmm_thread_info *cmi;
688 struct md_attr *tmp_ma = NULL;
689 struct md_object *tmp_t = mo_t;
693 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, &remote);
697 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
699 * \note \a mo_t is remote object and there is RPC to unlink it.
700 * Before that, do local sanity check for rename first.
703 struct md_object *mo_s = md_object_find_slice(env,
704 md_obj2dev(mo_po), lf);
706 RETURN(PTR_ERR(mo_s));
708 LASSERT(lu_object_exists(&mo_s->mo_lu) > 0);
709 rc = mo_permission(env, md_object_next(mo_po),
710 md_object_next(mo_s),
712 lu_object_put(env, &mo_s->mo_lu);
716 rc = mo_permission(env, NULL, md_object_next(mo_po),
717 ma, MAY_UNLINK | MAY_VTX_FULL);
722 rc = mo_permission(env, NULL, md_object_next(mo_pn), ma,
723 MAY_UNLINK | MAY_VTX_PART);
728 * \note \a ma will be changed after mo_ref_del(), but we will use
729 * it for mdo_rename() later, so save it before mo_ref_del().
731 cmi = cmm_env_info(env);
732 tmp_ma = &cmi->cmi_ma;
734 rc = mo_ref_del(env, md_object_next(mo_t), ma);
738 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
743 * \note for src on remote MDS case, change its ctime before local
744 * rename. Firstly, do local sanity check for rename if necessary.
748 rc = mo_permission(env, NULL, md_object_next(mo_po),
749 ma, MAY_UNLINK | MAY_VTX_FULL);
754 LASSERT(lu_object_exists(&mo_t->mo_lu) > 0);
755 rc = mo_permission(env, md_object_next(mo_pn),
756 md_object_next(mo_t),
764 mask = (S_ISDIR(ma->ma_attr.la_mode) ?
765 MAY_LINK : MAY_CREATE);
768 rc = mo_permission(env, NULL,
769 md_object_next(mo_pn),
775 ma->ma_attr_flags |= MDS_PERM_BYPASS;
777 LASSERT(tmp_ma->ma_attr_flags & MDS_PERM_BYPASS);
780 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf,
781 tmp_ma ? tmp_ma : ma);
783 /* TODO: revoke mo_t if necessary. */
784 cml_rename_warn("cmm_rename_ctime", mo_po,
785 mo_pn, lf, ls_name->ln_name,
786 tmp_t, lt_name->ln_name, rc);
791 /* local rename, mo_t can be NULL */
792 rc = mdo_rename(env, md_object_next(mo_po),
793 md_object_next(mo_pn), lf, ls_name,
794 md_object_next(mo_t), lt_name, tmp_ma ? tmp_ma : ma);
796 /* TODO: revoke all cml_rename */
797 cml_rename_warn("mdo_rename", mo_po, mo_pn, lf,
798 ls_name->ln_name, tmp_t, lt_name->ln_name, rc);
804 * Rename target partial operation.
805 * Used for cross-ref rename.
807 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
808 struct md_object *mo_t, const struct lu_fid *lf,
809 const struct lu_name *lname, struct md_attr *ma)
814 rc = mdo_rename_tgt(env, md_object_next(mo_p),
815 md_object_next(mo_t), lf, lname, ma);
820 * Name insert only operation.
821 * used only in case of rename_tgt() when target doesn't exist.
823 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
824 const struct lu_name *lname, const struct lu_fid *lf,
825 const struct md_attr *ma)
830 rc = mdo_name_insert(env, md_object_next(p), lname, lf, ma);
837 * Check two fids are not subdirectories.
839 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
840 const struct lu_fid *fid, struct lu_fid *sfid)
842 struct cmm_thread_info *cmi;
846 cmi = cmm_env_info(env);
847 rc = cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma, NULL);
851 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
854 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
858 static const struct md_dir_operations cml_dir_ops = {
859 .mdo_is_subdir = cmm_is_subdir,
860 .mdo_lookup = cml_lookup,
861 .mdo_lock_mode = cml_lock_mode,
862 .mdo_create = cml_create,
863 .mdo_link = cml_link,
864 .mdo_unlink = cml_unlink,
865 .mdo_name_insert = cml_name_insert,
866 .mdo_rename = cml_rename,
867 .mdo_rename_tgt = cml_rename_tgt,
868 .mdo_create_data = cml_create_data
881 /** Get cmr_object from lu_object. */
882 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
884 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
886 /** Get cmr_object from md_object. */
887 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
889 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
891 /** Get cmr_object from cmm_object. */
892 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
894 return container_of0(co, struct cmr_object, cmm_obj);
899 * Get proper child device from MDCs.
901 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
903 struct lu_device *next = NULL;
904 struct mdc_device *mdc;
906 cfs_spin_lock(&d->cmm_tgt_guard);
907 cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
908 if (mdc->mc_num == num) {
909 next = mdc2lu_dev(mdc);
913 cfs_spin_unlock(&d->cmm_tgt_guard);
920 static void cmr_object_free(const struct lu_env *env,
921 struct lu_object *lo)
923 struct cmr_object *cro = lu2cmr_obj(lo);
929 * Initialize cmr object.
931 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
932 const struct lu_object_conf *unused)
934 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
935 struct lu_device *c_dev;
936 struct lu_object *c_obj;
941 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
945 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
946 lo->lo_header, c_dev);
948 lu_object_add(lo, c_obj);
959 * Output lu_object data.
961 static int cmr_object_print(const struct lu_env *env, void *cookie,
962 lu_printer_t p, const struct lu_object *lo)
964 const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
965 return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
969 * Cmr instance of lu_object_operations.
971 static const struct lu_object_operations cmr_obj_ops = {
972 .loo_object_init = cmr_object_init,
973 .loo_object_free = cmr_object_free,
974 .loo_object_print = cmr_object_print
978 * \name cmr remote md_object operations.
979 * All operations here are invalid and return errors. There is no local object
980 * so these operations return two kinds of error:
981 * -# -EFAULT if operation is prohibited.
982 * -# -EREMOTE if operation can be done just to notify upper level about remote
987 static int cmr_object_create(const struct lu_env *env,
988 struct md_object *mo,
989 const struct md_op_spec *spec,
995 static int cmr_permission(const struct lu_env *env,
996 struct md_object *p, struct md_object *c,
997 struct md_attr *attr, int mask)
1002 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
1003 struct md_attr *attr)
1008 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
1009 const struct md_attr *attr)
1014 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
1015 struct lu_buf *buf, const char *name)
1020 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
1026 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
1032 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
1033 const struct lu_buf *buf, const char *name,
1039 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
1045 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo,
1046 const struct md_attr *ma)
1051 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
1057 static int cmr_open(const struct lu_env *env, struct md_object *mo,
1063 static int cmr_close(const struct lu_env *env, struct md_object *mo,
1069 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
1070 const struct lu_rdpg *rdpg)
1075 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
1076 struct lustre_capa *capa, int renewal)
1081 static int cmr_path(const struct lu_env *env, struct md_object *obj,
1082 char *path, int pathlen, __u64 *recno, int *linkno)
1087 static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
1093 * cmr moo_version_get().
1095 static dt_obj_version_t cmr_version_get(const struct lu_env *env,
1096 struct md_object *mo)
1098 /** Don't check remote object version */
1104 * cmr moo_version_set().
1105 * No need to update remote object version here, it is done as a part
1106 * of reintegration of partial operation on the remote server.
1108 static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
1109 dt_obj_version_t version)
1114 /** Set of md_object_operations for cmr. */
1115 static const struct md_object_operations cmr_mo_ops = {
1116 .moo_permission = cmr_permission,
1117 .moo_attr_get = cmr_attr_get,
1118 .moo_attr_set = cmr_attr_set,
1119 .moo_xattr_get = cmr_xattr_get,
1120 .moo_xattr_set = cmr_xattr_set,
1121 .moo_xattr_list = cmr_xattr_list,
1122 .moo_xattr_del = cmr_xattr_del,
1123 .moo_object_create = cmr_object_create,
1124 .moo_ref_add = cmr_ref_add,
1125 .moo_ref_del = cmr_ref_del,
1126 .moo_open = cmr_open,
1127 .moo_close = cmr_close,
1128 .moo_readpage = cmr_readpage,
1129 .moo_readlink = cmr_readlink,
1130 .moo_capa_get = cmr_capa_get,
1131 .moo_object_sync = cmr_object_sync,
1132 .moo_version_get = cmr_version_get,
1133 .moo_version_set = cmr_version_set,
1134 .moo_path = cmr_path,
1139 * \name cmr md_dir operations.
1141 * All methods below are cross-ref by nature. They consist of remote call and
1142 * local operation. Due to future rollback functionality there are several
1143 * limitations for such methods:
1144 * -# remote call should be done at first to do epoch negotiation between all
1145 * MDS involved and to avoid the RPC inside transaction.
1146 * -# only one RPC can be sent - also due to epoch negotiation.
1147 * For more details see rollback HLD/DLD.
1150 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
1151 const struct lu_name *lname, struct lu_fid *lf,
1152 struct md_op_spec *spec)
1155 * This can happen during rename(). If the new parent is a remote dir, lookup
1162 /** Return lock mode. */
1163 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
1164 struct md_object *mo, mdl_mode_t lm)
1170 * Create operation for cmr.
1171 * Remote object creation and local name insert.
1173 * \param mo_p Parent directory. Local object.
1174 * \param lchild_name name of file to create.
1175 * \param mo_c Child object. It has no real inode yet.
1176 * \param spec creation specification.
1177 * \param ma child object attributes.
1179 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
1180 const struct lu_name *lchild_name, struct md_object *mo_c,
1181 struct md_op_spec *spec,
1184 struct cmm_thread_info *cmi;
1185 struct md_attr *tmp_ma;
1189 /* Make sure that the name does not exist before doing the remote call. */
1190 rc = mdo_lookup(env, md_object_next(mo_p), lchild_name,
1191 &cmm_env_info(env)->cmi_fid, NULL);
1194 else if (rc != -ENOENT)
1197 /* check the SGID attr */
1198 cmi = cmm_env_info(env);
1200 tmp_ma = &cmi->cmi_ma;
1201 tmp_ma->ma_valid = 0;
1202 tmp_ma->ma_need = MA_INODE;
1204 #ifdef CONFIG_FS_POSIX_ACL
1205 if (!S_ISLNK(ma->ma_attr.la_mode)) {
1206 tmp_ma->ma_acl = cmi->cmi_xattr_buf;
1207 tmp_ma->ma_acl_size = sizeof(cmi->cmi_xattr_buf);
1208 tmp_ma->ma_need |= MA_ACL_DEF;
1211 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
1215 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
1216 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
1217 if (S_ISDIR(ma->ma_attr.la_mode)) {
1218 ma->ma_attr.la_mode |= S_ISGID;
1219 ma->ma_attr.la_valid |= LA_MODE;
1223 #ifdef CONFIG_FS_POSIX_ACL
1224 if (tmp_ma->ma_valid & MA_ACL_DEF) {
1225 spec->u.sp_ea.fid = spec->u.sp_pfid;
1226 spec->u.sp_ea.eadata = tmp_ma->ma_acl;
1227 spec->u.sp_ea.eadatalen = tmp_ma->ma_acl_size;
1228 spec->sp_cr_flags |= MDS_CREATE_RMT_ACL;
1232 /* Local permission check for name_insert before remote ops. */
1233 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1234 (S_ISDIR(ma->ma_attr.la_mode) ?
1235 MAY_LINK : MAY_CREATE));
1240 * \note \a ma will be changed after mo_object_create(), but we will use
1241 * it for mdo_name_insert() later, so save it before mo_object_create().
1244 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
1246 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1247 rc = mdo_name_insert(env, md_object_next(mo_p), lchild_name,
1248 lu_object_fid(&mo_c->mo_lu), tmp_ma);
1250 /* TODO: remove object mo_c on remote MDS */
1251 CWARN("cmr_create failed, should revoke: [mo_p "DFID"]"
1252 " [name %s] [mo_c "DFID"] [err %d]\n",
1253 PFID(lu_object_fid(&mo_p->mo_lu)),
1254 lchild_name->ln_name,
1255 PFID(lu_object_fid(&mo_c->mo_lu)), rc);
1263 * Link operations for cmr.
1265 * The link RPC is always issued to the server where source parent is living.
1266 * The first operation to do is object nlink increment on remote server.
1267 * Second one is local mdo_name_insert().
1269 * \param mo_p parent directory. It is local.
1270 * \param mo_s source object to link. It is remote.
1271 * \param lname Name of link file.
1272 * \param ma object attributes.
1274 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
1275 struct md_object *mo_s, const struct lu_name *lname,
1281 /* Make sure that the name does not exist before doing the remote call. */
1282 rc = mdo_lookup(env, md_object_next(mo_p), lname,
1283 &cmm_env_info(env)->cmi_fid, NULL);
1286 } else if (rc == -ENOENT) {
1287 /* Local permission check for name_insert before remote ops. */
1288 rc = mo_permission(env, NULL, md_object_next(mo_p), NULL,
1293 rc = mo_ref_add(env, md_object_next(mo_s), ma);
1295 ma->ma_attr_flags |= MDS_PERM_BYPASS;
1296 rc = mdo_name_insert(env, md_object_next(mo_p), lname,
1297 lu_object_fid(&mo_s->mo_lu), ma);
1299 /* TODO: ref_del from mo_s on remote MDS */
1300 CWARN("cmr_link failed, should revoke: "
1301 "[mo_p "DFID"] [mo_s "DFID"] "
1302 "[name %s] [err %d]\n",
1303 PFID(lu_object_fid(&mo_p->mo_lu)),
1304 PFID(lu_object_fid(&mo_s->mo_lu)),
1305 lname->ln_name, rc);
1313 * Unlink operations for cmr.
1315 * The unlink RPC is always issued to the server where parent is living. Hence
1316 * the first operation to do is object unlink on remote server. Second one is
1317 * local mdo_name_remove().
1319 * \param mo_p parent md_object. It is local.
1320 * \param mo_c child object to be unlinked. It is remote.
1321 * \param lname Name of file to unlink.
1322 * \param ma object attributes.
1324 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
1325 struct md_object *mo_c, const struct lu_name *lname,
1328 struct cmm_thread_info *cmi;
1329 struct md_attr *tmp_ma;
1333 /* Local permission check for name_remove before remote ops. */
1334 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1335 MAY_UNLINK | MAY_VTX_PART);
1340 * \note \a ma will be changed after mo_ref_del, but we will use
1341 * it for mdo_name_remove() later, so save it before mo_ref_del().
1343 cmi = cmm_env_info(env);
1344 tmp_ma = &cmi->cmi_ma;
1346 rc = mo_ref_del(env, md_object_next(mo_c), ma);
1348 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1349 rc = mdo_name_remove(env, md_object_next(mo_p), lname, tmp_ma);
1351 /* TODO: ref_add to mo_c on remote MDS */
1352 CWARN("cmr_unlink failed, should revoke: [mo_p "DFID"]"
1353 " [mo_c "DFID"] [name %s] [err %d]\n",
1354 PFID(lu_object_fid(&mo_p->mo_lu)),
1355 PFID(lu_object_fid(&mo_c->mo_lu)),
1356 lname->ln_name, rc);
1363 /** Helper which outputs error message during cmr_rename() */
1364 static inline void cmr_rename_warn(const char *fname,
1365 struct md_object *mo_po,
1366 struct md_object *mo_pn,
1367 const struct lu_fid *lf,
1372 CWARN("cmr_rename failed for %s, should revoke: "
1373 "[mo_po "DFID"] [mo_pn "DFID"] [lf "DFID"] "
1374 "[sname %s] [tname %s] [err %d]\n", fname,
1375 PFID(lu_object_fid(&mo_po->mo_lu)),
1376 PFID(lu_object_fid(&mo_pn->mo_lu)),
1377 PFID(lf), s_name, t_name, err);
1381 * Rename operation for cmr.
1383 * This is the most complex cross-reference operation. It may involve up to 4
1384 * MDS servers and require several RPCs to be sent.
1386 * \param mo_po Old parent object.
1387 * \param mo_pn New parent object.
1388 * \param lf FID of object to rename.
1389 * \param ls_name Source file name.
1390 * \param mo_t target object. Should be NULL here.
1391 * \param lt_name Name of target file.
1392 * \param ma object attributes.
1394 static int cmr_rename(const struct lu_env *env,
1395 struct md_object *mo_po, struct md_object *mo_pn,
1396 const struct lu_fid *lf, const struct lu_name *ls_name,
1397 struct md_object *mo_t, const struct lu_name *lt_name,
1400 struct cmm_thread_info *cmi;
1401 struct md_attr *tmp_ma;
1405 LASSERT(mo_t == NULL);
1407 /* get real type of src */
1408 rc = cmm_mode_get(env, md_obj2dev(mo_po), lf, ma, NULL);
1412 /* Local permission check for name_remove before remote ops. */
1413 rc = mo_permission(env, NULL, md_object_next(mo_po), ma,
1414 MAY_UNLINK | MAY_VTX_FULL);
1419 * \todo \a ma may be changed after mdo_rename_tgt(), but we will use it
1420 * for mdo_name_remove() later, so save it before mdo_rename_tgt.
1422 cmi = cmm_env_info(env);
1423 tmp_ma = &cmi->cmi_ma;
1426 * \note The \a mo_pn is remote directory, so we cannot even know if there is
1427 * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
1428 * lookup and process this further.
1430 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
1431 NULL/* mo_t */, lf, lt_name, ma);
1435 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1437 /* src object may be on remote MDS, do remote ops first. */
1438 rc = cmm_rename_ctime(env, md_obj2dev(mo_po), lf, tmp_ma);
1440 /* TODO: revoke mdo_rename_tgt */
1441 cmr_rename_warn("cmm_rename_ctime", mo_po, mo_pn, lf,
1442 ls_name->ln_name, lt_name->ln_name, rc);
1446 /* only old name is removed locally */
1447 rc = mdo_name_remove(env, md_object_next(mo_po), ls_name, tmp_ma);
1449 /* TODO: revoke all cmr_rename */
1450 cmr_rename_warn("mdo_name_remove", mo_po, mo_pn, lf,
1451 ls_name->ln_name, lt_name->ln_name, rc);
1457 * Part of cross-ref rename().
1458 * Used to insert new name in new parent and unlink target.
1460 static int cmr_rename_tgt(const struct lu_env *env,
1461 struct md_object *mo_p, struct md_object *mo_t,
1462 const struct lu_fid *lf, const struct lu_name *lname,
1465 struct cmm_thread_info *cmi;
1466 struct md_attr *tmp_ma;
1470 /* target object is remote one */
1471 /* Local permission check for rename_tgt before remote ops. */
1472 rc = mo_permission(env, NULL, md_object_next(mo_p), ma,
1473 MAY_UNLINK | MAY_VTX_PART);
1478 * XXX: @ma may be changed after mo_ref_del, but we will use
1479 * it for mdo_rename_tgt later, so save it before mo_ref_del.
1481 cmi = cmm_env_info(env);
1482 tmp_ma = &cmi->cmi_ma;
1484 rc = mo_ref_del(env, md_object_next(mo_t), ma);
1485 /* continue locally with name handling only */
1487 tmp_ma->ma_attr_flags |= MDS_PERM_BYPASS;
1488 rc = mdo_rename_tgt(env, md_object_next(mo_p),
1489 NULL, lf, lname, tmp_ma);
1491 /* TODO: ref_add to mo_t on remote MDS */
1492 CWARN("cmr_rename_tgt failed, should revoke: "
1493 "[mo_p "DFID"] [mo_t "DFID"] [lf "DFID"] "
1494 "[name %s] [err %d]\n",
1495 PFID(lu_object_fid(&mo_p->mo_lu)),
1496 PFID(lu_object_fid(&mo_t->mo_lu)),
1498 lname->ln_name, rc);
1505 * The md_dir_operations for cmr.
1507 static const struct md_dir_operations cmr_dir_ops = {
1508 .mdo_is_subdir = cmm_is_subdir,
1509 .mdo_lookup = cmr_lookup,
1510 .mdo_lock_mode = cmr_lock_mode,
1511 .mdo_create = cmr_create,
1512 .mdo_link = cmr_link,
1513 .mdo_unlink = cmr_unlink,
1514 .mdo_rename = cmr_rename,
1515 .mdo_rename_tgt = cmr_rename_tgt