1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
/* Names of the self and parent directory entries used by this file. */
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
/* Forward declaration: __mdd_lookup() is defined further down in this file. */
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
/*
 * Look up @name in directory @pobj through __mdd_lookup() while holding the
 * pdo read lock taken on the parent for @name.  The lock is dropped right
 * after the lookup call.
 */
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
/*
 * Directory lookup entry point (installed as .mdo_lookup in mdd_dir_ops).
 * Resolves @name under @pobj into @fid via __mdd_lookup_locked(), asking
 * for MAY_EXEC permission on the parent.
 */
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
72 struct lu_fid* fid, struct md_op_spec *spec)
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/*
 * Fetch the fid of the parent of @obj by looking up the dotdot entry in
 * @obj under the pdo read lock.  A permission mask of 0 is passed down.
 */
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use special function, which does not compare version component
89 * of fid. Version component is different for root fids on all MDTs.
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
/* Only the sequence and object id are compared against mdd_root_fid. */
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
/* The walk must not start at @lf itself. */
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
/* Per-thread scratch fid that receives each parent fid found on the way. */
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
/* Read the fid stored under dotdot in the current object. */
125 rc = mdd_parent_fid(env, p1, pfid);
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
/* Drop the previously held parent before referencing the next one. */
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
/* Release the last parent reference when a valid one is still held. */
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
/* The type of @mo is tested before any ancestor walk is attempted. */
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
/* The walk itself is done by mdd_is_parent(); sfid is its last argument. */
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
178 } else if (rc == 1) {
179 /* found @fid is parent */
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188 struct mdd_object *cobj, int need_check)
/* A child object, when one is passed, is tested for existence. */
193 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
/* The parent is tested for the dead-object state. */
196 if (mdd_is_dead_obj(pobj))
/* Write and search permission on the parent directory. */
200 rc = mdd_permission_internal_locked(env, pobj, NULL,
201 MAY_WRITE | MAY_EXEC);
/*
 * Sticky-bit style ownership test, called from mdd_may_delete().
 *
 * Attributes of @cobj and then of @pobj are read into the per-thread
 * lu_attr scratch buffer.  In both cases la_uid is compared with the
 * fsuid of the caller's credentials, and S_ISVTX is tested on the mode
 * of the parent.  The last visible fallback is the negated CAP_FOWNER
 * capability test.
 */
