1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
/* Names of the self and parent directory entries. In this file "dotdot" is
 * used for the parent-fid lookup and "dot" when a new directory is
 * initialized. */
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
/* Forward declaration: __mdd_lookup() is defined later in this file but is
 * called from __mdd_lookup_locked(). */
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
/* Look up @name in directory @pobj through __mdd_lookup() while holding the
 * pdo read lock taken on the directory for @name. @fid and @mask are passed
 * through unchanged. */
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
/* Take the read lock for this name before the lookup ... */
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
/* ... and release it once the lookup has finished. */
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
/* Lookup entry point (installed as .mdo_lookup in mdd_dir_ops): resolves
 * @name in directory @pobj into @fid using the locked lookup helper. */
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
72 struct lu_fid* fid, struct md_op_spec *spec)
/* MAY_EXEC is the permission mask handed to the lookup. */
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/* Store the fid of @obj's parent into @fid by looking up the ".." entry of
 * @obj. The lookup is done with a permission mask of 0. */
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use special function, which does not compare version component
89 * of fid. Version component is different for root fids on all MDTs.
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
/* Non-zero when both the sequence and the object id of @fid match those of
 * the device's root fid; no other fid component is compared. */
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
/* The caller must not pass the fid of @p1 itself as @lf. */
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
/* pfid points at scratch storage in the per-thread mdd info. */
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
/* Read the fid of the ".." entry of p1 into pfid. */
125 rc = mdd_parent_fid(env, p1, pfid);
/* Compare the parent fid against the root fid and against @lf. */
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
/* Release the object currently held in parent and look up the object for
 * pfid. */
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
/* Drop the reference on parent only when it is a valid object pointer. */
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
/* The type of @mo is tested first: only directories pass this test. */
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
/* The ancestry test itself is delegated to mdd_is_parent(), with sfid passed
 * as its last argument. */
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
178 } else if (rc == 1) {
179 /* found @fid is parent */
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188 struct mdd_object *cobj, int need_check)
/* When a child object is supplied, test whether it already exists. */
193 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
/* Test whether the parent directory has been marked dead. */
196 if (mdd_is_dead_obj(pobj))
/* Write and exec permission on the parent are checked here. */
200 rc = mdd_permission_internal_locked(env, pobj, NULL,
201 MAY_WRITE | MAY_EXEC);
/* Sticky-bit helper used by the delete check. It compares the owner of @cobj
 * and the owner of @pobj with the caller's fsuid, tests S_ISVTX in the
 * parent's mode, and in the remaining case answers according to whether the
 * caller lacks CAP_FOWNER. */
206 static inline int mdd_is_sticky(const struct lu_env *env,
207 struct mdd_object *pobj,
208 struct mdd_object *cobj)
/* tmp_la is scratch attribute storage from the per-thread mdd info; it is
 * reused for the child and then for the parent. */
210 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211 struct md_ucred *uc = md_ucred(env);
/* Child attributes: is the caller the owner of the child? */
214 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
217 } else if (tmp_la->la_uid == uc->mu_fsuid) {
/* Parent attributes: sticky bit not set, or caller owns the parent? */
220 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
223 else if (!(tmp_la->la_mode & S_ISVTX) ||
224 (tmp_la->la_uid == uc->mu_fsuid))
/* Result is 1 when the caller does not have CAP_FOWNER, 0 when it does. */
227 return !mdd_capable(uc, CAP_FOWNER);
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233 struct mdd_object *pobj,
234 struct mdd_object *cobj,
235 int is_dir, int need_check)
237 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
/* Test whether the child object exists. */
243 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
/* Test for the immutable and append-only flags on the child. */
246 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
/* Type tests on the child (directory or not), plus a comparison of the
 * child's fid with the device's root fid. */
250 if (!S_ISDIR(mdd_object_type(cobj)))
253 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
256 } else if (S_ISDIR(mdd_object_type(cobj))) {
/* Parent-side tests: dead directory, sticky-bit restriction, and write +
 * exec permission. */
261 if (mdd_is_dead_obj(pobj))
264 if (mdd_is_sticky(env, pobj, cobj))
268 rc = mdd_permission_internal_locked(env, pobj, NULL,
269 MAY_WRITE | MAY_EXEC);
/* Checks performed before creating a hard link to @src_obj inside directory
 * @tgt_obj. */
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275 struct mdd_object *src_obj)
/* Test the source for the immutable / append-only flags. */
280 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
/* Test whether the source is a directory. */
283 if (S_ISDIR(mdd_object_type(src_obj)))
/* Source and target directory must be different objects. */
286 LASSERT(src_obj != tgt_obj);
/* Creation check on the target directory; no child object is supplied. */
288 rc = mdd_may_create(env, tgt_obj, NULL, 1);
/* Convert @fid to big-endian form and return it as an index record.
 * The converted fid is stored in the mti_fid2 field of the mdd_thread_info
 * obtained from @env and the returned pointer refers to that field, so a
 * later call using the same info overwrites the previous result. */
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297 const struct lu_fid *fid)
299 struct mdd_thread_info *info = mdd_env_info(env);
301 fid_cpu_to_be(&info->mti_fid2, fid);
302 return (const struct dt_rec *)&info->mti_fid2;
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308 const struct lu_fid *lf, const char *name, int is_dir,
309 struct thandle *handle, struct lustre_capa *capa)
311 struct dt_object *next = mdd_object_child(pobj);
312 struct timeval start;
/* Start the lprocfs timer for the LPROC_MDD_INDEX_INSERT counter. */
316 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317 LPROC_MDD_INDEX_INSERT);
/* The insert is issued only when the lower object can be used as a
 * directory; the record is the big-endian form of @lf and the key is
 * @name. */
