1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_object.c
5 * Lustre Cluster Metadata Manager (cmm)
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Mike Pershin <tappro@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
33 #define DEBUG_SUBSYSTEM S_MDS
35 #include <lustre_fid.h>
36 #include "cmm_internal.h"
37 #include "mdc_internal.h"
/* Context key declared extern here; below it is passed to
 * lu_context_key_get() to obtain a struct cmm_thread_info. */
39 extern struct lu_context_key cmm_thread_key;
/*
 * Map the sequence of @fid to an MDS number: fld_client_lookup() is called on
 * the site's FLD client (ls_client_fld) and stores the result in *@mds.
 * @fid must satisfy fid_is_sane(). The CERROR messages below report a failed
 * lookup and an MDS number greater than cm->cmm_tgt_count.
 * NOTE(review): the range check uses '>', so *mds == cmm_tgt_count is
 * accepted - confirm that this is the intended upper bound.
 */
41 static int cmm_fld_lookup(struct cmm_device *cm,
42 const struct lu_fid *fid, mdsno_t *mds,
43 const struct lu_env *env)
49 LASSERT(fid_is_sane(fid));
51 ls = cm->cmm_md_dev.md_lu_dev.ld_site;
53 rc = fld_client_lookup(ls->ls_client_fld,
54 fid_seq(fid), mds, env);
56 CERROR("can't find mds by seq "LPX64", rc %d\n",
61 if (*mds > cm->cmm_tgt_count) {
62 CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
63 *mds, cm->cmm_tgt_count);
66 CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "LPU64"\n",
/* Forward declarations of the operation tables for local (cml_) and remote
 * (cmr_) objects; cmm_object_alloc() takes their addresses before the tables
 * are defined further down in this file. */
73 static struct md_object_operations cml_mo_ops;
74 static struct md_dir_operations cml_dir_ops;
75 static struct lu_object_operations cml_obj_ops;
77 static struct md_object_operations cmr_mo_ops;
78 static struct md_dir_operations cmr_dir_ops;
79 static struct lu_object_operations cmr_obj_ops;
/*
 * Set up the CMM-layer lu_object for the object described by header @loh.
 * When the device has CMM_INITIALIZED set, the owning MDS number is obtained
 * from the object's FID with cmm_fld_lookup(); otherwise the local MDS number
 * (cd->cmm_local_num) is used. If the MDS number equals the local one the
 * object is a cml_object wired to the cml_* operation tables; otherwise it is
 * a cmr_object wired to the cmr_* tables, with the MDS number saved in cmo_num.
 */
81 struct lu_object *cmm_object_alloc(const struct lu_env *env,
82 const struct lu_object_header *loh,
85 struct lu_object *lo = NULL;
86 const struct lu_fid *fid = &loh->loh_fid;
87 struct cmm_device *cd;
94 if (cd->cmm_flags & CMM_INITIALIZED) {
95 /* get object location */
96 rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum, env);
101 * Device is not yet initialized, cmm_object is being created
102 * as part of early bootstrap procedure (it is /ROOT, or /fld,
103 * etc.). Such object *has* to be local.
105 mdsnum = cd->cmm_local_num;
107 /* select the proper set of operations based on object location */
108 if (mdsnum == cd->cmm_local_num) {
109 struct cml_object *clo;
113 lo = &clo->cmm_obj.cmo_obj.mo_lu;
114 lu_object_init(lo, NULL, ld);
115 clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
116 clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
117 lo->lo_ops = &cml_obj_ops;
120 struct cmr_object *cro;
124 lo = &cro->cmm_obj.cmo_obj.mo_lu;
125 lu_object_init(lo, NULL, ld);
126 cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
127 cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
128 lo->lo_ops = &cmr_obj_ops;
129 cro->cmo_num = mdsnum;
136 * CMM has two types of objects - local and remote. They have different sets
137 * of operations, so we avoid multiple checks in the code.
141 * local CMM object operations. cml_...
/* Get the cml_object that embeds lu_object @o as cmm_obj.cmo_obj.mo_lu. */
143 static inline struct cml_object *lu2cml_obj(struct lu_object *o)
145 return container_of0(o, struct cml_object, cmm_obj.cmo_obj.mo_lu);
/* Get the cml_object that embeds md_object @mo as cmm_obj.cmo_obj. */
147 static inline struct cml_object *md2cml_obj(struct md_object *mo)
149 return container_of0(mo, struct cml_object, cmm_obj.cmo_obj);
/* Get the cml_object that embeds cmm_object @co as its cmm_obj member. */
151 static inline struct cml_object *cmm2cml_obj(struct cmm_object *co)
153 return container_of0(co, struct cml_object, cmm_obj);
155 /* get local child device: the lu_device embedded in d->cmm_child */
156 static struct lu_device *cml_child_dev(struct cmm_device *d)
158 return &d->cmm_child->md_lu_dev;
161 /* lu_object operations */
/* loo_object_free handler of cml_obj_ops; begins by recovering the enclosing
 * cml_object from @lo. */
162 static void cml_object_free(const struct lu_env *env,
163 struct lu_object *lo)
165 struct cml_object *clo = lu2cml_obj(lo);
/* loo_object_init handler of cml_obj_ops: asks the local child device
 * (cml_child_dev) to allocate its object for the same header through
 * ldo_object_alloc, then passes it to lu_object_add(lo, c_obj). */
170 static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
172 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
173 struct lu_device *c_dev;
174 struct lu_object *c_obj;
179 c_dev = cml_child_dev(cd);
183 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
184 lo->lo_header, c_dev);
186 lu_object_add(lo, c_obj);
/* loo_object_print handler of cml_obj_ops: formats LUSTRE_CMM_NAME"-local@%p"
 * with @lo through printer @p and returns the printer's result. */
196 static int cml_object_print(const struct lu_env *env, void *cookie,
197 lu_printer_t p, const struct lu_object *lo)
199 return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
/* lu_object operations for local objects; cmm_object_alloc() stores this
 * table in lo->lo_ops. */
202 static struct lu_object_operations cml_obj_ops = {
203 .loo_object_init = cml_object_init,
204 .loo_object_free = cml_object_free,
205 .loo_object_print = cml_object_print
208 /* CMM local md_object operations */
/* moo_object_create for local objects: calls mo_object_create() on
 * md_object_next(mo) with the same @spec and @attr. */
209 static int cml_object_create(const struct lu_env *env,
210 struct md_object *mo,
211 const struct md_create_spec *spec,
212 struct md_attr *attr)
216 rc = mo_object_create(env, md_object_next(mo), spec, attr);
/* moo_permission for local objects: calls mo_permission() on
 * md_object_next(mo) with @mask. */
220 static int cml_permission(const struct lu_env *env,
221 struct md_object *mo, int mask)
225 rc = mo_permission(env, md_object_next(mo), mask);
/* moo_attr_get for local objects: calls mo_attr_get() on md_object_next(mo). */
229 static int cml_attr_get(const struct lu_env *env, struct md_object *mo,
230 struct md_attr *attr)
234 rc = mo_attr_get(env, md_object_next(mo), attr);
/* moo_attr_set for local objects: calls mo_attr_set() on md_object_next(mo). */
238 static int cml_attr_set(const struct lu_env *env, struct md_object *mo,
239 const struct md_attr *attr)
243 rc = mo_attr_set(env, md_object_next(mo), attr);
/* moo_xattr_get for local objects: calls mo_xattr_get() on md_object_next(mo)
 * with @buf and @name. */
247 static int cml_xattr_get(const struct lu_env *env, struct md_object *mo,
248 struct lu_buf *buf, const char *name)
252 rc = mo_xattr_get(env, md_object_next(mo), buf, name);
/* moo_readlink for local objects: calls mo_readlink() on md_object_next(mo). */
256 static int cml_readlink(const struct lu_env *env, struct md_object *mo,
261 rc = mo_readlink(env, md_object_next(mo), buf);
/* moo_xattr_list for local objects: calls mo_xattr_list() on
 * md_object_next(mo). */
265 static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
270 rc = mo_xattr_list(env, md_object_next(mo), buf);
/* moo_xattr_set for local objects: calls mo_xattr_set() on md_object_next(mo)
 * with @buf, @name and flags @fl. */
274 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
275 const struct lu_buf *buf,
276 const char *name, int fl)
280 rc = mo_xattr_set(env, md_object_next(mo), buf, name, fl);
/* moo_xattr_del for local objects: calls mo_xattr_del() on md_object_next(mo). */
284 static int cml_xattr_del(const struct lu_env *env, struct md_object *mo,
289 rc = mo_xattr_del(env, md_object_next(mo), name);
/* moo_ref_add for local objects: calls mo_ref_add() on md_object_next(mo). */
293 static int cml_ref_add(const struct lu_env *env, struct md_object *mo)
297 rc = mo_ref_add(env, md_object_next(mo));
/* moo_ref_del for local objects: calls mo_ref_del() on md_object_next(mo). */
301 static int cml_ref_del(const struct lu_env *env, struct md_object *mo,
306 rc = mo_ref_del(env, md_object_next(mo), ma);
/* moo_open for local objects: calls mo_open() on md_object_next(mo). */
310 static int cml_open(const struct lu_env *env, struct md_object *mo,
315 rc = mo_open(env, md_object_next(mo), flags);
/* moo_close for local objects: calls mo_close() on md_object_next(mo). */
319 static int cml_close(const struct lu_env *env, struct md_object *mo,
324 rc = mo_close(env, md_object_next(mo), ma);
/* moo_readpage for local objects: calls mo_readpage() on md_object_next(mo). */
328 static int cml_readpage(const struct lu_env *env, struct md_object *mo,
329 const struct lu_rdpg *rdpg)
333 rc = mo_readpage(env, md_object_next(mo), rdpg);
/* moo_capa_get for local objects: calls mo_capa_get() on md_object_next(mo). */
337 static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
338 struct lustre_capa *capa)
342 rc = mo_capa_get(env, md_object_next(mo), capa);
/* md_object operations for local objects; cmm_object_alloc() stores this
 * table in mo_ops of a cml_object. */
346 static struct md_object_operations cml_mo_ops = {
347 .moo_permission = cml_permission,
348 .moo_attr_get = cml_attr_get,
349 .moo_attr_set = cml_attr_set,
350 .moo_xattr_get = cml_xattr_get,
351 .moo_xattr_list = cml_xattr_list,
352 .moo_xattr_set = cml_xattr_set,
353 .moo_xattr_del = cml_xattr_del,
354 .moo_object_create = cml_object_create,
355 .moo_ref_add = cml_ref_add,
356 .moo_ref_del = cml_ref_del,
357 .moo_open = cml_open,
358 .moo_close = cml_close,
359 .moo_readpage = cml_readpage,
360 .moo_readlink = cml_readlink,
361 .moo_capa_get = cml_capa_get
364 /* md_dir operations */
/* mdo_lookup for a local parent: calls mdo_lookup() on md_object_next(mo_p)
 * for @name, with @lf passed through for the result. */
365 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
366 const char *name, struct lu_fid *lf)
370 rc = mdo_lookup(env, md_object_next(mo_p), name, lf);
/* mdo_create for a local parent. When HAVE_SPLIT_SUPPORT is defined,
 * cml_try_to_split() is called on the parent ahead of the create; the create
 * itself is mdo_create() on the next-layer parent and child objects. */
375 static int cml_create(const struct lu_env *env,
376 struct md_object *mo_p, const char *child_name,
377 struct md_object *mo_c, const struct md_create_spec *spec,
383 #ifdef HAVE_SPLIT_SUPPORT
384 rc = cml_try_to_split(env, mo_p);
389 rc = mdo_create(env, md_object_next(mo_p), child_name,
390 md_object_next(mo_c), spec, ma);
/* mdo_create_data for local objects: calls mdo_create_data() on the
 * next-layer objects of @p and @o. */
396 static int cml_create_data(const struct lu_env *env, struct md_object *p,
398 const struct md_create_spec *spec,
403 rc = mdo_create_data(env, md_object_next(p), md_object_next(o),
/* mdo_link for local objects: calls mdo_link() on the next-layer objects of
 * the parent @mo_p and the source @mo_s. */
408 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
409 struct md_object *mo_s, const char *name,
414 rc = mdo_link(env, md_object_next(mo_p), md_object_next(mo_s),
/* mdo_unlink for local objects: calls mdo_unlink() on the next-layer objects
 * of the parent @mo_p and the child @mo_c. */
419 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
420 struct md_object *mo_c, const char *name,
425 rc = mdo_unlink(env, md_object_next(mo_p), md_object_next(mo_c),
430 /* rename is split to local/remote by location of new parent dir */
/*
 * Find the object with FID @f in the site of device @md (lu_object_find with
 * BYPASS_CAPA) and return its md_object for @md's device type, located with
 * lu_object_locate(). The result is NULL when lu_object_locate() returns NULL.
 * NOTE(review): one path returns the lu_object_find() result cast to
 * struct md_object * - presumably the error-pointer case; confirm.
 * __cmm_mode_get() releases the returned object with lu_object_put().
 */
431 struct md_object *md_object_find(const struct lu_env *env,
432 struct md_device *md,
433 const struct lu_fid *f)
439 o = lu_object_find(env, md2lu_dev(md)->ld_site, f, BYPASS_CAPA);
441 m = (struct md_object *)o;
443 o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
444 m = o ? lu2md(o) : NULL;
/*
 * Fetch la_mode and la_flags of the object with FID @lf into @ma.
 * The object is found with md_object_find(); its attributes are read with
 * mo_attr_get() (ma_need = MA_INODE) into the cmi_ma scratch md_attr of the
 * cmm_thread_info taken from @env. Mode and flags are then copied to @ma and
 * LA_MODE | LA_FLAGS are or-ed into its la_valid. The object is released with
 * lu_object_put(). There is an early RETURN(PTR_ERR(mo_s)) path.
 * NOTE(review): md_object_find() can also produce NULL (its "o ? ... : NULL"
 * branch); confirm that case is handled before mo_s is dereferenced.
 */
449 static int __cmm_mode_get(const struct lu_env *env, struct md_device *md,
450 const struct lu_fid *lf, struct md_attr *ma)
452 struct cmm_thread_info *cmi;
453 struct md_object *mo_s = md_object_find(env, md, lf);
454 struct md_attr *tmp_ma;
459 RETURN(PTR_ERR(mo_s));
461 cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
463 tmp_ma = &cmi->cmi_ma;
464 tmp_ma->ma_need = MA_INODE;
466 /* get type from src, can be remote req */
467 rc = mo_attr_get(env, md_object_next(mo_s), tmp_ma);
469 ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode;
470 ma->ma_attr.la_flags = tmp_ma->ma_attr.la_flags;
471 ma->ma_attr.la_valid |= LA_MODE | LA_FLAGS;
473 lu_object_put(env, &mo_s->mo_lu);
/*
 * mdo_rename for local objects. The source's mode and flags are first loaded
 * into @ma with __cmm_mode_get(). If @mo_t is non-NULL and lu_object_exists()
 * is negative for it (a remote object, per the comment below), mo_ref_del()
 * is called on it. The rename itself is mdo_rename() on the next-layer
 * objects.
 */
477 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
478 struct md_object *mo_pn, const struct lu_fid *lf,
479 const char *s_name, struct md_object *mo_t,
480 const char *t_name, struct md_attr *ma)
485 rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
489 if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
490 /* mo_t is remote object and there is RPC to unlink it */
491 rc = mo_ref_del(env, md_object_next(mo_t), ma);
497 /* local rename, mo_t can be NULL */
498 rc = mdo_rename(env, md_object_next(mo_po),
499 md_object_next(mo_pn), lf, s_name,
500 md_object_next(mo_t), t_name, ma);
/* mdo_rename_tgt for local objects: calls mdo_rename_tgt() on the next-layer
 * objects of @mo_p and @mo_t with @lf, @name and @ma. */
504 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
505 struct md_object *mo_t, const struct lu_fid *lf,
506 const char *name, struct md_attr *ma)
511 rc = mdo_rename_tgt(env, md_object_next(mo_p),
512 md_object_next(mo_t), lf, name, ma);
515 /* used only in case of rename_tgt() when the target does not exist */
/* mdo_name_insert for local objects: calls mdo_name_insert() on
 * md_object_next(p) with @name, @lf and @isdir. */
516 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
517 const char *name, const struct lu_fid *lf, int isdir)
522 rc = mdo_name_insert(env, md_object_next(p), name, lf, isdir);
527 /* Common method for remote and local use. */
/*
 * mdo_is_subdir handler used by both cml_dir_ops and cmr_dir_ops.
 * Loads the mode of the object @fid into the cmi_ma scratch attr with
 * __cmm_mode_get(), tests it with S_ISDIR (non-directories take a separate
 * path), and calls mdo_is_subdir() on md_object_next(mo).
 * Note that the attr passed to __cmm_mode_get() here is the same cmi_ma that
 * __cmm_mode_get() uses as its own scratch buffer.
 */
528 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
529 const struct lu_fid *fid, struct lu_fid *sfid)
531 struct cmm_thread_info *cmi;
535 cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
536 rc = __cmm_mode_get(env, md_obj2dev(mo), fid, &cmi->cmi_ma);
540 if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode))
543 rc = mdo_is_subdir(env, md_object_next(mo), fid, sfid);
/* md_dir operations for local objects; cmm_object_alloc() stores this table
 * in mo_dir_ops of a cml_object. */
547 static struct md_dir_operations cml_dir_ops = {
548 .mdo_is_subdir = cmm_is_subdir,
549 .mdo_lookup = cml_lookup,
550 .mdo_create = cml_create,
551 .mdo_link = cml_link,
552 .mdo_unlink = cml_unlink,
553 .mdo_name_insert = cml_name_insert,
554 .mdo_rename = cml_rename,
555 .mdo_rename_tgt = cml_rename_tgt,
556 .mdo_create_data = cml_create_data
559 /* -------------------------------------------------------------------
560 * remote CMM object operations. cmr_...
/* Get the cmr_object that embeds lu_object @o as cmm_obj.cmo_obj.mo_lu. */
562 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
564 return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
/* Get the cmr_object that embeds md_object @mo as cmm_obj.cmo_obj. */
566 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
568 return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
/* Get the cmr_object that embeds cmm_object @co as its cmm_obj member. */
570 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
572 return container_of0(co, struct cmr_object, cmm_obj);
575 /* get proper child device from MDCs */
/* Walks d->cmm_targets while holding cmm_tgt_guard, looking for the
 * mdc_device whose mc_num equals @num. 'next' starts as NULL and is set to
 * that MDC's lu_device (mdc2lu_dev) on a match. */
576 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
578 struct lu_device *next = NULL;
579 struct mdc_device *mdc;
581 spin_lock(&d->cmm_tgt_guard);
582 list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
583 if (mdc->mc_num == num) {
584 next = mdc2lu_dev(mdc);
588 spin_unlock(&d->cmm_tgt_guard);
592 /* lu_object operations */
/* loo_object_free handler of cmr_obj_ops; begins by recovering the enclosing
 * cmr_object from @lo. */
593 static void cmr_object_free(const struct lu_env *env,
594 struct lu_object *lo)
596 struct cmr_object *cro = lu2cmr_obj(lo);
/*
 * loo_object_init handler of cmr_obj_ops: selects the child device for the
 * MDS number stored in the object (cmo_num) with cmr_child_dev(), asks it to
 * allocate its object for the same header through ldo_object_alloc, and
 * passes the result to lu_object_add(lo, c_obj).
 * NOTE(review): cmr_child_dev() starts from NULL and only sets a device on a
 * match; confirm c_dev is checked before c_dev->ld_ops is dereferenced.
 */
601 static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
603 struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
604 struct lu_device *c_dev;
605 struct lu_object *c_obj;
610 c_dev = cmr_child_dev(cd, lu2cmr_obj(lo)->cmo_num);
614 c_obj = c_dev->ld_ops->ldo_object_alloc(env,
615 lo->lo_header, c_dev);
617 lu_object_add(lo, c_obj);
/* loo_object_print handler of cmr_obj_ops: formats LUSTRE_CMM_NAME"-remote@%p"
 * with @lo through printer @p and returns the printer's result. */
627 static int cmr_object_print(const struct lu_env *env, void *cookie,
628 lu_printer_t p, const struct lu_object *lo)
630 return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
/* lu_object operations for remote objects; cmm_object_alloc() stores this
 * table in lo->lo_ops. */
633 static struct lu_object_operations cmr_obj_ops = {
634 .loo_object_init = cmr_object_init,
635 .loo_object_free = cmr_object_free,
636 .loo_object_print = cmr_object_print
639 /* CMM remote md_object operations. All are invalid */
/* moo_object_create entry of cmr_mo_ops (remote objects). */
640 static int cmr_object_create(const struct lu_env *env,
641 struct md_object *mo,
642 const struct md_create_spec *spec,
/* moo_permission entry of cmr_mo_ops (remote objects). */
648 static int cmr_permission(const struct lu_env *env, struct md_object *mo,
/* moo_attr_get entry of cmr_mo_ops (remote objects). */
654 static int cmr_attr_get(const struct lu_env *env, struct md_object *mo,
655 struct md_attr *attr)
/* moo_attr_set entry of cmr_mo_ops (remote objects). */
660 static int cmr_attr_set(const struct lu_env *env, struct md_object *mo,
661 const struct md_attr *attr)
/* moo_xattr_get entry of cmr_mo_ops (remote objects). */
666 static int cmr_xattr_get(const struct lu_env *env, struct md_object *mo,
667 struct lu_buf *buf, const char *name)
/* moo_readlink entry of cmr_mo_ops (remote objects). */
672 static int cmr_readlink(const struct lu_env *env, struct md_object *mo,
/* moo_xattr_list entry of cmr_mo_ops (remote objects). */
678 static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
/* moo_xattr_set entry of cmr_mo_ops (remote objects). */
684 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
685 const struct lu_buf *buf, const char *name, int fl)
/* moo_xattr_del entry of cmr_mo_ops (remote objects). */
690 static int cmr_xattr_del(const struct lu_env *env, struct md_object *mo,
/* moo_ref_add entry of cmr_mo_ops (remote objects). */
696 static int cmr_ref_add(const struct lu_env *env, struct md_object *mo)
/* moo_ref_del entry of cmr_mo_ops (remote objects). */
701 static int cmr_ref_del(const struct lu_env *env, struct md_object *mo,
/* moo_open entry of cmr_mo_ops (remote objects). */
707 static int cmr_open(const struct lu_env *env, struct md_object *mo,
/* moo_close entry of cmr_mo_ops (remote objects). */
713 static int cmr_close(const struct lu_env *env, struct md_object *mo,
/* moo_readpage entry of cmr_mo_ops (remote objects). */
719 static int cmr_readpage(const struct lu_env *env, struct md_object *mo,
720 const struct lu_rdpg *rdpg)
/* moo_capa_get entry of cmr_mo_ops (remote objects). */
725 static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
726 struct lustre_capa *capa)
/* md_object operations for remote objects; cmm_object_alloc() stores this
 * table in mo_ops of a cmr_object. */
731 static struct md_object_operations cmr_mo_ops = {
732 .moo_permission = cmr_permission,
733 .moo_attr_get = cmr_attr_get,
734 .moo_attr_set = cmr_attr_set,
735 .moo_xattr_get = cmr_xattr_get,
736 .moo_xattr_set = cmr_xattr_set,
737 .moo_xattr_list = cmr_xattr_list,
738 .moo_xattr_del = cmr_xattr_del,
739 .moo_object_create = cmr_object_create,
740 .moo_ref_add = cmr_ref_add,
741 .moo_ref_del = cmr_ref_del,
742 .moo_open = cmr_open,
743 .moo_close = cmr_close,
744 .moo_readpage = cmr_readpage,
745 .moo_readlink = cmr_readlink,
746 .moo_capa_get = cmr_capa_get
749 /* remote part of md_dir operations */
/* mdo_lookup entry of cmr_dir_ops (parent is a remote object). */
750 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
751 const char *name, struct lu_fid *lf)
754 * This can happens while rename() If new parent is remote dir, lookup
762 * All methods below are cross-ref by nature. They consist of a remote call and
763 * a local operation. Due to future rollback functionality there are several
764 * limitations for such methods:
765 * 1) the remote call should be done first, to do epoch negotiation between all
766 * MDSes involved and to avoid an RPC inside a transaction.
767 * 2) only one RPC can be sent - also due to epoch negotiation.
768 * For more details see the rollback HLD/DLD.
/*
 * mdo_create entry of cmr_dir_ops: create a remote child and insert its name
 * in the parent.
 * The parent's inode attributes are read (MA_INODE) into the cmi_ma scratch
 * attr. If the parent mode has S_ISGID, the child takes the parent's la_gid
 * and, when the child is a directory, S_ISGID is added to its mode and
 * LA_MODE to la_valid. Then mo_object_create() is called on the next-layer
 * child and mdo_name_insert() on the next-layer parent, with the child's FID
 * and an is-directory flag derived from ma's mode.
 * NOTE(review): la_gid is overwritten without LA_GID being or-ed into
 * la_valid in this block - confirm it is already set elsewhere.
 */
771 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
772 const char *child_name, struct md_object *mo_c,
773 const struct md_create_spec *spec,
776 struct cmm_thread_info *cmi;
777 struct md_attr *tmp_ma;
781 /* check the SGID attr */
782 cmi = lu_context_key_get(&env->le_ctx, &cmm_thread_key);
784 tmp_ma = &cmi->cmi_ma;
785 tmp_ma->ma_need = MA_INODE;
786 rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
790 if (tmp_ma->ma_attr.la_mode & S_ISGID) {
791 ma->ma_attr.la_gid = tmp_ma->ma_attr.la_gid;
792 if (S_ISDIR(ma->ma_attr.la_mode)) {
793 ma->ma_attr.la_mode |= S_ISGID;
794 ma->ma_attr.la_valid |= LA_MODE;
797 /* remote object creation and local name insert */
798 rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
800 rc = mdo_name_insert(env, md_object_next(mo_p),
801 child_name, lu_object_fid(&mo_c->mo_lu),
802 S_ISDIR(ma->ma_attr.la_mode));
/* mdo_link entry of cmr_dir_ops: mo_ref_add() on the next-layer source
 * object, and mdo_name_insert() of @name with the source's FID into the
 * next-layer parent (is-directory argument 0). */
808 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
809 struct md_object *mo_s, const char *name,
815 //XXX: make sure that MDT checks that the name doesn't exist
817 rc = mo_ref_add(env, md_object_next(mo_s));
819 rc = mdo_name_insert(env, md_object_next(mo_p),
820 name, lu_object_fid(&mo_s->mo_lu), 0);
/* mdo_unlink entry of cmr_dir_ops: mo_ref_del() on the next-layer child
 * object, and mdo_name_remove() of @name from the next-layer parent. */
826 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
827 struct md_object *mo_c, const char *name,
833 rc = mo_ref_del(env, md_object_next(mo_c), ma);
835 rc = mdo_name_remove(env, md_object_next(mo_po), name);
/*
 * mdo_rename entry of cmr_dir_ops (new parent @mo_pn is remote).
 * Loads the source's mode and flags into @ma with __cmm_mode_get(), asserts
 * that @mo_t is NULL, calls mdo_rename_tgt() on the next-layer new parent
 * with a NULL target, and removes the old name from the next-layer old
 * parent with mdo_name_remove().
 */
841 static int cmr_rename(const struct lu_env *env,
842 struct md_object *mo_po, struct md_object *mo_pn,
843 const struct lu_fid *lf, const char *s_name,
844 struct md_object *mo_t, const char *t_name,
850 /* get real type of src */
851 rc = __cmm_mode_get(env, md_obj2dev(mo_po), lf, ma);
855 LASSERT(mo_t == NULL);
856 /* the mo_pn is remote directory, so we cannot even know if there is
857 * mo_t or not. Therefore mo_t is NULL here but remote server should do
858 * lookup and process this further */
859 rc = mdo_rename_tgt(env, md_object_next(mo_pn),
860 NULL/* mo_t */, lf, t_name, ma);
861 /* only old name is removed locally */
863 rc = mdo_name_remove(env, md_object_next(mo_po),
869 /* part of cross-ref rename(). Used to insert new name in new parent
870 * and unlink target with same name if it exists */
/* mdo_rename_tgt entry of cmr_dir_ops: mo_ref_del() on the next-layer target
 * object, and mdo_rename_tgt() on the next-layer parent. */
871 static int cmr_rename_tgt(const struct lu_env *env,
872 struct md_object *mo_p, struct md_object *mo_t,
873 const struct lu_fid *lf, const char *name,
878 /* target object is remote one */
879 rc = mo_ref_del(env, md_object_next(mo_t), ma);
880 /* continue locally with name handling only */
882 rc = mdo_rename_tgt(env, md_object_next(mo_p),
/* md_dir operations for remote objects; cmm_object_alloc() stores this table
 * in mo_dir_ops of a cmr_object. mdo_is_subdir uses the same cmm_is_subdir
 * handler as cml_dir_ops. */
887 static struct md_dir_operations cmr_dir_ops = {
888 .mdo_is_subdir = cmm_is_subdir,
889 .mdo_lookup = cmr_lookup,
890 .mdo_create = cmr_create,
891 .mdo_link = cmr_link,
892 .mdo_unlink = cmr_unlink,
893 .mdo_rename = cmr_rename,
894 .mdo_rename_tgt = cmr_rename_tgt,