206 static inline int mdd_is_sticky(const struct lu_env *env,
207 struct mdd_object *pobj,
208 struct mdd_object *cobj)
210 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211 struct md_ucred *uc = md_ucred(env);
214 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
217 } else if (tmp_la->la_uid == uc->mu_fsuid) {
/* tmp_la is reused: from here on it holds the attributes of the parent. */
220 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
223 else if (!(tmp_la->la_mode & S_ISVTX) ||
224 (tmp_la->la_uid == uc->mu_fsuid))
227 return !mdd_capable(uc, CAP_FOWNER);
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233 struct mdd_object *pobj,
234 struct mdd_object *cobj,
235 int is_dir, int need_check)
237 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
243 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
/* Immutable and append-only children are tested for here. */
246 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
250 if (!S_ISDIR(mdd_object_type(cobj)))
/* The fid of the child is compared with the root fid of the device. */
253 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
256 } else if (S_ISDIR(mdd_object_type(cobj))) {
261 if (mdd_is_dead_obj(pobj))
264 if (mdd_is_sticky(env, pobj, cobj))
/* Write and search permission on the parent directory. */
268 rc = mdd_permission_internal_locked(env, pobj, NULL,
269 MAY_WRITE | MAY_EXEC);
/*
 * Pre-checks for hard-linking @src_obj into directory @tgt_obj: the source
 * is tested for the immutable/append-only flags and for being a directory,
 * source and target are asserted to differ, and mdd_may_create() is then
 * consulted for the target directory.
 */
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275 struct mdd_object *src_obj)
280 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
283 if (S_ISDIR(mdd_object_type(src_obj)))
286 LASSERT(src_obj != tgt_obj);
288 rc = mdd_may_create(env, tgt_obj, NULL, 1);
/*
 * Convert @fid to big-endian form in the per-thread mti_fid2 buffer and
 * return that buffer cast to an index record.
 *
 * NOTE(review): the result points into per-thread storage, so presumably it
 * stays valid only until the next call made with the same env - confirm.
 */
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297 const struct lu_fid *fid)
299 struct mdd_thread_info *info = mdd_env_info(env);
301 fid_cpu_to_be(&info->mti_fid2, fid);
302 return (const struct dt_rec *)&info->mti_fid2;
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308 const struct lu_fid *lf, const char *name, int is_dir,
309 struct thandle *handle, struct lustre_capa *capa)
311 struct dt_object *next = mdd_object_child(pobj);
312 struct timeval start;
/* The insert is timed under the LPROC_MDD_INDEX_INSERT counter. */
316 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317 LPROC_MDD_INDEX_INSERT);
/* The record stored under @name is the big-endian form of @lf. */
318 if (dt_try_as_dir(env, next)) {
319 rc = next->do_index_ops->dio_insert(env, next,
320 __mdd_fid_rec(env, lf),
321 (const struct dt_key *)name,
/* The reference on the parent is added under its write lock. */
329 mdd_write_lock(env, pobj);
330 mdd_ref_add_internal(env, pobj, handle);
331 mdd_write_unlock(env, pobj);
334 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335 LPROC_MDD_INDEX_INSERT);
/*
 * Remove @name from the index of directory @pobj.  When the delete succeeds
 * and @is_dir is set, one reference is dropped from @pobj under its write
 * lock.  The call is timed under the LPROC_MDD_INDEX_DELETE counter.
 */
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340 const char *name, int is_dir, struct thandle *handle,
341 struct lustre_capa *capa)
343 struct dt_object *next = mdd_object_child(pobj);
344 struct timeval start;
348 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349 LPROC_MDD_INDEX_DELETE);
351 if (dt_try_as_dir(env, next)) {
352 rc = next->do_index_ops->dio_delete(env, next,
353 (struct dt_key *)name,
355 if (rc == 0 && is_dir) {
356 mdd_write_lock(env, pobj);
357 mdd_ref_del_internal(env, pobj, handle);
358 mdd_write_unlock(env, pobj);
363 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364 LPROC_MDD_INDEX_DELETE);
/*
 * Insert the entry @name, pointing at @lf, into the index of @pobj.  This
 * variant takes no is_dir argument, in contrast to __mdd_index_insert().
 */
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370 const struct lu_fid *lf, const char *name,
371 struct thandle *handle, struct lustre_capa *capa)
373 struct dt_object *next = mdd_object_child(pobj);
377 if (dt_try_as_dir(env, next)) {
378 rc = next->do_index_ops->dio_insert(env, next,
379 __mdd_fid_rec(env, lf),
380 (const struct dt_key *)name,
/*
 * Hard-link operation (installed as .mdo_link in mdd_dir_ops): adds @name
 * to directory @tgt_obj pointing at @src_obj, inside a transaction built
 * with MDD_TXN_LINK_OP.
 */
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389 struct md_object *src_obj, const char *name,
392 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
393 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395 struct mdd_device *mdd = mdo2mdd(src_obj);
396 struct dynlock_handle *dlh;
397 struct thandle *handle;
401 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402 handle = mdd_trans_start(env, mdd);
404 RETURN(PTR_ERR(handle));
/*
 * Lock order: pdo write lock on the name in the target directory first,
 * then the write lock on the source object.  Both are released in the
 * reverse order at the end.
 */
406 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
408 GOTO(out_trans, rc = -ENOMEM);
409 mdd_write_lock(env, mdd_sobj);
411 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
413 GOTO(out_unlock, rc);
415 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
417 mdd_object_capa(env, mdd_tobj));
419 GOTO(out_unlock, rc);
/* The new name counts as one more reference on the source object. */
421 mdd_ref_add_internal(env, mdd_sobj, handle);
/* Target directory gets ctime and mtime, the source object ctime only. */
424 la->la_valid = LA_CTIME | LA_MTIME;
425 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
427 GOTO(out_unlock, rc);
429 la->la_valid = LA_CTIME;
430 rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
433 mdd_write_unlock(env, mdd_sobj);
434 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
436 mdd_trans_stop(env, mdd, rc, handle);
440 /* caller should take a lock before calling */
441 int mdd_finish_unlink(const struct lu_env *env,
442 struct mdd_object *obj, struct md_attr *ma,
/* Re-read the attributes; la_nlink is examined right below. */
448 rc = mdd_iattr_get(env, obj, ma);
449 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
450 /* add new orphan and the object
451 * will be deleted during the object_put() */
452 if (__mdd_orphan_add(env, obj, th) == 0)
453 obj->mod_flags |= ORPHAN_OBJ;
/* With a zero mod_count the object is killed here. */
455 if (obj->mod_count == 0)
456 rc = mdd_object_kill(env, obj, ma);
458 /* clear MA_LOV | MA_COOKIE, if we do not
459 * unlink it in case we get it somewhere */
460 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
462 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
468 * Check that @dir contains no entries except (possibly) dot and dotdot.
473 * -ENOTEMPTY not empty
477 static int mdd_dir_is_empty(const struct lu_env *env,
478 struct mdd_object *dir)
481 struct dt_object *obj;
482 struct dt_it_ops *iops;
/* Iterate over the index of the underlying dt object. */
486 obj = mdd_object_child(dir);
487 iops = &obj->do_index_ops->dio_it;
488 it = iops->init(env, obj, 0, BYPASS_CAPA);
/* Position the iterator using the empty string as the key. */
490 result = iops->get(env, it, (const void *)"");
/* Advance at most three times, stopping early on a non-zero result. */
493 for (result = 0, i = 0; result == 0 && i < 3; ++i)
494 result = iops->next(env, it);
497 else if (result == +1)
499 } else if (result == 0)
501 * Huh? Index contains no zero key?
/*
 * Pre-checks for unlinking @cobj from @pobj.  mdd_may_delete() is called
 * with the directory flag derived from ma->ma_attr.la_mode, and a child
 * that is a directory is additionally handed to mdd_dir_is_empty().
 */
512 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
513 struct mdd_object *cobj, struct md_attr *ma)
515 struct dt_object *dt_cobj = mdd_object_child(cobj);
519 rc = mdd_may_delete(env, pobj, cobj,
520 S_ISDIR(ma->ma_attr.la_mode), 1);
524 if (S_ISDIR(mdd_object_type(cobj))) {
525 if (dt_try_as_dir(env, dt_cobj))
526 rc = mdd_dir_is_empty(env, cobj);
/*
 * Unlink operation (installed as .mdo_unlink in mdd_dir_ops): removes
 * @name from directory @pobj and drops references on @cobj inside a
 * transaction whose parameters are built for MDD_TXN_UNLINK_OP.
 */
534 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
535 struct md_object *cobj, const char *name,
538 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
539 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
540 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
541 struct mdd_device *mdd = mdo2mdd(pobj);
542 struct dynlock_handle *dlh;
543 struct thandle *handle;
548 * Check -ENOENT early here because we need to get object type
549 * to calculate credits before transaction start
551 if (!lu_object_exists(&cobj->mo_lu))
553 LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is"DFID"\n",
554 PFID(lu_object_fid(&cobj->mo_lu)));
556 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
560 handle = mdd_trans_start(env, mdd);
562 RETURN(PTR_ERR(handle));
/*
 * Lock order: pdo write lock on the name in the parent first, then the
 * write lock on the child.  Both are released in the reverse order.
 */
564 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
566 GOTO(out_trans, rc = -ENOMEM);
567 mdd_write_lock(env, mdd_cobj);
569 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
573 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
574 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
575 mdd_object_capa(env, mdd_pobj));
579 mdd_ref_del_internal(env, mdd_cobj, handle);
582 mdd_ref_del_internal(env, mdd_cobj, handle);
/* Parent gets ctime and mtime, the child ctime only. */
586 la->la_valid = LA_CTIME | LA_MTIME;
587 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
591 la->la_valid = LA_CTIME;
592 rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
596 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
/* The unlinked key is passed to the mds_osc_exp export. */
599 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
600 strlen("unlinked"), "unlinked", 0,
604 mdd_write_unlock(env, mdd_cobj);
605 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
607 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Sanity check used by mdd_name_insert(): the parent is tested for the
 * dead-object state, and the result of the MAY_WRITE | MAY_EXEC permission
 * check on it is returned.
 */