318 if (dt_try_as_dir(env, next)) {
319 rc = next->do_index_ops->dio_insert(env, next,
320 __mdd_fid_rec(env, lf),
321 (const struct dt_key *)name,
/* The reference on the parent is added under the parent's write lock. */
329 mdd_write_lock(env, pobj);
330 mdd_ref_add_internal(env, pobj, handle);
331 mdd_write_unlock(env, pobj);
/* Stop the lprocfs timer started above. */
334 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335 LPROC_MDD_INDEX_INSERT);
/* Delete the entry @name from the index of directory @pobj. When the delete
 * succeeds and @is_dir is set, one reference is dropped from @pobj. The call
 * is timed under the LPROC_MDD_INDEX_DELETE counter. */
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340 const char *name, int is_dir, struct thandle *handle,
341 struct lustre_capa *capa)
343 struct dt_object *next = mdd_object_child(pobj);
344 struct timeval start;
348 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349 LPROC_MDD_INDEX_DELETE);
/* The delete is issued only when the lower object can be used as a
 * directory. */
351 if (dt_try_as_dir(env, next)) {
352 rc = next->do_index_ops->dio_delete(env, next,
353 (struct dt_key *)name,
/* The parent's reference is dropped under the parent's write lock. */
355 if (rc == 0 && is_dir) {
356 mdd_write_lock(env, pobj);
357 mdd_ref_del_internal(env, pobj, handle);
358 mdd_write_unlock(env, pobj);
363 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364 LPROC_MDD_INDEX_DELETE);
/* Insert the entry @name -> @lf into the index of directory @pobj. Unlike
 * __mdd_index_insert() this variant takes no is_dir argument. */
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370 const struct lu_fid *lf, const char *name,
371 struct thandle *handle, struct lustre_capa *capa)
373 struct dt_object *next = mdd_object_child(pobj);
/* The insert is issued only when the lower object can be used as a
 * directory; the record is the big-endian form of @lf. */
377 if (dt_try_as_dir(env, next)) {
378 rc = next->do_index_ops->dio_insert(env, next,
379 __mdd_fid_rec(env, lf),
380 (const struct dt_key *)name,
/* Hard-link entry point (installed as .mdo_link in mdd_dir_ops): inserts
 * @name into directory @tgt_obj pointing at @src_obj, adds a reference to the
 * source and updates timestamps, inside an MDD_TXN_LINK_OP transaction. */
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389 struct md_object *src_obj, const char *name,
392 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
393 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395 struct mdd_device *mdd = mdo2mdd(src_obj);
396 struct dynlock_handle *dlh;
397 struct thandle *handle;
/* Build the transaction parameters and start the transaction. */
401 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402 handle = mdd_trans_start(env, mdd);
404 RETURN(PTR_ERR(handle));
/* Lock order: pdo write lock on the target directory for @name first, then
 * the write lock on the source object. */
406 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
408 GOTO(out_trans, rc = -ENOMEM);
409 mdd_write_lock(env, mdd_sobj);
411 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
413 GOTO(out_unlock, rc);
/* Add the new name to the target directory, pointing at the source fid. */
415 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
417 mdd_object_capa(env, mdd_tobj));
419 GOTO(out_unlock, rc);
/* The source object gains a reference for the new name. */
421 mdd_ref_add_internal(env, mdd_sobj, handle);
/* Target directory: ctime and mtime are marked valid for the update. */
424 la->la_valid = LA_CTIME | LA_MTIME;
425 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
427 GOTO(out_unlock, rc);
/* Source object: only ctime is marked valid. The unlocked setter is used;
 * the write lock on the source was taken above. */
429 la->la_valid = LA_CTIME;
430 rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
/* Release the locks in the reverse order of acquisition, then stop the
 * transaction, passing it the result code. */
433 mdd_write_unlock(env, mdd_sobj);
434 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
436 mdd_trans_stop(env, mdd, rc, handle);
440 /* caller should take a lock before calling */
441 int mdd_finish_unlink(const struct lu_env *env,
442 struct mdd_object *obj, struct md_attr *ma,
/* Fetch the attributes of @obj into @ma; a link count of zero selects the
 * orphan handling below. */
448 rc = mdd_iattr_get(env, obj, ma);
449 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
450 /* add new orphan and the object
451 * will be deleted during the object_put() */
/* ORPHAN_OBJ is set only when adding the orphan record returned 0. */
452 if (__mdd_orphan_add(env, obj, th) == 0)
453 obj->mod_flags |= ORPHAN_OBJ;
/* With mod_count at zero the object is killed right away. */
455 if (obj->mod_count == 0)
456 rc = mdd_object_kill(env, obj, ma);
458 /* clear MA_LOV | MA_COOKIE, if we do not
459 * unlink it in case we get it somewhere */
460 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
462 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
468 * Check that @dir contains no entries except (possibly) dot and dotdot.
473 * -ENOTEMPTY not empty
477 static int mdd_dir_is_empty(const struct lu_env *env,
478 struct mdd_object *dir)
481 struct dt_object *obj;
482 struct dt_it_ops *iops;
/* Take the iterator operations from the index operations of the lower
 * object of @dir and create an iterator. */
486 obj = mdd_object_child(dir);
487 iops = &obj->do_index_ops->dio_it;
488 it = iops->init(env, obj, 0, BYPASS_CAPA);
/* Position the iterator using the empty string as the key. */
490 result = iops->get(env, it, (const void *)"");
/* Advance the iterator at most three times, stopping as soon as next()
 * returns a non-zero result. */
493 for (result = 0, i = 0; result == 0 && i < 3; ++i)
494 result = iops->next(env, it);
497 else if (result == +1)
499 } else if (result == 0)
501 * Huh? Index contains no zero key?
/* Checks performed before unlinking @cobj from directory @pobj. */
512 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
513 struct mdd_object *cobj, struct md_attr *ma)
515 struct dt_object *dt_cobj = mdd_object_child(cobj);
/* Delete check; the is_dir argument is derived from the mode in @ma. */
519 rc = mdd_may_delete(env, pobj, cobj,
520 S_ISDIR(ma->ma_attr.la_mode), 1);
/* For a directory child that can be used as an index, run the emptiness
 * check. */
524 if (S_ISDIR(mdd_object_type(cobj))) {
525 if (dt_try_as_dir(env, dt_cobj))
526 rc = mdd_dir_is_empty(env, cobj);
/* Unlink entry point (installed as .mdo_unlink in mdd_dir_ops): removes
 * @name from directory @pobj, drops link references on @cobj, updates
 * timestamps and finishes with mdd_finish_unlink(), inside an
 * MDD_TXN_UNLINK_OP transaction. */
534 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
535 struct md_object *cobj, const char *name,
538 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
539 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
540 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
541 struct mdd_device *mdd = mdo2mdd(pobj);
542 struct dynlock_handle *dlh;
543 struct thandle *handle;
/* Build the transaction parameters for this unlink, then start the
 * transaction. */
547 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
551 handle = mdd_trans_start(env, mdd);
553 RETURN(PTR_ERR(handle));
/* Lock order: pdo write lock on the parent for @name first, then the write
 * lock on the child. */
555 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
557 GOTO(out_trans, rc = -ENOMEM);
558 mdd_write_lock(env, mdd_cobj);
560 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
/* Remove the name from the parent; is_dir comes from the child's object
 * attributes. */
564 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
565 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
566 mdd_object_capa(env, mdd_pobj));
/* Drop link reference(s) on the child. */
570 mdd_ref_del_internal(env, mdd_cobj, handle);
573 mdd_ref_del_internal(env, mdd_cobj, handle);
/* Parent: ctime and mtime are marked valid for the update. */
577 la->la_valid = LA_CTIME | LA_MTIME;
578 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
/* Child: only ctime is marked valid; the child's write lock is already
 * held, so the unlocked setter is used. */
582 la->la_valid = LA_CTIME;
583 rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
587 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
/* Send the "unlinked" key through the mds_osc_exp export. */
590 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
591 strlen("unlinked"), "unlinked", 0,
/* Release the locks in the reverse order of acquisition, then stop the
 * transaction. */
595 mdd_write_unlock(env, mdd_cobj);
596 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
598 mdd_trans_stop(env, mdd, rc, handle);
/* Checks performed before a name insert into directory @pobj: the directory
 * is tested for the dead flag and write + exec permission on it is
 * checked. */
602 static int mdd_ni_sanity_check(const struct lu_env *env,
603 struct md_object *pobj,
605 const struct lu_fid *fid)
607 struct mdd_object *obj = md2mdd_obj(pobj);
611 if (mdd_is_dead_obj(obj))
614 /* The existence of the name will be checked in _index_insert. */
615 RETURN(mdd_permission_internal_locked(env, obj, NULL,
616 MAY_WRITE | MAY_EXEC));
/* Name-insert entry point (installed as .mdo_name_insert in mdd_dir_ops):
 * adds the entry @name -> @fid to directory @pobj inside an
 * MDD_TXN_INDEX_INSERT_OP transaction. */
622 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
623 const char *name, const struct lu_fid *fid,
626 struct lu_attr *la = &mdd_env_info(env)->mti_la;
627 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
628 struct mdd_device *mdd = mdo2mdd(pobj);
629 struct dynlock_handle *dlh;
630 struct thandle *handle;
634 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
635 handle = mdd_trans_start(env, mdo2mdd(pobj));
637 RETURN(PTR_ERR(handle));
/* Take the pdo write lock on the directory for @name before checking and
 * inserting. */
639 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
641 GOTO(out_trans, rc = -ENOMEM);
642 rc = mdd_ni_sanity_check(env, pobj, name, fid);
644 GOTO(out_unlock, rc);
/* No capability is passed for the insert (BYPASS_CAPA). */
646 rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
647 handle, BYPASS_CAPA);
/* NOTE(review): this sets atime and ctime, whereas mdd_name_remove() sets
 * ctime and mtime on the directory - confirm that atime rather than mtime
 * is intended here. */
