1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
72 struct lu_fid* fid, struct md_op_spec *spec)
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use special function, which does not compare version component
89 * of fid. Version component is different for root fids on all MDTs.
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
125 rc = mdd_parent_fid(env, p1, pfid);
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
178 } else if (rc == 1) {
179 /* found @fid is parent */
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188 struct mdd_object *cobj, int need_check)
193 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
196 if (mdd_is_dead_obj(pobj))
200 rc = mdd_permission_internal_locked(env, pobj, NULL,
201 MAY_WRITE | MAY_EXEC);
206 static inline int mdd_is_sticky(const struct lu_env *env,
207 struct mdd_object *pobj,
208 struct mdd_object *cobj)
210 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211 struct md_ucred *uc = md_ucred(env);
214 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
217 } else if (tmp_la->la_uid == uc->mu_fsuid) {
220 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
223 else if (!(tmp_la->la_mode & S_ISVTX) ||
224 (tmp_la->la_uid == uc->mu_fsuid))
227 return !mdd_capable(uc, CAP_FOWNER);
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233 struct mdd_object *pobj,
234 struct mdd_object *cobj,
235 int is_dir, int need_check)
237 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
243 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
246 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
250 if (!S_ISDIR(mdd_object_type(cobj)))
253 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
256 } else if (S_ISDIR(mdd_object_type(cobj))) {
261 if (mdd_is_dead_obj(pobj))
264 if (mdd_is_sticky(env, pobj, cobj))
268 rc = mdd_permission_internal_locked(env, pobj, NULL,
269 MAY_WRITE | MAY_EXEC);
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275 struct mdd_object *src_obj)
280 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
283 if (S_ISDIR(mdd_object_type(src_obj)))
286 LASSERT(src_obj != tgt_obj);
288 rc = mdd_may_create(env, tgt_obj, NULL, 1);
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297 const struct lu_fid *fid)
299 struct mdd_thread_info *info = mdd_env_info(env);
301 fid_cpu_to_be(&info->mti_fid2, fid);
302 return (const struct dt_rec *)&info->mti_fid2;
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308 const struct lu_fid *lf, const char *name, int is_dir,
309 struct thandle *handle, struct lustre_capa *capa)
311 struct dt_object *next = mdd_object_child(pobj);
312 struct timeval start;
316 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317 LPROC_MDD_INDEX_INSERT);
318 if (dt_try_as_dir(env, next)) {
319 rc = next->do_index_ops->dio_insert(env, next,
320 __mdd_fid_rec(env, lf),
321 (const struct dt_key *)name,
329 mdd_write_lock(env, pobj);
330 mdd_ref_add_internal(env, pobj, handle);
331 mdd_write_unlock(env, pobj);
334 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335 LPROC_MDD_INDEX_INSERT);
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340 const char *name, int is_dir, struct thandle *handle,
341 struct lustre_capa *capa)
343 struct dt_object *next = mdd_object_child(pobj);
344 struct timeval start;
348 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349 LPROC_MDD_INDEX_DELETE);
351 if (dt_try_as_dir(env, next)) {
352 rc = next->do_index_ops->dio_delete(env, next,
353 (struct dt_key *)name,
355 if (rc == 0 && is_dir) {
356 mdd_write_lock(env, pobj);
357 mdd_ref_del_internal(env, pobj, handle);
358 mdd_write_unlock(env, pobj);
363 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364 LPROC_MDD_INDEX_DELETE);
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370 const struct lu_fid *lf, const char *name,
371 struct thandle *handle, struct lustre_capa *capa)
373 struct dt_object *next = mdd_object_child(pobj);
377 if (dt_try_as_dir(env, next)) {
378 rc = next->do_index_ops->dio_insert(env, next,
379 __mdd_fid_rec(env, lf),
380 (const struct dt_key *)name,
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389 struct md_object *src_obj, const char *name,
392 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
393 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395 struct mdd_device *mdd = mdo2mdd(src_obj);
396 struct dynlock_handle *dlh;
397 struct thandle *handle;
401 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402 handle = mdd_trans_start(env, mdd);
404 RETURN(PTR_ERR(handle));
406 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
408 GOTO(out_trans, rc = -ENOMEM);
409 mdd_write_lock(env, mdd_sobj);
411 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
413 GOTO(out_unlock, rc);
415 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
417 mdd_object_capa(env, mdd_tobj));
419 GOTO(out_unlock, rc);
421 mdd_ref_add_internal(env, mdd_sobj, handle);
424 la->la_valid = LA_CTIME | LA_MTIME;
425 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
427 GOTO(out_unlock, rc);
429 la->la_valid = LA_CTIME;
430 rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
433 mdd_write_unlock(env, mdd_sobj);
434 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
436 mdd_trans_stop(env, mdd, rc, handle);
440 static inline void mdd_set_dead_obj(struct mdd_object *obj)
443 obj->mod_flags |= DEAD_OBJ;
446 /* caller should take a lock before calling */
447 int mdd_finish_unlink(const struct lu_env *env,
448 struct mdd_object *obj, struct md_attr *ma,
454 rc = mdd_iattr_get(env, obj, ma);
455 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
456 /* add new orphan and the object
457 * will be deleted during the object_put() */
458 if (__mdd_orphan_add(env, obj, th) == 0)
459 obj->mod_flags |= ORPHAN_OBJ;
461 mdd_set_dead_obj(obj);
462 if (obj->mod_count == 0)
463 rc = mdd_object_kill(env, obj, ma);
465 /* clear MA_LOV | MA_COOKIE, if we do not
466 * unlink it in case we get it somewhere */
467 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
469 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
475 * Check that @dir contains no entries except (possibly) dot and dotdot.
480 * -ENOTEMPTY not empty
484 static int mdd_dir_is_empty(const struct lu_env *env,
485 struct mdd_object *dir)
488 struct dt_object *obj;
489 struct dt_it_ops *iops;
493 obj = mdd_object_child(dir);
494 iops = &obj->do_index_ops->dio_it;
495 it = iops->init(env, obj, 0, BYPASS_CAPA);
497 result = iops->get(env, it, (const void *)"");
500 for (result = 0, i = 0; result == 0 && i < 3; ++i)
501 result = iops->next(env, it);
504 else if (result == +1)
506 } else if (result == 0)
508 * Huh? Index contains no zero key?
519 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
520 struct mdd_object *cobj, struct md_attr *ma)
522 struct dt_object *dt_cobj = mdd_object_child(cobj);
526 rc = mdd_may_delete(env, pobj, cobj,
527 S_ISDIR(ma->ma_attr.la_mode), 1);
531 if (S_ISDIR(mdd_object_type(cobj))) {
532 if (dt_try_as_dir(env, dt_cobj))
533 rc = mdd_dir_is_empty(env, cobj);
541 extern atomic_t lvar_enoent_debug;
542 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
543 struct md_object *cobj, const char *name,
546 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
547 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
548 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
549 struct mdd_device *mdd = mdo2mdd(pobj);
550 struct dynlock_handle *dlh;
551 struct thandle *handle;
556 * Check -ENOENT early here because we need to get object type
557 * to calculate credits before transaction start
559 if (!lu_object_exists(&cobj->mo_lu)) {
560 LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
561 "unlinking as `%s'", name);
565 LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
566 PFID(lu_object_fid(&cobj->mo_lu)));
568 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
572 handle = mdd_trans_start(env, mdd);
574 RETURN(PTR_ERR(handle));
576 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
578 GOTO(out_trans, rc = -ENOMEM);
579 mdd_write_lock(env, mdd_cobj);
581 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
585 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
587 * This should be per-thread debugging flag, but
589 atomic_inc(&lvar_enoent_debug);
590 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
591 mdd_object_capa(env, mdd_pobj));
592 atomic_dec(&lvar_enoent_debug);
596 mdd_ref_del_internal(env, mdd_cobj, handle);
599 mdd_ref_del_internal(env, mdd_cobj, handle);
603 la->la_valid = LA_CTIME | LA_MTIME;
604 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
608 la->la_valid = LA_CTIME;
609 rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
613 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
616 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
617 strlen("unlinked"), "unlinked", 0,
621 mdd_write_unlock(env, mdd_cobj);
622 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
624 mdd_trans_stop(env, mdd, rc, handle);
628 static int mdd_ni_sanity_check(const struct lu_env *env,
629 struct md_object *pobj,
631 const struct lu_fid *fid)
633 struct mdd_object *obj = md2mdd_obj(pobj);
637 if (mdd_is_dead_obj(obj))
640 /* The existence of the name will be checked in _index_insert. */
641 RETURN(mdd_permission_internal_locked(env, obj, NULL,
642 MAY_WRITE | MAY_EXEC));
648 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
649 const char *name, const struct lu_fid *fid,
652 struct lu_attr *la = &mdd_env_info(env)->mti_la;
653 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
654 struct mdd_device *mdd = mdo2mdd(pobj);
655 struct dynlock_handle *dlh;
656 struct thandle *handle;
660 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
661 handle = mdd_trans_start(env, mdo2mdd(pobj));
663 RETURN(PTR_ERR(handle));
665 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
667 GOTO(out_trans, rc = -ENOMEM);
668 rc = mdd_ni_sanity_check(env, pobj, name, fid);
670 GOTO(out_unlock, rc);
672 rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
673 handle, BYPASS_CAPA);
675 la->la_ctime = la->la_atime = CURRENT_SECONDS;
676 la->la_valid = LA_ATIME | LA_CTIME;
677 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
681 mdd_pdo_write_unlock(env, mdd_obj, dlh);
683 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
687 static int mdd_nr_sanity_check(const struct lu_env *env,
688 struct md_object *pobj,
691 struct mdd_object *obj = md2mdd_obj(pobj);
695 if (mdd_is_dead_obj(obj)) {
696 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
700 /* Name presence will be checked in _index_delete. */
701 RETURN(mdd_permission_internal_locked(env, obj, NULL,
702 MAY_WRITE | MAY_EXEC));
708 static int mdd_name_remove(const struct lu_env *env,
709 struct md_object *pobj,
710 const char *name, int is_dir)
712 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
713 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
714 struct mdd_device *mdd = mdo2mdd(pobj);
715 struct dynlock_handle *dlh;
716 struct thandle *handle;
720 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
721 handle = mdd_trans_start(env, mdd);
723 RETURN(PTR_ERR(handle));
725 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
727 GOTO(out_trans, rc = -ENOMEM);
728 rc = mdd_nr_sanity_check(env, pobj, name);
730 GOTO(out_unlock, rc);
732 rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
733 handle, BYPASS_CAPA);
735 GOTO(out_unlock, rc);
737 la->la_ctime = la->la_mtime = CURRENT_SECONDS;
738 la->la_valid = LA_CTIME | LA_MTIME;
739 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
742 mdd_pdo_write_unlock(env, mdd_obj, dlh);
744 mdd_trans_stop(env, mdd, rc, handle);
748 static int mdd_rt_sanity_check(const struct lu_env *env,
749 struct mdd_object *tgt_pobj,
750 struct mdd_object *tobj,
751 const struct lu_fid *sfid,
752 const char *name, struct md_attr *ma)
758 if (mdd_is_dead_obj(tgt_pobj))
761 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
763 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
764 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
765 mdd_dir_is_empty(env, tobj))
768 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
774 static int mdd_rename_tgt(const struct lu_env *env,
775 struct md_object *pobj, struct md_object *tobj,
776 const struct lu_fid *lf, const char *name,
779 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
780 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
781 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
782 struct mdd_device *mdd = mdo2mdd(pobj);
783 struct dynlock_handle *dlh;
784 struct thandle *handle;
788 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
789 handle = mdd_trans_start(env, mdd);
791 RETURN(PTR_ERR(handle));
793 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
795 GOTO(out_trans, rc = -ENOMEM);
797 mdd_write_lock(env, mdd_tobj);
799 /* XXX: Rename sanity checking. */
800 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
805 * If rename_tgt is called then we should just re-insert name with
806 * correct fid, no need to dec/inc parent nlink if obj is dir.
808 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
812 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
818 la->la_valid = LA_CTIME | LA_MTIME;
819 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
823 if (tobj && lu_object_exists(&tobj->mo_lu)) {
824 mdd_ref_del_internal(env, mdd_tobj, handle);
825 la->la_valid = LA_CTIME;
826 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
831 mdd_write_unlock(env, mdd_tobj);
832 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
834 mdd_trans_stop(env, mdd, rc, handle);
839 * The permission was checked when the object was created; no need to check again.
841 static int mdd_cd_sanity_check(const struct lu_env *env,
842 struct mdd_object *obj)
847 if (!obj || mdd_is_dead_obj(obj))
854 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
855 struct md_object *cobj, const struct md_op_spec *spec,
858 struct mdd_device *mdd = mdo2mdd(cobj);
859 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
860 struct mdd_object *son = md2mdd_obj(cobj);
861 struct lu_attr *attr = &ma->ma_attr;
862 struct lov_mds_md *lmm = NULL;
864 struct thandle *handle;
868 rc = mdd_cd_sanity_check(env, son);
872 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
873 !(spec->sp_cr_flags & FMODE_WRITE))
876 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
881 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
882 handle = mdd_trans_start(env, mdd);
884 GOTO(out_free, rc = PTR_ERR(handle));
887 * XXX: Setting the lov ea is not locked but setting the attr is locked?
888 * Should this be fixed?
891 /* Replayed creates have objects already */
892 if (spec->u.sp_ea.no_lov_create) {
893 CDEBUG(D_INFO, "we already have lov ea\n");
894 rc = mdd_lov_set_md(env, mdd_pobj, son,
895 (struct lov_mds_md *)spec->u.sp_ea.eadata,
896 spec->u.sp_ea.eadatalen, handle, 0);
898 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
899 lmm_size, handle, 0);
902 rc = mdd_attr_get_internal_locked(env, son, ma);
904 mdd_trans_stop(env, mdd, rc, handle);
906 /* Finish mdd_lov_create() stuff. */
907 mdd_lov_create_finish(env, mdd, rc);
909 OBD_FREE(lmm, lmm_size);
914 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
915 const char *name, struct lu_fid* fid, int mask)
917 const struct dt_key *key = (const struct dt_key *)name;
918 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
919 struct dt_object *dir = mdd_object_child(mdd_obj);
920 struct dt_rec *rec = (struct dt_rec *)fid;
921 struct timeval start;
925 mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
926 if (mdd_is_dead_obj(mdd_obj))
929 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
933 CERROR("Object "DFID" locates on remote server\n",
934 PFID(mdo2fid(mdd_obj)));
938 rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
942 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
943 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
944 mdd_object_capa(env, mdd_obj));
946 fid_be_to_cpu(fid, fid);
950 mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
954 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
955 struct mdd_object *child, struct md_attr *ma,
956 struct thandle *handle)
962 * Update attributes for child.
965 * (1) the valid bits should be converted between Lustre and Linux;
966 * (2) maybe, the child attributes should be set in OSD when creation.
969 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
973 if (S_ISDIR(ma->ma_attr.la_mode)) {
974 /* Add "." and ".." for newly created dir */
975 mdd_ref_add_internal(env, child, handle);
976 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
977 dot, handle, BYPASS_CAPA);
979 rc = __mdd_index_insert_only(env, child, pfid,
985 rc2 = __mdd_index_delete(env, child, dot, 0,
986 handle, BYPASS_CAPA);
988 CERROR("Failure to cleanup after dotdot"
989 " creation: %d (%d)\n", rc2, rc);
991 mdd_ref_del_internal(env, child, handle);
998 static int mdd_create_sanity_check(const struct lu_env *env,
999 struct md_object *pobj,
1004 struct mdd_thread_info *info = mdd_env_info(env);
1005 struct lu_attr *la = &info->mti_la;
1006 struct lu_fid *fid = &info->mti_fid;
1007 struct mdd_object *obj = md2mdd_obj(pobj);
1012 if (mdd_is_dead_obj(obj))
1016 * In some cases this lookup is not needed - we know before if name
1017 * exists or not because MDT performs lookup for it.
1019 /* XXX disable that lookup temporarily */
1022 * Check if the name already exist, though it will be checked in
1023 * _index_insert also, for avoiding rolling back if exists
1026 rc = __mdd_lookup_locked(env, pobj, name, fid,
1027 MAY_WRITE | MAY_EXEC);
1029 RETURN(rc ? : -EEXIST);
1032 * Check if has WRITE permission for the parent.
1034 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1040 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1044 if (la->la_mode & S_ISGID) {
1045 ma->ma_attr.la_gid = la->la_gid;
1046 if (S_ISDIR(ma->ma_attr.la_mode)) {
1047 ma->ma_attr.la_mode |= S_ISGID;
1048 ma->ma_attr.la_valid |= LA_MODE;
1052 switch (ma->ma_attr.la_mode & S_IFMT) {
1070 * Create object and insert it into namespace.
1072 static int mdd_create(const struct lu_env *env,
1073 struct md_object *pobj, const char *name,
1074 struct md_object *child,
1075 struct md_op_spec *spec,
1078 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1079 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1080 struct mdd_object *son = md2mdd_obj(child);
1081 struct mdd_device *mdd = mdo2mdd(pobj);
1082 struct lu_attr *attr = &ma->ma_attr;
1083 struct lov_mds_md *lmm = NULL;
1084 struct thandle *handle;
1085 int rc, created = 0, inserted = 0, lmm_size = 0;
1086 struct dynlock_handle *dlh;
1087 struct timeval start;
1090 mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1093 * Two operations have to be performed:
1095 * - allocation of new object (->do_create()), and
1097 * - insertion into parent index (->dio_insert()).
1099 * Due to locking, operation order is not important, when both are
1100 * successful, *but* error handling cases are quite different:
1102 * - if insertion is done first, and following object creation fails,
1103 * insertion has to be rolled back, but this operation might fail
1104 * also leaving us with dangling index entry.
1106 * - if creation is done first, it has to be undone if insertion
1107 * fails, leaving us with leaked space, which is neither good, nor
1110 * It seems that creation-first is simplest solution, but it is
1111 * sub-optimal in the frequent
1116 * case, because second mkdir is bound to create object, only to
1117 * destroy it immediately.
1119 * To avoid this follow local file systems that do double lookup:
1121 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1123 * 1. create (mdd_object_create_internal())
1125 * 2. insert (__mdd_index_insert(), lookup again)
1128 /* Sanity checks before big job. */
1129 rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1134 * No RPC inside the transaction, so OST objects should be created at
1137 if (S_ISREG(attr->la_mode)) {
1138 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1144 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1145 handle = mdd_trans_start(env, mdd);
1147 GOTO(out_free, rc = PTR_ERR(handle));
1149 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1151 GOTO(out_trans, rc = -ENOMEM);
1154 * XXX: Check that link can be added to the parent in mkdir case.
1157 mdd_write_lock(env, son);
1158 rc = mdd_object_create_internal(env, son, ma, handle);
1160 mdd_write_unlock(env, son);
1166 #ifdef CONFIG_FS_POSIX_ACL
1167 mdd_read_lock(env, mdd_pobj);
1168 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1169 mdd_read_unlock(env, mdd_pobj);
1171 mdd_write_unlock(env, son);
1174 ma->ma_attr.la_valid |= LA_MODE;
1178 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1180 mdd_write_unlock(env, son);
1183 * Object has no links, so it will be destroyed when last
1184 * reference is released. (XXX not now.)
1188 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1189 name, S_ISDIR(attr->la_mode), handle,
1190 mdd_object_capa(env, mdd_pobj));
1197 /* Replayed creates have objects already. */
1198 if (spec->u.sp_ea.no_lov_create) {
1199 CDEBUG(D_INFO, "we already have lov ea\n");
1200 LASSERT(lmm == NULL);
1201 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1202 lmm_size = spec->u.sp_ea.eadatalen;
1204 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1206 CERROR("error on stripe info copy %d \n", rc);
1209 if (lmm && lmm_size > 0) {
1210 /* Set Lov here, do not get lmm again later */
1211 memcpy(ma->ma_lmm, lmm, lmm_size);
1212 ma->ma_lmm_size = lmm_size;
1213 ma->ma_valid |= MA_LOV;
1216 if (S_ISLNK(attr->la_mode)) {
1217 struct dt_object *dt = mdd_object_child(son);
1218 const char *target_name = spec->u.sp_symname;
1219 int sym_len = strlen(target_name);
1220 const struct lu_buf *buf;
1223 buf = mdd_buf_get_const(env, target_name, sym_len);
1224 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1225 mdd_object_capa(env, son));
1230 GOTO(cleanup, rc = -EFAULT);
1234 la->la_valid = LA_CTIME | LA_MTIME;
1235 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1239 /* Return attr back. */
1240 rc = mdd_attr_get_internal_locked(env, son, ma);
1243 if (rc && created) {
1247 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1248 S_ISDIR(attr->la_mode),
1249 handle, BYPASS_CAPA);
1251 CERROR("error can not cleanup destroy %d\n",
1255 mdd_write_lock(env, son);
1256 mdd_ref_del_internal(env, son, handle);
1257 mdd_write_unlock(env, son);
1261 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1263 mdd_trans_stop(env, mdd, rc, handle);
1265 if (lmm && !spec->u.sp_ea.no_lov_create)
1266 OBD_FREE(lmm, lmm_size);
1267 /* Finish mdd_lov_create() stuff */
1268 mdd_lov_create_finish(env, mdd, rc);
1269 mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1274 * Get locks on parents in proper order
1275 * RETURN: < 0 - error, rename_order if successful
1283 static int mdd_rename_order(const struct lu_env *env,
1284 struct mdd_device *mdd,
1285 struct mdd_object *src_pobj,
1286 struct mdd_object *tgt_pobj)
1288 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1292 if (src_pobj == tgt_pobj)
1293 RETURN(MDD_RN_SAME);
1295 /* compare the parent-child relationship of src_p and tgt_p */
1296 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1298 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1301 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1314 static int mdd_rename_sanity_check(const struct lu_env *env,
1315 struct mdd_object *src_pobj,
1316 struct mdd_object *tgt_pobj,
1317 const struct lu_fid *sfid,
1319 struct mdd_object *tobj)
1324 if (mdd_is_dead_obj(src_pobj))
1327 /* The sobj may be remote, so only the parent permission is checked here */
1328 rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1329 MAY_WRITE | MAY_EXEC);
1334 rc = mdd_may_create(env, tgt_pobj, NULL,
1335 (src_pobj != tgt_pobj));
1337 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1338 (src_pobj != tgt_pobj));
1340 if (S_ISDIR(mdd_object_type(tobj))
1341 && mdd_dir_is_empty(env, tobj))
1347 /* src object can be remote, which is why we use only the fid and type of the object */
1348 static int mdd_rename(const struct lu_env *env,
1349 struct md_object *src_pobj, struct md_object *tgt_pobj,
1350 const struct lu_fid *lf, const char *sname,
1351 struct md_object *tobj, const char *tname,
1354 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1355 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1356 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1357 struct mdd_device *mdd = mdo2mdd(src_pobj);
1358 struct mdd_object *mdd_sobj = NULL;
1359 struct mdd_object *mdd_tobj = NULL;
1360 struct dynlock_handle *sdlh, *tdlh;
1361 struct thandle *handle;
1366 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1367 is_dir = S_ISDIR(ma->ma_attr.la_mode);
1368 if (ma->ma_attr.la_valid & LA_FLAGS &&
1369 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1373 mdd_tobj = md2mdd_obj(tobj);
1375 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1376 handle = mdd_trans_start(env, mdd);
1378 RETURN(PTR_ERR(handle));
1380 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1381 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1383 GOTO(cleanup_unlocked, rc);
1385 /* Get locks in determined order */
1386 if (rc == MDD_RN_SAME) {
1387 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1388 /* check hashes to determine whether we need one lock or two */
1389 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1390 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1393 } else if (rc == MDD_RN_SRCTGT) {
1394 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1395 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1397 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1398 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1400 if (sdlh == NULL || tdlh == NULL)
1401 GOTO(cleanup, rc = -ENOMEM);
1403 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1404 lf, is_dir, mdd_tobj);
1408 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1409 mdd_object_capa(env, mdd_spobj));
1414 * Here tobj can be remote one, so we do index_delete unconditionally
1415 * and -ENOENT is allowed.
1417 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1418 mdd_object_capa(env, mdd_tpobj));
1419 if (rc != 0 && rc != -ENOENT)
1422 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1423 mdd_object_capa(env, mdd_tpobj));
1428 mdd_sobj = mdd_object_find(env, mdd, lf);
1430 la->la_valid = LA_CTIME;
1432 /* XXX: How to update ctime for remote sobj? */
1433 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1437 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1438 mdd_write_lock(env, mdd_tobj);
1439 mdd_ref_del_internal(env, mdd_tobj, handle);
1441 /* Remove dot reference. */
1443 mdd_ref_del_internal(env, mdd_tobj, handle);
1445 la->la_valid = LA_CTIME;
1446 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1450 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1451 mdd_write_unlock(env, mdd_tobj);
1456 la->la_valid = LA_CTIME | LA_MTIME;
1457 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1461 if (mdd_spobj != mdd_tpobj) {
1462 la->la_valid = LA_CTIME | LA_MTIME;
1463 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1469 if (likely(tdlh) && sdlh != tdlh)
1470 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1472 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1474 mdd_trans_stop(env, mdd, rc, handle);
1476 mdd_object_put(env, mdd_sobj);
1480 struct md_dir_operations mdd_dir_ops = {
1481 .mdo_is_subdir = mdd_is_subdir,
1482 .mdo_lookup = mdd_lookup,
1483 .mdo_create = mdd_create,
1484 .mdo_rename = mdd_rename,
1485 .mdo_link = mdd_link,
1486 .mdo_unlink = mdd_unlink,
1487 .mdo_name_insert = mdd_name_insert,
1488 .mdo_name_remove = mdd_name_remove,
1489 .mdo_rename_tgt = mdd_rename_tgt,
1490 .mdo_create_data = mdd_create_data