611 static int mdd_ni_sanity_check(const struct lu_env *env,
612 struct md_object *pobj,
614 const struct lu_fid *fid)
616 struct mdd_object *obj = md2mdd_obj(pobj);
620 if (mdd_is_dead_obj(obj))
623 /* Existence of the name will be checked in _index_insert. */
624 RETURN(mdd_permission_internal_locked(env, obj, NULL,
625 MAY_WRITE | MAY_EXEC));
/*
 * Name-insert operation (installed as .mdo_name_insert in mdd_dir_ops):
 * adds the entry @name, pointing at @fid, to directory @pobj inside a
 * transaction built with MDD_TXN_INDEX_INSERT_OP.  Only the parent is
 * touched; no child object is passed in.
 */
631 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
632 const char *name, const struct lu_fid *fid,
635 struct lu_attr *la = &mdd_env_info(env)->mti_la;
636 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
637 struct mdd_device *mdd = mdo2mdd(pobj);
638 struct dynlock_handle *dlh;
639 struct thandle *handle;
643 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
644 handle = mdd_trans_start(env, mdo2mdd(pobj));
646 RETURN(PTR_ERR(handle));
648 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
650 GOTO(out_trans, rc = -ENOMEM);
651 rc = mdd_ni_sanity_check(env, pobj, name, fid);
653 GOTO(out_unlock, rc);
655 rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
656 handle, BYPASS_CAPA);
/*
 * NOTE(review): atime and ctime are updated here, whereas
 * mdd_name_remove() updates ctime and mtime - confirm that atime
 * (rather than mtime) is what is intended for the parent.
 */