649 la->la_ctime = la->la_atime = CURRENT_SECONDS;
650 la->la_valid = LA_ATIME | LA_CTIME;
651 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
654 mdd_pdo_write_unlock(env, mdd_obj, dlh);
656 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
/* Checks performed before a name removal from directory @pobj: the directory
 * is tested for the dead flag (with a warning logged) and write + exec
 * permission on it is checked. */
660 static int mdd_nr_sanity_check(const struct lu_env *env,
661 struct md_object *pobj,
664 struct mdd_object *obj = md2mdd_obj(pobj);
668 if (mdd_is_dead_obj(obj)) {
669 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
673 /* Name presence will be checked in _index_delete. */
674 RETURN(mdd_permission_internal_locked(env, obj, NULL,
675 MAY_WRITE | MAY_EXEC));
/* Name-remove entry point (installed as .mdo_name_remove in mdd_dir_ops):
 * deletes the entry @name from directory @pobj inside an
 * MDD_TXN_INDEX_DELETE_OP transaction. @is_dir is forwarded to
 * __mdd_index_delete(). */
681 static int mdd_name_remove(const struct lu_env *env,
682 struct md_object *pobj,
683 const char *name, int is_dir)
685 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
686 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
687 struct mdd_device *mdd = mdo2mdd(pobj);
688 struct dynlock_handle *dlh;
689 struct thandle *handle;
693 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
694 handle = mdd_trans_start(env, mdd);
696 RETURN(PTR_ERR(handle));
/* Take the pdo write lock on the directory for @name before checking and
 * deleting. */
698 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
700 GOTO(out_trans, rc = -ENOMEM);
701 rc = mdd_nr_sanity_check(env, pobj, name);
703 GOTO(out_unlock, rc);
/* No capability is passed for the delete (BYPASS_CAPA). */
705 rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
706 handle, BYPASS_CAPA);
708 GOTO(out_unlock, rc);
/* Stamp the directory's ctime and mtime with the current time. */
710 la->la_ctime = la->la_mtime = CURRENT_SECONDS;
711 la->la_valid = LA_CTIME | LA_MTIME;
712 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
715 mdd_pdo_write_unlock(env, mdd_obj, dlh);
717 mdd_trans_stop(env, mdd, rc, handle);
/* Checks performed by mdd_rename_tgt() on the target directory @tgt_pobj and
 * the target object @tobj. */
721 static int mdd_rt_sanity_check(const struct lu_env *env,
722 struct mdd_object *tgt_pobj,
723 struct mdd_object *tobj,
724 const struct lu_fid *sfid,
725 const char *name, struct md_attr *ma)
731 if (mdd_is_dead_obj(tgt_pobj))
/* Whether the rename source is a directory is taken from the mode in @ma. */
734 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
/* Delete check against the target object; when it passes and the target is a
 * directory, the emptiness check is run as well. */
736 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
737 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
738 mdd_dir_is_empty(env, tobj))
/* Creation check on the target directory; no child object is supplied. */
741 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/* Rename-target entry point (installed as .mdo_rename_tgt in mdd_dir_ops):
 * re-points the entry @name in directory @pobj at fid @lf and drops a
 * reference from the previous target @tobj when it exists, inside an
 * MDD_TXN_RENAME_TGT_OP transaction. */