658 la->la_ctime = la->la_atime = CURRENT_SECONDS;
659 la->la_valid = LA_ATIME | LA_CTIME;
660 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
663 mdd_pdo_write_unlock(env, mdd_obj, dlh);
665 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
/*
 * Sanity check used by mdd_name_remove(): a dead parent is reported with a
 * warning, and the result of the MAY_WRITE | MAY_EXEC permission check on
 * the parent is returned.
 */
669 static int mdd_nr_sanity_check(const struct lu_env *env,
670 struct md_object *pobj,
673 struct mdd_object *obj = md2mdd_obj(pobj);
677 if (mdd_is_dead_obj(obj)) {
678 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
682 /* Name presence will be checked in _index_delete. */
683 RETURN(mdd_permission_internal_locked(env, obj, NULL,
684 MAY_WRITE | MAY_EXEC));
/*
 * Name-remove operation (installed as .mdo_name_remove in mdd_dir_ops):
 * deletes the entry @name from directory @pobj inside a transaction built
 * with MDD_TXN_INDEX_DELETE_OP, then stamps ctime and mtime on the parent.
 */
690 static int mdd_name_remove(const struct lu_env *env,
691 struct md_object *pobj,
692 const char *name, int is_dir)
694 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
695 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
696 struct mdd_device *mdd = mdo2mdd(pobj);
697 struct dynlock_handle *dlh;
698 struct thandle *handle;
702 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
703 handle = mdd_trans_start(env, mdd);
705 RETURN(PTR_ERR(handle));
707 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
709 GOTO(out_trans, rc = -ENOMEM);
710 rc = mdd_nr_sanity_check(env, pobj, name);
712 GOTO(out_unlock, rc);
714 rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
715 handle, BYPASS_CAPA);
717 GOTO(out_unlock, rc);
719 la->la_ctime = la->la_mtime = CURRENT_SECONDS;
720 la->la_valid = LA_CTIME | LA_MTIME;
721 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
724 mdd_pdo_write_unlock(env, mdd_obj, dlh);
726 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Sanity check used by mdd_rename_tgt().  The target parent is tested for
 * the dead-object state.  Both mdd_may_delete() on the existing target
 * (plus an emptiness test when that target is a directory) and
 * mdd_may_create() on the target parent appear below; the directory flag
 * of the source is taken from ma->ma_attr.la_mode.
 */
730 static int mdd_rt_sanity_check(const struct lu_env *env,
731 struct mdd_object *tgt_pobj,
732 struct mdd_object *tobj,
733 const struct lu_fid *sfid,
734 const char *name, struct md_attr *ma)
740 if (mdd_is_dead_obj(tgt_pobj))
743 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
745 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
/* A non-zero result from mdd_dir_is_empty() satisfies this condition. */
746 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
747 mdd_dir_is_empty(env, tobj))
750 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/*
 * Target-side rename (installed as .mdo_rename_tgt in mdd_dir_ops): makes
 * @name in directory @pobj point at @lf, replacing the entry for @tobj,
 * inside a transaction built with MDD_TXN_RENAME_TGT_OP.
 */
756 static int mdd_rename_tgt(const struct lu_env *env,
757 struct md_object *pobj, struct md_object *tobj,
758 const struct lu_fid *lf, const char *name,
761 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
762 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
763 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
764 struct mdd_device *mdd = mdo2mdd(pobj);
765 struct dynlock_handle *dlh;
766 struct thandle *handle;
770 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
771 handle = mdd_trans_start(env, mdd);
773 RETURN(PTR_ERR(handle));
775 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
777 GOTO(out_trans, rc = -ENOMEM);
779 mdd_write_lock(env, mdd_tobj);
781 /* XXX: Rename sanity checking. */
782 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
787 * If rename_tgt is called then we should just re-insert name with
788 * correct fid, no need to dec/inc parent nlink if obj is dir.
/* is_dir is passed as 0, so the delete leaves the parent nlink alone. */
790 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
794 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
800 la->la_valid = LA_CTIME | LA_MTIME;
801 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
/* An existing old target loses one reference and gets a new ctime. */
805 if (tobj && lu_object_exists(&tobj->mo_lu)) {
806 mdd_ref_del_internal(env, mdd_tobj, handle);
807 la->la_valid = LA_CTIME;
808 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
813 mdd_write_unlock(env, mdd_tobj);
814 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
816 mdd_trans_stop(env, mdd, rc, handle);
821 * The permission has been checked when obj created, no need check again.
823 static int mdd_cd_sanity_check(const struct lu_env *env,
824 struct mdd_object *obj)
/* A missing object pointer and a dead object are both tested for. */
829 if (!obj || mdd_is_dead_obj(obj))
/*
 * Create-data operation (installed as .mdo_create_data in mdd_dir_ops):
 * sets up the LOV EA for the already existing object @cobj.  The stripe
 * data comes either from mdd_lov_create() or, in the replay case, from the
 * EA carried in @spec, and is stored through mdd_lov_set_md() inside a
 * transaction built with MDD_TXN_CREATE_DATA_OP.
 */
836 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
837 struct md_object *cobj, const struct md_op_spec *spec,
840 struct mdd_device *mdd = mdo2mdd(cobj);
841 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
842 struct mdd_object *son = md2mdd_obj(cobj);
843 struct lu_attr *attr = &ma->ma_attr;
844 struct lov_mds_md *lmm = NULL;
846 struct thandle *handle;
850 rc = mdd_cd_sanity_check(env, son);
/* Delayed-create opens and opens without FMODE_WRITE are tested for. */
854 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
855 !(spec->sp_cr_flags & FMODE_WRITE))
/* mdd_lov_create() runs before the transaction is started. */
858 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
863 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
864 handle = mdd_trans_start(env, mdd);
866 GOTO(out_free, rc = PTR_ERR(handle));
869 * XXX: Setting the lov ea is not locked but setting the attr is locked?
870 * Should this be fixed?
873 /* Replay creates have objects already */
874 if (spec->u.sp_ea.no_lov_create) {
875 CDEBUG(D_INFO, "we already have lov ea\n");
876 rc = mdd_lov_set_md(env, mdd_pobj, son,
877 (struct lov_mds_md *)spec->u.sp_ea.eadata,
878 spec->u.sp_ea.eadatalen, handle, 0);
880 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
881 lmm_size, handle, 0);
884 rc = mdd_attr_get_internal_locked(env, son, ma);
886 mdd_trans_stop(env, mdd, rc, handle);
888 /* Finish mdd_lov_create() stuff. */
889 mdd_lov_create_finish(env, mdd, rc);
891 OBD_FREE(lmm, lmm_size);
/*
 * Core lookup: resolve @name in directory @pobj and store the result in
 * @fid.  The parent is tested for the dead-object state and for existence,
 * the permission given by @mask is checked on it, and the index lookup is
 * done only when the parent is a directory that can be used as one.  The
 * call is timed under the LPROC_MDD_LOOKUP counter.  No pdo lock is taken
 * in the visible code; __mdd_lookup_locked() wraps this function with one.
 */
896 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
897 const char *name, struct lu_fid* fid, int mask)
899 const struct dt_key *key = (const struct dt_key *)name;
900 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
901 struct dt_object *dir = mdd_object_child(mdd_obj);
/* The index record is read straight into the caller's fid buffer. */
902 struct dt_rec *rec = (struct dt_rec *)fid;
903 struct timeval start;
907 mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
908 if (mdd_is_dead_obj(mdd_obj))
911 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
915 CERROR("Object "DFID" locates on remote server\n",
916 PFID(mdo2fid(mdd_obj)));
920 rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
924 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
925 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
926 mdd_object_capa(env, mdd_obj));
/* Convert the fid in place from big-endian to CPU byte order. */
928 fid_be_to_cpu(fid, fid);
932 mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/*
 * Finish setting up the freshly created object @child: its attributes are
 * set from @ma, and a directory additionally gets an extra reference plus
 * its dot entry (pointing at itself) and an entry pointing at @pfid.  The
 * dot entry and the extra reference are removed again in the cleanup code
 * near the end.
 */