747 static int mdd_rename_tgt(const struct lu_env *env,
748 struct md_object *pobj, struct md_object *tobj,
749 const struct lu_fid *lf, const char *name,
752 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
753 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
754 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
755 struct mdd_device *mdd = mdo2mdd(pobj);
756 struct dynlock_handle *dlh;
757 struct thandle *handle;
761 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
762 handle = mdd_trans_start(env, mdd);
764 RETURN(PTR_ERR(handle));
/* Lock order: pdo write lock on the target directory for @name first, then
 * the write lock on the target object. */
766 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
768 GOTO(out_trans, rc = -ENOMEM);
770 mdd_write_lock(env, mdd_tobj);
772 /* XXX: Rename sanity checking. */
773 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
778 * If rename_tgt is called then we should just re-insert name with
779 * correct fid, no need to dec/inc parent nlink if obj is dir.
/* Delete with is_dir == 0, then insert with the variant that has no is_dir
 * argument. */
781 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
785 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
/* Target directory: ctime and mtime are marked valid for the update. */
791 la->la_valid = LA_CTIME | LA_MTIME;
792 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
/* An existing previous target loses a reference and gets a ctime update. */
796 if (tobj && lu_object_exists(&tobj->mo_lu)) {
797 mdd_ref_del_internal(env, mdd_tobj, handle);
798 la->la_valid = LA_CTIME;
799 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
/* Release the locks in the reverse order of acquisition, then stop the
 * transaction. */
804 mdd_write_unlock(env, mdd_tobj);
805 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
807 mdd_trans_stop(env, mdd, rc, handle);
812 * The permission was checked when the object was created, so there is no need to check it again.
814 static int mdd_cd_sanity_check(const struct lu_env *env,
815 struct mdd_object *obj)
/* A NULL object and an object marked dead are handled by the same branch. */
820 if (!obj || mdd_is_dead_obj(obj))
/* Create-data entry point (installed as .mdo_create_data in mdd_dir_ops):
 * creates LOV data for the object @cobj with mdd_lov_create() and stores the
 * LOV EA on it inside an MDD_TXN_CREATE_DATA_OP transaction. */
827 static int mdd_create_data(const struct lu_env *env,
828 struct md_object *pobj, struct md_object *cobj,
829 const struct md_op_spec *spec,
832 struct mdd_device *mdd = mdo2mdd(cobj);
833 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
834 struct mdd_object *son = md2mdd_obj(cobj);
835 struct lu_attr *attr = &ma->ma_attr;
836 struct lov_mds_md *lmm = NULL;
838 struct thandle *handle;
842 rc = mdd_cd_sanity_check(env, son);
/* Test the create flags: delayed create requested, or the open is not for
 * write. */
846 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
847 !(spec->sp_cr_flags & FMODE_WRITE))
/* mdd_lov_create() is called before the transaction is started; it fills in
 * lmm and lmm_size. */
849 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
854 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
855 handle = mdd_trans_start(env, mdd);
857 RETURN(rc = PTR_ERR(handle));
860 * XXX: Setting the lov ea is not locked but setting the attr is locked?
861 * Should this be fixed?
864 /* Replay creates has objects already */
/* With no_lov_create set, the EA supplied in @spec is stored instead of the
 * lmm produced above. */
865 if (spec->u.sp_ea.no_lov_create) {
866 CDEBUG(D_INFO, "we already have lov ea\n");
867 rc = mdd_lov_set_md(env, mdd_pobj, son,
868 (struct lov_mds_md *)spec->u.sp_ea.eadata,
869 spec->u.sp_ea.eadatalen, handle, 0);
871 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
872 lmm_size, handle, 0);
/* Read the object's attributes back into @ma. */
875 rc = mdd_attr_get_internal_locked(env, son, ma);
877 /* Finish mdd_lov_create() stuff. */
878 mdd_lov_create_finish(env, mdd, rc);
879 mdd_trans_stop(env, mdd, rc, handle);
/* Free the lmm buffer of lmm_size bytes. */
881 OBD_FREE(lmm, lmm_size);
/* Lookup of @name in directory @pobj without taking the pdo lock (see
 * __mdd_lookup_locked() for the locked wrapper). The record found by the
 * index lookup is written into @fid. The call is timed under the
 * LPROC_MDD_LOOKUP counter. */
886 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
887 const char *name, struct lu_fid* fid, int mask)
/* @name is used as the index key and @fid as the record buffer. */
889 const struct dt_key *key = (const struct dt_key *)name;
890 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
891 struct dt_object *dir = mdd_object_child(mdd_obj);
892 struct dt_rec *rec = (struct dt_rec *)fid;
893 struct timeval start;
897 mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
898 if (mdd_is_dead_obj(mdd_obj))
/* Existence test on the directory object; the error message below reports an
 * object located on a remote server. */
901 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
905 CERROR("Object "DFID" locates on remote server\n",
906 PFID(mdo2fid(mdd_obj)));
/* Permission check on the directory using the caller-supplied @mask. */
910 rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
/* The index lookup is issued only for a directory whose lower object can be
 * used as an index. */
914 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
915 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
916 mdd_object_capa(env, mdd_obj));
/* Convert the fid from big-endian to CPU byte order in place. */
918 fid_be_to_cpu(fid, fid);
922 mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/* Initialize the newly created object @child: apply the attributes from @ma
 * and, for a directory, add a reference and insert the "." and ".." entries
 * ("." pointing at the child itself, ".." at @pfid). */