936 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
937 struct mdd_object *child, struct md_attr *ma,
938 struct thandle *handle)
944 * Update attributes for child.
947 * (1) the valid bits should be converted between Lustre and Linux;
948 * (2) maybe, the child attributes should be set in OSD when creation.
951 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
955 if (S_ISDIR(ma->ma_attr.la_mode)) {
956 /* Add "." and ".." for newly created dir */
957 mdd_ref_add_internal(env, child, handle);
958 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
959 dot, handle, BYPASS_CAPA);
961 rc = __mdd_index_insert_only(env, child, pfid,
/* Cleanup: take the dot entry out again; failure is only logged. */
967 rc2 = __mdd_index_delete(env, child, dot, 0,
968 handle, BYPASS_CAPA);
970 CERROR("Failure to cleanup after dotdot"
971 " creation: %d (%d)\n", rc2, rc);
973 mdd_ref_del_internal(env, child, handle);
/*
 * Sanity check used by mdd_create().  The parent is tested for the
 * dead-object state, the name may be looked up to detect an existing entry
 * (-EEXIST), write permission on the parent is checked, and set-gid
 * inheritance from the parent is applied to the attributes in @ma.
 */
980 static int mdd_create_sanity_check(const struct lu_env *env,
981 struct md_object *pobj,
986 struct mdd_thread_info *info = mdd_env_info(env);
987 struct lu_attr *la = &info->mti_la;
988 struct lu_fid *fid = &info->mti_fid;
989 struct mdd_object *obj = md2mdd_obj(pobj);
994 if (mdd_is_dead_obj(obj))
998 * In some cases this lookup is not needed - we know before that if name
1001 /* XXX disable that lookup temporary */
1004 * Check if the name already exist, though it will be checked in
1005 * _index_insert also, for avoiding rolling back if exists
1008 rc = __mdd_lookup_locked(env, pobj, name, fid,
1009 MAY_WRITE | MAY_EXEC);
/* Yields rc itself when it is non-zero, and -EEXIST when it is zero. */
1011 RETURN(rc ? : -EEXIST);
1014 * Check if has WRITE permission for the parent.
1016 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1022 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
/*
 * Set-gid parent: the child takes the gid of the parent, and a child
 * that is a directory also gets S_ISGID in its own mode.
 */
1026 if (la->la_mode & S_ISGID) {
1027 ma->ma_attr.la_gid = la->la_gid;
1028 if (S_ISDIR(ma->ma_attr.la_mode)) {
1029 ma->ma_attr.la_mode |= S_ISGID;
1030 ma->ma_attr.la_valid |= LA_MODE;
1034 switch (ma->ma_attr.la_mode & S_IFMT) {
1052 * Create object and insert it into namespace.
1054 static int mdd_create(const struct lu_env *env,
1055 struct md_object *pobj, const char *name,
1056 struct md_object *child,
1057 struct md_op_spec *spec,
1060 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1061 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1062 struct mdd_object *son = md2mdd_obj(child);
1063 struct mdd_device *mdd = mdo2mdd(pobj);
1064 struct lu_attr *attr = &ma->ma_attr;
1065 struct lov_mds_md *lmm = NULL;
1066 struct thandle *handle;
1067 int rc, created = 0, inserted = 0, lmm_size = 0;
1068 struct dynlock_handle *dlh;
1069 struct timeval start;
/* The whole operation is timed under the LPROC_MDD_CREATE counter. */
1072 mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1075 * Two operations have to be performed:
1077 * - allocation of new object (->do_create()), and
1079 * - insertion into parent index (->dio_insert()).
1081 * Due to locking, operation order is not important, when both are
1082 * successful, *but* error handling cases are quite different:
1084 * - if insertion is done first, and following object creation fails,
1085 * insertion has to be rolled back, but this operation might fail
1086 * also leaving us with dangling index entry.
1088 * - if creation is done first, is has to be undone if insertion
1089 * fails, leaving us with leaked space, which is neither good, nor
1092 * It seems that creation-first is simplest solution, but it is
1093 * sub-optimal in the frequent
1098 * case, because second mkdir is bound to create object, only to
1099 * destroy it immediately.
1101 * To avoid this follow local file systems that do double lookup:
1103 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1105 * 1. create (mdd_object_create_internal())
1107 * 2. insert (__mdd_index_insert(), lookup again)
1110 /* Sanity checks before big job. */
1111 rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1116 * No RPC inside the transaction, so OST objects should be created at
1119 if (S_ISREG(attr->la_mode)) {
1120 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1126 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1127 handle = mdd_trans_start(env, mdd);
1129 GOTO(out_free, rc = PTR_ERR(handle));
1131 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1133 GOTO(out_trans, rc = -ENOMEM);
1136 * XXX: Check that link can be added to the parent in mkdir case.
/* The new object is created and set up under its own write lock. */
1139 mdd_write_lock(env, son);
1140 rc = mdd_object_create_internal(env, son, ma, handle);
1142 mdd_write_unlock(env, son);
1148 #ifdef CONFIG_FS_POSIX_ACL
/* The parent is read-locked while mdd_acl_init() runs for the child. */
1149 mdd_read_lock(env, mdd_pobj);
1150 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1151 mdd_read_unlock(env, mdd_pobj);
1153 mdd_write_unlock(env, son);
1156 ma->ma_attr.la_valid |= LA_MODE;
1160 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1162 mdd_write_unlock(env, son);
1165 * Object has no links, so it will be destroyed when last
1166 * reference is released. (XXX not now.)
1170 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1171 name, S_ISDIR(attr->la_mode), handle,
1172 mdd_object_capa(env, mdd_pobj));
1179 /* Replay creates have objects already. */
1180 if (spec->u.sp_ea.no_lov_create) {
1181 CDEBUG(D_INFO, "we already have lov ea\n");
1182 LASSERT(lmm == NULL);
1183 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1184 lmm_size = spec->u.sp_ea.eadatalen;
1186 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1188 CERROR("error on stripe info copy %d \n", rc);
1191 if (lmm && lmm_size > 0) {
1192 /* Set Lov here, do not get lmm again later */
1193 memcpy(ma->ma_lmm, lmm, lmm_size);
1194 ma->ma_lmm_size = lmm_size;
1195 ma->ma_valid |= MA_LOV;
/* Symlink: the target path from @spec is written as the object body. */
1198 if (S_ISLNK(attr->la_mode)) {
1199 struct dt_object *dt = mdd_object_child(son);
1200 const char *target_name = spec->u.sp_symname;
1201 int sym_len = strlen(target_name);
1202 const struct lu_buf *buf;
1205 buf = mdd_buf_get_const(env, target_name, sym_len);
1206 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1207 mdd_object_capa(env, son));
1212 GOTO(cleanup, rc = -EFAULT);
1216 la->la_valid = LA_CTIME | LA_MTIME;
1217 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1221 /* Return attr back. */
1222 rc = mdd_attr_get_internal_locked(env, son, ma);
/* Failure after the object was created: roll back below. */
1225 if (rc && created) {
1229 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1230 S_ISDIR(attr->la_mode),
1231 handle, BYPASS_CAPA);
1233 CERROR("error can not cleanup destroy %d\n",
1237 mdd_write_lock(env, son);
1238 mdd_ref_del_internal(env, son, handle);
1239 mdd_write_unlock(env, son);
1243 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1245 mdd_trans_stop(env, mdd, rc, handle);
/* lmm is freed only when it did not come from the replay EA in @spec. */
1247 if (lmm && !spec->u.sp_ea.no_lov_create)
1248 OBD_FREE(lmm, lmm_size);
1249 /* Finish mdd_lov_create() stuff */
1250 mdd_lov_create_finish(env, mdd, rc);
1251 mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1256 * Get locks on parents in proper order
1257 * RETURN: < 0 - error, rename_order if successful
1265 static int mdd_rename_order(const struct lu_env *env,
1266 struct mdd_device *mdd,
1267 struct mdd_object *src_pobj,
1268 struct mdd_object *tgt_pobj)
1270 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
/* Same parent object on both sides: MDD_RN_SAME is returned directly. */
1274 if (src_pobj == tgt_pobj)
1275 RETURN(MDD_RN_SAME);
1277 /* compare the parent child relationship of src_p&tgt_p */
1278 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1280 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
/* Neither parent is the root: walk up from the source parent. */
1283 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
/*
 * Sanity check used by mdd_rename().  The source parent is tested for the
 * dead-object state and for MAY_WRITE | MAY_EXEC permission.  Both
 * mdd_may_create() and mdd_may_delete() on the target parent appear below;
 * each gets a need_check argument that is non-zero only when the source
 * and target parents differ.  A target that is a directory is additionally
 * handed to mdd_dir_is_empty().
 */
1296 static int mdd_rename_sanity_check(const struct lu_env *env,
1297 struct mdd_object *src_pobj,
1298 struct mdd_object *tgt_pobj,
1299 const struct lu_fid *sfid,
1301 struct mdd_object *tobj)
1306 if (mdd_is_dead_obj(src_pobj))
1309 /* The sobj maybe on the remote, check parent permission only here */
1310 rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1311 MAY_WRITE | MAY_EXEC);
1316 rc = mdd_may_create(env, tgt_pobj, NULL,
1317 (src_pobj != tgt_pobj));
1319 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1320 (src_pobj != tgt_pobj));
1322 if (S_ISDIR(mdd_object_type(tobj))
1323 && mdd_dir_is_empty(env, tobj))
1329 /* src object can be remote that is why we use only fid and type of object */
1330 static int mdd_rename(const struct lu_env *env,
1331 struct md_object *src_pobj, struct md_object *tgt_pobj,
1332 const struct lu_fid *lf, const char *sname,
1333 struct md_object *tobj, const char *tname,
1336 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1337 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1338 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1339 struct mdd_device *mdd = mdo2mdd(src_pobj);
1340 struct mdd_object *mdd_sobj = NULL;
1341 struct mdd_object *mdd_tobj = NULL;
1342 struct dynlock_handle *sdlh, *tdlh;
1343 struct thandle *handle;
/* The type of the source comes from @ma, not from the source object. */
1348 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1349 is_dir = S_ISDIR(ma->ma_attr.la_mode);
/* Append-only and immutable flags carried in @ma are tested for. */
1350 if (ma->ma_attr.la_valid & LA_FLAGS &&
1351 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1355 mdd_tobj = md2mdd_obj(tobj);
1357 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1358 handle = mdd_trans_start(env, mdd);
1360 RETURN(PTR_ERR(handle));
1362 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1363 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1365 GOTO(cleanup_unlocked, rc);
1367 /* Get locks in determined order */
1368 if (rc == MDD_RN_SAME) {
1369 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1370 /* check hashes to determine whether we need one lock or two */
1371 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1372 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1375 } else if (rc == MDD_RN_SRCTGT) {
1376 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1377 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1379 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1380 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1382 if (sdlh == NULL || tdlh == NULL)
1383 GOTO(cleanup, rc = -ENOMEM);
1385 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1386 lf, is_dir, mdd_tobj);
/* Step 1: remove the old name from the source parent. */
1390 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1391 mdd_object_capa(env, mdd_spobj));
1396 * Here tobj can be remote one, so we do index_delete unconditionally
1397 * and -ENOENT is allowed.
1399 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1400 mdd_object_capa(env, mdd_tpobj));
1401 if (rc != 0 && rc != -ENOENT)
/* Step 3: insert the new name, pointing at @lf, into the target parent. */
1404 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1405 mdd_object_capa(env, mdd_tpobj));
1410 mdd_sobj = mdd_object_find(env, mdd, lf);
1412 la->la_valid = LA_CTIME;
1414 /* XXX: How to update ctime for remote sobj? */
1415 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
/* An existing old target loses its references under its write lock. */
1419 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1420 mdd_write_lock(env, mdd_tobj);
1421 mdd_ref_del_internal(env, mdd_tobj, handle);
1423 /* Remove dot reference. */
1425 mdd_ref_del_internal(env, mdd_tobj, handle);
1427 la->la_valid = LA_CTIME;
1428 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1432 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1433 mdd_write_unlock(env, mdd_tobj);
/* Source parent always gets ctime/mtime; target parent only if distinct. */
1438 la->la_valid = LA_CTIME | LA_MTIME;
1439 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1443 if (mdd_spobj != mdd_tpobj) {
1444 la->la_valid = LA_CTIME | LA_MTIME;
1445 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
/* The target name is unlocked only when it holds a separate handle. */
1451 if (likely(tdlh) && sdlh != tdlh)
1452 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1454 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1456 mdd_trans_stop(env, mdd, rc, handle);
/* Releases the source object obtained from mdd_object_find() above. */
1458 mdd_object_put(env, mdd_sobj);
/*
 * Directory operations table: binds each md_dir_operations method to the
 * mdd implementation of the same name defined in this file.
 */
1462 struct md_dir_operations mdd_dir_ops = {
1463 .mdo_is_subdir = mdd_is_subdir,
1464 .mdo_lookup = mdd_lookup,
1465 .mdo_create = mdd_create,
1466 .mdo_rename = mdd_rename,
1467 .mdo_link = mdd_link,
1468 .mdo_unlink = mdd_unlink,
1469 .mdo_name_insert = mdd_name_insert,
1470 .mdo_name_remove = mdd_name_remove,
1471 .mdo_rename_tgt = mdd_rename_tgt,
1472 .mdo_create_data = mdd_create_data