926 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
927 struct mdd_object *child, struct md_attr *ma,
928 struct thandle *handle)
934 * Update attributes for child.
937 * (1) the valid bits should be converted between Lustre and Linux;
938 * (2) maybe, the child attributes should be set in OSD when creation.
941 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
945 if (S_ISDIR(ma->ma_attr.la_mode)) {
946 /* Add "." and ".." for newly created dir */
947 mdd_ref_add_internal(env, child, handle);
948 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
949 dot, handle, BYPASS_CAPA);
951 rc = __mdd_index_insert_only(env, child, pfid,
/* Cleanup: delete the "." entry again; a failure of that delete is only
 * logged, together with the original error. */
957 rc2 = __mdd_index_delete(env, child, dot, 0,
958 handle, BYPASS_CAPA);
960 CERROR("Failure to cleanup after dotdot"
961 " creation: %d (%d)\n", rc2, rc);
/* Drop the reference on the child again. */
963 mdd_ref_del_internal(env, child, handle);
/* Checks performed by mdd_create() before creating a new entry in directory
 * @pobj. Also propagates the parent's group (and, for directories, the
 * set-group-id bit) into the attributes in @ma when the parent has S_ISGID
 * set. */
970 static int mdd_create_sanity_check(const struct lu_env *env,
971 struct md_object *pobj,
/* la and fid are scratch storage from the per-thread mdd info. */
976 struct mdd_thread_info *info = mdd_env_info(env);
977 struct lu_attr *la = &info->mti_la;
978 struct lu_fid *fid = &info->mti_fid;
979 struct mdd_object *obj = md2mdd_obj(pobj);
984 if (mdd_is_dead_obj(obj))
988 * In some cases this lookup is not needed - we know before that if name
993 * Check if the name already exist, though it will be checked in
994 * _index_insert also, for avoiding rolling back if exists
997 rc = __mdd_lookup_locked(env, pobj, name, fid,
998 MAY_WRITE | MAY_EXEC);
/* A non-zero rc is returned as is; rc == 0 is turned into -EEXIST. */
1000 RETURN(rc ? : -EEXIST);
1003 * Check if has WRITE permission for the parent.
1005 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
/* Read the parent's attributes for the set-group-id handling below. */
1011 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
/* Parent has S_ISGID: the new object takes the parent's gid, and a new
 * directory also gets S_ISGID in its mode. */
1015 if (la->la_mode & S_ISGID) {
1016 ma->ma_attr.la_gid = la->la_gid;
1017 if (S_ISDIR(ma->ma_attr.la_mode)) {
1018 ma->ma_attr.la_mode |= S_ISGID;
1019 ma->ma_attr.la_valid |= LA_MODE;
/* Dispatch on the file type bits of the requested mode. */
1023 switch (ma->ma_attr.la_mode & S_IFMT) {
1041 * Create object and insert it into namespace.
1043 static int mdd_create(const struct lu_env *env,
1044 struct md_object *pobj, const char *name,
1045 struct md_object *child,
1046 struct md_op_spec *spec,
1049 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1050 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1051 struct mdd_object *son = md2mdd_obj(child);
1052 struct mdd_device *mdd = mdo2mdd(pobj);
1053 struct lu_attr *attr = &ma->ma_attr;
1054 struct lov_mds_md *lmm = NULL;
1055 struct thandle *handle;
/* created and inserted start at 0 and feed the error cleanup further down. */
1056 int rc, created = 0, inserted = 0, lmm_size = 0;
1057 struct dynlock_handle *dlh;
1058 struct timeval start;
/* The whole call is timed under the LPROC_MDD_CREATE counter. */
1061 mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1064 * Two operations have to be performed:
1066 * - allocation of new object (->do_create()), and
1068 * - insertion into parent index (->dio_insert()).
1070 * Due to locking, operation order is not important, when both are
1071 * successful, *but* error handling cases are quite different:
1073 * - if insertion is done first, and following object creation fails,
1074 * insertion has to be rolled back, but this operation might fail
1075 * also leaving us with dangling index entry.
1077 * - if creation is done first, is has to be undone if insertion
1078 * fails, leaving us with leaked space, which is neither good, nor
1081 * It seems that creation-first is simplest solution, but it is
1082 * sub-optimal in the frequent
1087 * case, because second mkdir is bound to create object, only to
1088 * destroy it immediately.
1090 * To avoid this follow local file systems that do double lookup:
1092 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1094 * 1. create (mdd_object_create_internal())
1096 * 2. insert (__mdd_index_insert(), lookup again)
1099 /* Sanity checks before big job. */
1100 rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1105 * No RPC inside the transaction, so OST objects should be created at
/* For regular files mdd_lov_create() runs before the transaction is
 * started. */
1108 if (S_ISREG(attr->la_mode)) {
1109 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1115 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1116 handle = mdd_trans_start(env, mdd);
1118 GOTO(out_free, rc = PTR_ERR(handle));
/* pdo write lock on the parent for the new name. */
1120 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1122 GOTO(out_trans, rc = -ENOMEM);
1125 * XXX: Check that link can be added to the parent in mkdir case.
/* The new object is created under its own write lock. */
1128 mdd_write_lock(env, son);
1129 rc = mdd_object_create_internal(env, son, ma, handle);
1131 mdd_write_unlock(env, son);
1137 #ifdef CONFIG_FS_POSIX_ACL
/* ACL initialization for the new object reads the parent under the parent's
 * read lock; the mode in @ma is passed by pointer. */
1138 mdd_read_lock(env, mdd_pobj);
1139 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1140 mdd_read_unlock(env, mdd_pobj);
1142 mdd_write_unlock(env, son);
1145 ma->ma_attr.la_valid |= LA_MODE;
/* Initialize the new object, passing the parent's fid. */
1149 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1151 mdd_write_unlock(env, son);
1154 * Object has no links, so it will be destroyed when last
1155 * reference is released. (XXX not now.)
/* Insert the new name into the parent directory. */
1159 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1160 name, S_ISDIR(attr->la_mode), handle,
1161 mdd_object_capa(env, mdd_pobj));
1168 /* Replay creates has objects already. */
/* With no_lov_create set, the EA supplied in @spec is used; lmm must not
 * have been allocated in that case. */
1169 if (spec->u.sp_ea.no_lov_create) {
1170 CDEBUG(D_INFO, "we already have lov ea\n");
1171 LASSERT(lmm == NULL);
1172 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1173 lmm_size = spec->u.sp_ea.eadatalen;
1175 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1177 CERROR("error on stripe info copy %d \n", rc);
1180 if (lmm && lmm_size > 0) {
1181 /* Set Lov here, do not get lmm again later */
1182 memcpy(ma->ma_lmm, lmm, lmm_size);
1183 ma->ma_lmm_size = lmm_size;
1184 ma->ma_valid |= MA_LOV;
/* Symlink: write the target name from @spec as the body of the new
 * object. */
1187 if (S_ISLNK(attr->la_mode)) {
1188 struct dt_object *dt = mdd_object_child(son);
1189 const char *target_name = spec->u.sp_symname;
1190 int sym_len = strlen(target_name);
1191 const struct lu_buf *buf;
1194 buf = mdd_buf_get_const(env, target_name, sym_len);
1195 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1196 mdd_object_capa(env, son));
1201 GOTO(cleanup, rc = -EFAULT);
/* Parent: ctime and mtime are marked valid for the update. */
1205 la->la_valid = LA_CTIME | LA_MTIME;
1206 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1210 /* Return attr back. */
1211 rc = mdd_attr_get_internal_locked(env, son, ma);
/* Error cleanup once the object has been created: remove the name from the
 * parent and drop the reference on the new object. */
1214 if (rc && created) {
1218 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1219 S_ISDIR(attr->la_mode),
1220 handle, BYPASS_CAPA);
1222 CERROR("error can not cleanup destroy %d\n",
1226 mdd_write_lock(env, son);
1227 mdd_ref_del_internal(env, son, handle);
1228 mdd_write_unlock(env, son);
1232 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1234 mdd_trans_stop(env, mdd, rc, handle);
/* lmm is freed only when it was not taken from @spec. */
1236 if (lmm && !spec->u.sp_ea.no_lov_create)
1237 OBD_FREE(lmm, lmm_size);
1238 /* Finish mdd_lov_create() stuff */
1239 mdd_lov_create_finish(env, mdd, rc);
1240 mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1245 * Get locks on parents in proper order
1246 * RETURN: < 0 - error, rename_order if successful
1254 static int mdd_rename_order(const struct lu_env *env,
1255 struct mdd_device *mdd,
1256 struct mdd_object *src_pobj,
1257 struct mdd_object *tgt_pobj)
1259 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
/* Same parent object for source and target: MDD_RN_SAME. */
1263 if (src_pobj == tgt_pobj)
1264 RETURN(MDD_RN_SAME);
1266 /* compared the parent child relationship of src_p&tgt_p */
/* Either parent being the device's root fid is handled without walking the
 * tree; otherwise mdd_is_parent() is asked about the target parent's fid
 * relative to the source parent. */
1267 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1269 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1272 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
/* Checks performed by mdd_rename() on the source parent @src_pobj, the target
 * parent @tgt_pobj and the optional existing target object @tobj. */
1285 static int mdd_rename_sanity_check(const struct lu_env *env,
1286 struct mdd_object *src_pobj,
1287 struct mdd_object *tgt_pobj,
1288 const struct lu_fid *sfid,
1290 struct mdd_object *tobj)
1295 if (mdd_is_dead_obj(src_pobj))
1298 /* The sobj maybe on the remote, check parent permission only here */
1299 rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1300 MAY_WRITE | MAY_EXEC);
/* Create and delete checks on the target parent; their need_check argument
 * is non-zero only when source and target parents differ. */
1305 rc = mdd_may_create(env, tgt_pobj, NULL,
1306 (src_pobj != tgt_pobj));
1308 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1309 (src_pobj != tgt_pobj));
/* A directory target is additionally run through the emptiness check. */
1311 if (S_ISDIR(mdd_object_type(tobj))
1312 && mdd_dir_is_empty(env, tobj))
1318 /* src object can be remote that is why we use only fid and type of object */
/* Rename entry point (installed as .mdo_rename in mdd_dir_ops): moves the
 * entry @sname in @src_pobj, which refers to fid @lf, to @tname in
 * @tgt_pobj, replacing @tobj when it exists, inside an MDD_TXN_RENAME_OP
 * transaction. */
1319 static int mdd_rename(const struct lu_env *env,
1320 struct md_object *src_pobj, struct md_object *tgt_pobj,
1321 const struct lu_fid *lf, const char *sname,
1322 struct md_object *tobj, const char *tname,
1325 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1326 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1327 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1328 struct mdd_device *mdd = mdo2mdd(src_pobj);
1329 struct mdd_object *mdd_sobj = NULL;
1330 struct mdd_object *mdd_tobj = NULL;
1331 struct dynlock_handle *sdlh, *tdlh;
1332 struct thandle *handle;
/* The mode in @ma must carry file type bits; is_dir is derived from it. */
1337 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1338 is_dir = S_ISDIR(ma->ma_attr.la_mode);
/* Test the flags in @ma for append-only / immutable. */
1339 if (ma->ma_attr.la_valid & LA_FLAGS &&
1340 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1344 mdd_tobj = md2mdd_obj(tobj);
1346 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1347 handle = mdd_trans_start(env, mdd);
1349 RETURN(PTR_ERR(handle));
1351 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1352 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1354 GOTO(cleanup_unlocked, rc);
1356 /* Get locks in determined order */
1357 if (rc == MDD_RN_SAME) {
1358 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1359 /* check hashes to determine do we need one lock or two */
1360 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1361 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
/* Source parent first, then target parent. */
1364 } else if (rc == MDD_RN_SRCTGT) {
1365 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1366 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
/* Target parent first, then source parent. */
1368 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1369 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1371 if (sdlh == NULL || tdlh == NULL)
1372 GOTO(cleanup, rc = -ENOMEM);
1374 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1375 lf, is_dir, mdd_tobj);
/* Remove the source name from the source parent. */
1379 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1380 mdd_object_capa(env, mdd_spobj));
1385 * Here tobj can be remote one, so we do index_delete unconditionally
1386 * and -ENOENT is allowed.
1388 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1389 mdd_object_capa(env, mdd_tpobj));
1390 if (rc != 0 && rc != -ENOENT)
/* Insert the target name, pointing at the source fid. */
1393 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1394 mdd_object_capa(env, mdd_tpobj));
/* Look up the source object by fid for the ctime update. */
1399 mdd_sobj = mdd_object_find(env, mdd, lf);
1401 la->la_valid = LA_CTIME;
1403 /* XXX: How to update ctime for remote sobj? */
1404 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
/* An existing target object loses link reference(s), gets a ctime update
 * and is passed to mdd_finish_unlink(), all under its write lock. */
1408 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1409 mdd_write_lock(env, mdd_tobj);
1410 mdd_ref_del_internal(env, mdd_tobj, handle);
1412 /* Remove dot reference. */
1414 mdd_ref_del_internal(env, mdd_tobj, handle);
1416 la->la_valid = LA_CTIME;
1417 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1421 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1422 mdd_write_unlock(env, mdd_tobj);
/* Source parent: ctime and mtime; the target parent gets the same update
 * when it is a different object. */
1427 la->la_valid = LA_CTIME | LA_MTIME;
1428 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1432 if (mdd_spobj != mdd_tpobj) {
1433 la->la_valid = LA_CTIME | LA_MTIME;
1434 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
/* The target pdo lock is released only when it is set and is not the same
 * handle as the source lock. */
1440 if (likely(tdlh) && sdlh != tdlh)
1441 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1443 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1445 mdd_trans_stop(env, mdd, rc, handle);
/* Release the reference on the source object. */
1447 mdd_object_put(env, mdd_sobj);
/* Directory operation table of the mdd layer: binds each md_dir_operations
 * method to the corresponding function defined in this file. */
1451 struct md_dir_operations mdd_dir_ops = {
1452 .mdo_is_subdir = mdd_is_subdir,
1453 .mdo_lookup = mdd_lookup,
1454 .mdo_create = mdd_create,
1455 .mdo_rename = mdd_rename,
1456 .mdo_link = mdd_link,
1457 .mdo_unlink = mdd_unlink,
1458 .mdo_name_insert = mdd_name_insert,
1459 .mdo_name_remove = mdd_name_remove,
1460 .mdo_rename_tgt = mdd_rename_tgt,
1461 .mdo_create_data = mdd_create_data