1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
/* Names of the self and parent directory entries; used as lookup and index
 * keys elsewhere in this file. */
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
/* Forward declaration: the definition comes later in this file, after the
 * locked wrapper that calls it. */
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
/*
 * Look up @name in directory @pobj through __mdd_lookup() while holding the
 * pdo read lock of the directory for that name. The lock is dropped right
 * after the lookup call.
 */
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
/*
 * Directory lookup handler (installed as .mdo_lookup in mdd_dir_ops):
 * performs a locked lookup of @name in @pobj with a MAY_EXEC permission mask
 * and stores the result in @fid.
 */
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
72 struct lu_fid* fid, struct md_op_spec *spec)
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/* Fetch the fid stored under ".." in @obj by doing a locked lookup with a
 * zero permission mask. */
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use a special function, which does not compare the version component
89 * of the fid. The version component is different for root fids on all MDTs.
/* Return non-zero when @fid has the same sequence and object id as the
 * device root fid (mdd->mdd_root_fid); only those two components are
 * compared. */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
/*
 * Ancestry test for @p1 against the fid @lf. The caller must not pass the fid
 * of @p1 itself (asserted below). The ".." fid of the current object is read
 * with mdd_parent_fid() into the mti_fid scratch slot and compared with the
 * root fid and with @lf. The parent object is then obtained with
 * mdd_object_find(): a NULL result is handled as a cross-ref parent and gives
 * -EREMOTE, an error pointer gives PTR_ERR(). Any valid parent reference
 * still held at the end is released with mdd_object_put().
 */
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
125 rc = mdd_parent_fid(env, p1, pfid);
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
/*
 * .mdo_is_subdir handler (see mdd_dir_ops). Tests whether @mo is a directory
 * via S_ISDIR on its object type, then delegates the ancestry test for @fid to
 * mdd_is_parent(), passing sfid through for the remote-object case. A result
 * of 1 from mdd_is_parent() means @fid was found to be a parent.
 */
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
178 } else if (rc == 1) {
179 /* found @fid is parent */
/*
 * The visible tests look at: whether @cobj is non-NULL and already exists,
 * whether @pobj is a dead object, and whether the caller has
 * MAY_WRITE | MAY_EXEC permission on @pobj.
 */
187 /* Check whether it may create the cobj under the pobj */
188 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
189 struct mdd_object *cobj, int need_check)
193 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
196 if (mdd_is_dead_obj(pobj))
200 rc = mdd_permission_internal_locked(env, pobj, NULL,
201 MAY_WRITE | MAY_EXEC);
/*
 * Sticky-bit helper called from mdd_may_delete(). Reads the attributes of
 * @cobj and then of @pobj into the mti_la scratch lu_attr, comparing la_uid
 * of each with the caller's mu_fsuid and testing S_ISVTX in the parent's
 * mode. The last visible return is !mdd_capable(uc, CAP_FOWNER).
 */
206 static inline int mdd_is_sticky(const struct lu_env *env,
207 struct mdd_object *pobj,
208 struct mdd_object *cobj)
210 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211 struct md_ucred *uc = md_ucred(env);
214 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
217 } else if (tmp_la->la_uid == uc->mu_fsuid) {
220 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
223 else if (!(tmp_la->la_mode & S_ISVTX) ||
224 (tmp_la->la_uid == uc->mu_fsuid))
227 return !mdd_capable(uc, CAP_FOWNER);
/*
 * The visible tests look at: existence of @cobj, its immutable / append-only
 * flags, its object type (directory or not), equality of its fid with the
 * root fid, whether @pobj is dead, the sticky-bit rule (mdd_is_sticky()), and
 * MAY_WRITE | MAY_EXEC permission on @pobj.
 */
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233 struct mdd_object *pobj,
234 struct mdd_object *cobj,
235 int is_dir, int need_check)
237 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
243 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
246 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
250 if (!S_ISDIR(mdd_object_type(cobj)))
253 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
256 } else if (S_ISDIR(mdd_object_type(cobj))) {
261 if (mdd_is_dead_obj(pobj))
264 if (mdd_is_sticky(env, pobj, cobj))
268 rc = mdd_permission_internal_locked(env, pobj, NULL,
269 MAY_WRITE | MAY_EXEC);
/*
 * Pre-checks for hard-linking @src_obj into directory @tgt_obj: tests the
 * immutable / append-only flags and the directory type of the source, asserts
 * that source and target are different objects, and runs mdd_may_create() on
 * the target.
 */
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275 struct mdd_object *src_obj)
280 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
283 if (S_ISDIR(mdd_object_type(src_obj)))
286 LASSERT(src_obj != tgt_obj);
288 rc = mdd_may_create(env, tgt_obj, NULL, 1);
/*
 * Run @fid through fid_cpu_to_be() into the mti_fid2 scratch slot of the
 * env's mdd_thread_info and return that slot cast to an index record pointer.
 * The returned pointer refers to that scratch slot, so a later call with the
 * same env overwrites it.
 */
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297 const struct lu_fid *fid)
299 struct mdd_thread_info *info = mdd_env_info(env);
301 fid_cpu_to_be(&info->mti_fid2, fid);
302 return (const struct dt_rec *)&info->mti_fid2;
/*
 * Inserts the record built by __mdd_fid_rec() for @lf under key @name in the
 * index of @pobj, provided the object can be used as a directory
 * (dt_try_as_dir()). A reference is added to @pobj under its write lock. The
 * whole call is bracketed by LPROC_MDD_INDEX_INSERT timing.
 */
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308 const struct lu_fid *lf, const char *name, int is_dir,
309 struct thandle *handle, struct lustre_capa *capa)
311 struct dt_object *next = mdd_object_child(pobj);
312 struct timeval start;
316 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317 LPROC_MDD_INDEX_INSERT);
318 if (dt_try_as_dir(env, next)) {
319 rc = next->do_index_ops->dio_insert(env, next,
320 __mdd_fid_rec(env, lf),
321 (const struct dt_key *)name,
329 mdd_write_lock(env, pobj);
330 mdd_ref_add_internal(env, pobj, handle);
331 mdd_write_unlock(env, pobj);
334 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335 LPROC_MDD_INDEX_INSERT);
/*
 * Delete key @name from the index of @pobj, provided the object can be used
 * as a directory (dt_try_as_dir()). When the delete succeeds and @is_dir is
 * set, one reference on @pobj is dropped under its write lock. The call is
 * bracketed by LPROC_MDD_INDEX_DELETE timing.
 */
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340 const char *name, int is_dir, struct thandle *handle,
341 struct lustre_capa *capa)
343 struct dt_object *next = mdd_object_child(pobj);
344 struct timeval start;
348 mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349 LPROC_MDD_INDEX_DELETE);
351 if (dt_try_as_dir(env, next)) {
352 rc = next->do_index_ops->dio_delete(env, next,
353 (struct dt_key *)name,
355 if (rc == 0 && is_dir) {
356 mdd_write_lock(env, pobj);
357 mdd_ref_del_internal(env, pobj, handle);
358 mdd_write_unlock(env, pobj);
363 mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364 LPROC_MDD_INDEX_DELETE);
/*
 * Insert the record for @lf under key @name in the index of @pobj. Unlike
 * __mdd_index_insert() this variant takes no is_dir argument and shows no
 * reference-count or lprocfs timing step around the dio_insert() call.
 */
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370 const struct lu_fid *lf, const char *name,
371 struct thandle *handle, struct lustre_capa *capa)
373 struct dt_object *next = mdd_object_child(pobj);
377 if (dt_try_as_dir(env, next)) {
378 rc = next->do_index_ops->dio_insert(env, next,
379 __mdd_fid_rec(env, lf),
380 (const struct dt_key *)name,
/*
 * .mdo_link handler (see mdd_dir_ops): add entry @name in directory @tgt_obj
 * referring to @src_obj.
 *
 * Visible sequence: start an MDD_TXN_LINK_OP transaction; take the pdo write
 * lock on the target directory for @name (a failed lock yields -ENOMEM) and
 * the write lock on the source; run mdd_link_sanity_check(); insert the index
 * entry; add a reference to the source; set ctime/mtime on the target and
 * ctime on the source; release both locks and stop the transaction with the
 * final rc.
 */
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389 struct md_object *src_obj, const char *name,
392 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
393 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395 struct mdd_device *mdd = mdo2mdd(src_obj);
396 struct dynlock_handle *dlh;
397 struct thandle *handle;
401 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402 handle = mdd_trans_start(env, mdd);
404 RETURN(PTR_ERR(handle));
406 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
408 GOTO(out_trans, rc = -ENOMEM);
409 mdd_write_lock(env, mdd_sobj);
411 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
413 GOTO(out_unlock, rc);
415 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
417 mdd_object_capa(env, mdd_tobj));
419 GOTO(out_unlock, rc);
421 mdd_ref_add_internal(env, mdd_sobj, handle);
424 la->la_valid = LA_CTIME | LA_MTIME;
425 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
427 GOTO(out_unlock, rc);
429 la->la_valid = LA_CTIME;
430 rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
433 mdd_write_unlock(env, mdd_sobj);
434 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
436 mdd_trans_stop(env, mdd, rc, handle);
/* Flag @obj as dead by setting DEAD_OBJ in its mod_flags. */
440 static inline void mdd_set_dead_obj(struct mdd_object *obj)
443 obj->mod_flags |= DEAD_OBJ;
/*
 * Final step of removing a link to @obj. Attributes are re-read with
 * mdd_iattr_get(); when that succeeds and la_nlink is 0 the object is
 * registered through __mdd_orphan_add() (ORPHAN_OBJ is set if that returns
 * 0), marked dead, and passed to mdd_object_kill() when mod_count is 0.
 * MA_LOV | MA_COOKIE are cleared from ma->ma_valid on the paths shown.
 */
446 /* caller should take a lock before calling */
447 int mdd_finish_unlink(const struct lu_env *env,
448 struct mdd_object *obj, struct md_attr *ma,
454 rc = mdd_iattr_get(env, obj, ma);
455 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
456 /* add new orphan and the object
457 * will be deleted during the object_put() */
458 if (__mdd_orphan_add(env, obj, th) == 0)
459 obj->mod_flags |= ORPHAN_OBJ;
461 mdd_set_dead_obj(obj);
462 if (obj->mod_count == 0)
463 rc = mdd_object_kill(env, obj, ma);
465 /* clear MA_LOV | MA_COOKIE, if we do not
466 * unlink it in case we get it somewhere */
467 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
469 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
475 * Check that @dir contains no entries except (possibly) dot and dotdot.
480 * -ENOTEMPTY not empty
/*
 * Emptiness test for @dir built on the index iterator of its child object:
 * the iterator is created with init(), positioned with get() on the empty
 * key "", and then advanced with next() at most three times, stopping early
 * as soon as next() returns non-zero. The result of that walk (0, +1 or an
 * error) decides the outcome.
 */
484 static int mdd_dir_is_empty(const struct lu_env *env,
485 struct mdd_object *dir)
488 struct dt_object *obj;
489 struct dt_it_ops *iops;
493 obj = mdd_object_child(dir);
494 iops = &obj->do_index_ops->dio_it;
495 it = iops->init(env, obj, 0, BYPASS_CAPA);
497 result = iops->get(env, it, (const void *)"");
500 for (result = 0, i = 0; result == 0 && i < 3; ++i)
501 result = iops->next(env, it);
504 else if (result == +1)
506 } else if (result == 0)
508 * Huh? Index contains no zero key?
/*
 * Pre-checks for unlinking @cobj from @pobj: mdd_may_delete() with is_dir
 * derived from ma->ma_attr.la_mode, then, for a child whose object type is a
 * directory and which can be used as an index, mdd_dir_is_empty().
 */
519 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
520 struct mdd_object *cobj, struct md_attr *ma)
522 struct dt_object *dt_cobj = mdd_object_child(cobj);
526 rc = mdd_may_delete(env, pobj, cobj,
527 S_ISDIR(ma->ma_attr.la_mode), 1);
531 if (S_ISDIR(mdd_object_type(cobj))) {
532 if (dt_try_as_dir(env, dt_cobj))
533 rc = mdd_dir_is_empty(env, cobj);
/*
 * .mdo_unlink handler (see mdd_dir_ops): remove entry @name from directory
 * @pobj and drop the link it held on @cobj.
 *
 * Existence of @cobj is tested before the transaction starts because the
 * credit calculation (mdd_log_txn_param_build()) needs the object. With the
 * pdo write lock on the parent for @name and the write lock on the child
 * held, the visible steps are: mdd_unlink_sanity_check(), index delete,
 * reference drop(s) on the child, ctime/mtime update on the parent, ctime
 * update on the child, mdd_finish_unlink(), and an "unlinked" notification
 * through obd_set_info_async(). Both locks are released and the transaction
 * is stopped with the final rc.
 */
541 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
542 struct md_object *cobj, const char *name,
545 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
546 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
547 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
548 struct mdd_device *mdd = mdo2mdd(pobj);
549 struct dynlock_handle *dlh;
550 struct thandle *handle;
555 * Check -ENOENT early here because we need to get object type
556 * to calculate credits before transaction start
558 if (!lu_object_exists(&cobj->mo_lu)) {
559 LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
560 "unlinking as `%s'", name);
564 LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
565 PFID(lu_object_fid(&cobj->mo_lu)));
567 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
571 handle = mdd_trans_start(env, mdd);
573 RETURN(PTR_ERR(handle));
575 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
577 GOTO(out_trans, rc = -ENOMEM);
578 mdd_write_lock(env, mdd_cobj);
580 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
584 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
586 current->debugging1 |= 0x1; /* XXX enable lvar_enoent_debug
588 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
589 mdd_object_capa(env, mdd_pobj));
590 current->debugging1 &= ~0x1;
594 mdd_ref_del_internal(env, mdd_cobj, handle);
597 mdd_ref_del_internal(env, mdd_cobj, handle);
601 la->la_valid = LA_CTIME | LA_MTIME;
602 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
606 la->la_valid = LA_CTIME;
607 rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
611 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
614 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
615 strlen("unlinked"), "unlinked", 0,
619 mdd_write_unlock(env, mdd_cobj);
620 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
622 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-check for mdd_name_insert(): tests whether the directory is a dead
 * object and returns the result of a MAY_WRITE | MAY_EXEC permission check
 * on it.
 */
626 static int mdd_ni_sanity_check(const struct lu_env *env,
627 struct md_object *pobj,
629 const struct lu_fid *fid)
631 struct mdd_object *obj = md2mdd_obj(pobj);
635 if (mdd_is_dead_obj(obj))
638 /* The existence of the name will be checked in _index_insert. */
639 RETURN(mdd_permission_internal_locked(env, obj, NULL,
640 MAY_WRITE | MAY_EXEC));
/*
 * .mdo_name_insert handler (see mdd_dir_ops): insert @name -> @fid into
 * directory @pobj inside an MDD_TXN_INDEX_INSERT_OP transaction while holding
 * the pdo write lock for @name (a failed lock yields -ENOMEM). After
 * mdd_ni_sanity_check() and the index insert, the directory's ctime and
 * atime are set to CURRENT_SECONDS.
 *
 * NOTE(review): mdd_name_remove() stamps ctime/mtime whereas this path
 * stamps ctime/atime -- confirm the asymmetry is intended.
 */
646 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
647 const char *name, const struct lu_fid *fid,
650 struct lu_attr *la = &mdd_env_info(env)->mti_la;
651 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
652 struct mdd_device *mdd = mdo2mdd(pobj);
653 struct dynlock_handle *dlh;
654 struct thandle *handle;
658 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
659 handle = mdd_trans_start(env, mdo2mdd(pobj));
661 RETURN(PTR_ERR(handle));
663 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
665 GOTO(out_trans, rc = -ENOMEM);
666 rc = mdd_ni_sanity_check(env, pobj, name, fid);
668 GOTO(out_unlock, rc);
670 rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
671 handle, BYPASS_CAPA);
673 la->la_ctime = la->la_atime = CURRENT_SECONDS;
674 la->la_valid = LA_ATIME | LA_CTIME;
675 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
679 mdd_pdo_write_unlock(env, mdd_obj, dlh);
681 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
/*
 * Pre-check for mdd_name_remove(): warns (CWARN with the directory fid) when
 * the directory is a dead object, and returns the result of a
 * MAY_WRITE | MAY_EXEC permission check on it.
 */
685 static int mdd_nr_sanity_check(const struct lu_env *env,
686 struct md_object *pobj,
689 struct mdd_object *obj = md2mdd_obj(pobj);
693 if (mdd_is_dead_obj(obj)) {
694 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
698 /* Name presence will be checked in _index_delete. */
699 RETURN(mdd_permission_internal_locked(env, obj, NULL,
700 MAY_WRITE | MAY_EXEC));
/*
 * .mdo_name_remove handler (see mdd_dir_ops): delete @name from directory
 * @pobj inside an MDD_TXN_INDEX_DELETE_OP transaction while holding the pdo
 * write lock for @name (a failed lock yields -ENOMEM). After
 * mdd_nr_sanity_check() and the index delete, the directory's ctime and
 * mtime are set to CURRENT_SECONDS.
 */
706 static int mdd_name_remove(const struct lu_env *env,
707 struct md_object *pobj,
708 const char *name, int is_dir)
710 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
711 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
712 struct mdd_device *mdd = mdo2mdd(pobj);
713 struct dynlock_handle *dlh;
714 struct thandle *handle;
718 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
719 handle = mdd_trans_start(env, mdd);
721 RETURN(PTR_ERR(handle));
723 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
725 GOTO(out_trans, rc = -ENOMEM);
726 rc = mdd_nr_sanity_check(env, pobj, name);
728 GOTO(out_unlock, rc);
730 rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
731 handle, BYPASS_CAPA);
733 GOTO(out_unlock, rc);
735 la->la_ctime = la->la_mtime = CURRENT_SECONDS;
736 la->la_valid = LA_CTIME | LA_MTIME;
737 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
740 mdd_pdo_write_unlock(env, mdd_obj, dlh);
742 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-checks for mdd_rename_tgt(). Tests for a dead target parent, derives
 * src_is_dir from ma->ma_attr.la_mode, and then runs either mdd_may_delete()
 * on @tobj (followed by an emptiness test when @tobj is a directory) or
 * mdd_may_create() on the target parent.
 */
746 static int mdd_rt_sanity_check(const struct lu_env *env,
747 struct mdd_object *tgt_pobj,
748 struct mdd_object *tobj,
749 const struct lu_fid *sfid,
750 const char *name, struct md_attr *ma)
756 if (mdd_is_dead_obj(tgt_pobj))
759 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
761 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
762 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
763 mdd_dir_is_empty(env, tobj))
766 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/*
 * .mdo_rename_tgt handler (see mdd_dir_ops): make @name in directory @pobj
 * refer to fid @lf, replacing whatever it referred to before.
 *
 * Runs inside an MDD_TXN_RENAME_TGT_OP transaction with the pdo write lock on
 * the parent for @name and the write lock on @tobj. After
 * mdd_rt_sanity_check() the old entry is deleted with is_dir = 0 and the new
 * one inserted with __mdd_index_insert_only(), so the parent's link count is
 * not touched. The parent's ctime/mtime are updated; if @tobj is non-NULL and
 * exists, one reference on it is dropped and its ctime updated.
 */
772 static int mdd_rename_tgt(const struct lu_env *env,
773 struct md_object *pobj, struct md_object *tobj,
774 const struct lu_fid *lf, const char *name,
777 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
778 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
779 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
780 struct mdd_device *mdd = mdo2mdd(pobj);
781 struct dynlock_handle *dlh;
782 struct thandle *handle;
786 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
787 handle = mdd_trans_start(env, mdd);
789 RETURN(PTR_ERR(handle));
791 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
793 GOTO(out_trans, rc = -ENOMEM);
795 mdd_write_lock(env, mdd_tobj);
797 /* XXX: Rename sanity checking. */
798 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
803 * If rename_tgt is called then we should just re-insert name with
804 * correct fid, no need to dec/inc parent nlink if obj is dir.
806 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
810 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
816 la->la_valid = LA_CTIME | LA_MTIME;
817 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
821 if (tobj && lu_object_exists(&tobj->mo_lu)) {
822 mdd_ref_del_internal(env, mdd_tobj, handle);
823 la->la_valid = LA_CTIME;
824 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
829 mdd_write_unlock(env, mdd_tobj);
830 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
832 mdd_trans_stop(env, mdd, rc, handle);
837 * The permission was already checked when the object was created, so there is no need to check it again.
/* Pre-check for mdd_create_data(): tests for a NULL or dead object. */
839 static int mdd_cd_sanity_check(const struct lu_env *env,
840 struct mdd_object *obj)
845 if (!obj || mdd_is_dead_obj(obj))
/*
 * .mdo_create_data handler (see mdd_dir_ops): attach LOV striping data to the
 * existing object @cobj.
 *
 * Visible sequence: mdd_cd_sanity_check() on the child; a test of
 * spec->sp_cr_flags for MDS_OPEN_DELAY_CREATE or a missing FMODE_WRITE;
 * mdd_lov_create() to obtain lmm / lmm_size; an MDD_TXN_CREATE_DATA_OP
 * transaction in which mdd_lov_set_md() stores either the EA supplied in
 * @spec (when no_lov_create is set) or the freshly created lmm; a locked
 * attribute read back into @ma; transaction stop; mdd_lov_create_finish();
 * and OBD_FREE() of lmm.
 */
852 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
853 struct md_object *cobj, const struct md_op_spec *spec,
856 struct mdd_device *mdd = mdo2mdd(cobj);
857 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
858 struct mdd_object *son = md2mdd_obj(cobj);
859 struct lu_attr *attr = &ma->ma_attr;
860 struct lov_mds_md *lmm = NULL;
862 struct thandle *handle;
866 rc = mdd_cd_sanity_check(env, son);
870 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
871 !(spec->sp_cr_flags & FMODE_WRITE))
874 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
879 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
880 handle = mdd_trans_start(env, mdd);
882 GOTO(out_free, rc = PTR_ERR(handle));
885 * XXX: Setting the lov ea is not locked but setting the attr is locked?
886 * Should this be fixed?
889 /* Replayed creates have objects already */
890 if (spec->u.sp_ea.no_lov_create) {
891 CDEBUG(D_INFO, "we already have lov ea\n");
892 rc = mdd_lov_set_md(env, mdd_pobj, son,
893 (struct lov_mds_md *)spec->u.sp_ea.eadata,
894 spec->u.sp_ea.eadatalen, handle, 0);
896 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
897 lmm_size, handle, 0);
900 rc = mdd_attr_get_internal_locked(env, son, ma);
902 mdd_trans_stop(env, mdd, rc, handle);
904 /* Finish mdd_lov_create() stuff. */
905 mdd_lov_create_finish(env, mdd, rc);
907 OBD_FREE(lmm, lmm_size);
/*
 * Unlocked lookup of @name in directory @pobj; callers in this file go
 * through __mdd_lookup_locked(). Timed under LPROC_MDD_LOOKUP.
 *
 * Visible steps: dead-object test on the directory; lu_object_exists() test,
 * with a CERROR reporting that the object is located on a remote server; a
 * permission check with @mask; then, if the object type is a directory and it
 * can be used as an index, dio_lookup() writes the record directly into @fid
 * and fid_be_to_cpu() converts it in place.
 */
912 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
913 const char *name, struct lu_fid* fid, int mask)
915 const struct dt_key *key = (const struct dt_key *)name;
916 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
917 struct dt_object *dir = mdd_object_child(mdd_obj);
918 struct dt_rec *rec = (struct dt_rec *)fid;
919 struct timeval start;
923 mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
924 if (mdd_is_dead_obj(mdd_obj))
927 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
931 CERROR("Object "DFID" locates on remote server\n",
932 PFID(mdo2fid(mdd_obj)));
936 rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
940 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
941 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
942 mdd_object_capa(env, mdd_obj));
944 fid_be_to_cpu(fid, fid);
948 mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/*
 * Post-creation setup of @child. Applies ma->ma_attr to the child with
 * mdd_attr_set_internal(). For a directory (S_ISDIR on ma_attr.la_mode) it
 * additionally adds a reference to the child, inserts "." pointing at the
 * child's own fid and ".." pointing at @pfid. The remaining lines are a
 * rollback path: "." is deleted again (a failure of that delete is only
 * logged through CERROR) and the added reference is dropped.
 */
952 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
953 struct mdd_object *child, struct md_attr *ma,
954 struct thandle *handle)
960 * Update attributes for child.
963 * (1) the valid bits should be converted between Lustre and Linux;
964 * (2) maybe, the child attributes should be set in OSD when creation.
967 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
971 if (S_ISDIR(ma->ma_attr.la_mode)) {
972 /* Add "." and ".." for newly created dir */
973 mdd_ref_add_internal(env, child, handle);
974 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
975 dot, handle, BYPASS_CAPA);
977 rc = __mdd_index_insert_only(env, child, pfid,
983 rc2 = __mdd_index_delete(env, child, dot, 0,
984 handle, BYPASS_CAPA);
986 CERROR("Failure to cleanup after dotdot"
987 " creation: %d (%d)\n", rc2, rc);
989 mdd_ref_del_internal(env, child, handle);
/*
 * Pre-checks for mdd_create() on parent directory @pobj.
 *
 * Visible steps: dead-object test on the parent; a locked lookup of the name
 * with MAY_WRITE | MAY_EXEC whose result is turned into "rc ? : -EEXIST";
 * alternatively a plain MAY_WRITE permission check on the parent; a read of
 * the parent's attributes; set-gid inheritance (if the parent has S_ISGID the
 * new object takes the parent's gid, and a new directory also gets S_ISGID
 * with LA_MODE marked valid); and a switch on the new object's file type in
 * which the parent's la_nlink is compared with ddp_max_nlink.
 */
996 static int mdd_create_sanity_check(const struct lu_env *env,
997 struct md_object *pobj,
1002 struct mdd_thread_info *info = mdd_env_info(env);
1003 struct lu_attr *la = &info->mti_la;
1004 struct lu_fid *fid = &info->mti_fid;
1005 struct mdd_object *obj = md2mdd_obj(pobj);
1010 if (mdd_is_dead_obj(obj))
1014 * In some cases this lookup is not needed - we know before if name
1015 * exists or not because MDT performs lookup for it.
1017 /* XXX disable that lookup temporary */
1020 * Check if the name already exist, though it will be checked in
1021 * _index_insert also, for avoiding rolling back if exists
1024 rc = __mdd_lookup_locked(env, pobj, name, fid,
1025 MAY_WRITE | MAY_EXEC);
1027 RETURN(rc ? : -EEXIST);
1030 * Check if has WRITE permission for the parent.
1032 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1038 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1042 if (la->la_mode & S_ISGID) {
1043 ma->ma_attr.la_gid = la->la_gid;
1044 if (S_ISDIR(ma->ma_attr.la_mode)) {
1045 ma->ma_attr.la_mode |= S_ISGID;
1046 ma->ma_attr.la_valid |= LA_MODE;
1050 switch (ma->ma_attr.la_mode & S_IFMT) {
1052 struct mdd_device *mdd = mdo2mdd(pobj);
1053 if (la->la_nlink >= mdd->mdd_dt_conf.ddp_max_nlink)
1072 * Create object and insert it into namespace.
/*
 * .mdo_create handler (see mdd_dir_ops): create @child and insert it as
 * @name into directory @pobj. Timed under LPROC_MDD_CREATE.
 *
 * Visible sequence:
 *  - mdd_create_sanity_check() on the parent;
 *  - for regular files, mdd_lov_create() before the transaction starts (the
 *    comment below notes that no RPC may happen inside the transaction);
 *  - start a transaction (built with MDD_TXN_MKDIR_OP) and take the pdo write
 *    lock on the parent for @name (a failed lock yields -ENOMEM);
 *  - under the child's write lock: mdd_object_create_internal(), ACL
 *    initialisation when CONFIG_FS_POSIX_ACL is defined, and
 *    mdd_object_initialize();
 *  - __mdd_index_insert() of the child's fid into the parent;
 *  - mdd_lov_set_md() with either the EA supplied in @spec (no_lov_create)
 *    or the lmm created above; a non-empty lmm is also copied into ma->ma_lmm
 *    and MA_LOV is set;
 *  - for symlinks, the target name is written as the object body;
 *  - ctime/mtime update on the parent and a locked attribute read back into
 *    @ma.
 *
 * The cleanup path, entered when rc is non-zero and the object was created,
 * contains an index delete of @name and drops a reference on the child under
 * its write lock. On the way out the pdo lock is released, the transaction is
 * stopped, lmm is freed unless it came from @spec, and
 * mdd_lov_create_finish() is called.
 */
1074 static int mdd_create(const struct lu_env *env,
1075 struct md_object *pobj, const char *name,
1076 struct md_object *child,
1077 struct md_op_spec *spec,
1080 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1081 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1082 struct mdd_object *son = md2mdd_obj(child);
1083 struct mdd_device *mdd = mdo2mdd(pobj);
1084 struct lu_attr *attr = &ma->ma_attr;
1085 struct lov_mds_md *lmm = NULL;
1086 struct thandle *handle;
1087 int rc, created = 0, inserted = 0, lmm_size = 0;
1088 struct dynlock_handle *dlh;
1089 struct timeval start;
1092 mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1095 * Two operations have to be performed:
1097 * - allocation of new object (->do_create()), and
1099 * - insertion into parent index (->dio_insert()).
1101 * Due to locking, operation order is not important, when both are
1102 * successful, *but* error handling cases are quite different:
1104 * - if insertion is done first, and following object creation fails,
1105 * insertion has to be rolled back, but this operation might fail
1106 * also leaving us with dangling index entry.
1108 * - if creation is done first, is has to be undone if insertion
1109 * fails, leaving us with leaked space, which is neither good, nor
1112 * It seems that creation-first is simplest solution, but it is
1113 * sub-optimal in the frequent
1118 * case, because second mkdir is bound to create object, only to
1119 * destroy it immediately.
1121 * To avoid this follow local file systems that do double lookup:
1123 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1125 * 1. create (mdd_object_create_internal())
1127 * 2. insert (__mdd_index_insert(), lookup again)
1130 /* Sanity checks before big job. */
1131 rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1136 * No RPC inside the transaction, so OST objects should be created at
1139 if (S_ISREG(attr->la_mode)) {
1140 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1146 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1147 handle = mdd_trans_start(env, mdd);
1149 GOTO(out_free, rc = PTR_ERR(handle));
1151 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1153 GOTO(out_trans, rc = -ENOMEM);
1156 * XXX: Check that link can be added to the parent in mkdir case.
1159 mdd_write_lock(env, son);
1160 rc = mdd_object_create_internal(env, son, ma, handle);
1162 mdd_write_unlock(env, son);
1168 #ifdef CONFIG_FS_POSIX_ACL
1169 mdd_read_lock(env, mdd_pobj);
1170 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1171 mdd_read_unlock(env, mdd_pobj);
1173 mdd_write_unlock(env, son);
1176 ma->ma_attr.la_valid |= LA_MODE;
1180 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1182 mdd_write_unlock(env, son);
1185 * Object has no links, so it will be destroyed when last
1186 * reference is released. (XXX not now.)
1190 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1191 name, S_ISDIR(attr->la_mode), handle,
1192 mdd_object_capa(env, mdd_pobj));
1199 /* Replayed creates have objects already. */
1200 if (spec->u.sp_ea.no_lov_create) {
1201 CDEBUG(D_INFO, "we already have lov ea\n");
1202 LASSERT(lmm == NULL);
1203 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1204 lmm_size = spec->u.sp_ea.eadatalen;
1206 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1208 CERROR("error on stripe info copy %d \n", rc);
1211 if (lmm && lmm_size > 0) {
1212 /* Set Lov here, do not get lmm again later */
1213 memcpy(ma->ma_lmm, lmm, lmm_size);
1214 ma->ma_lmm_size = lmm_size;
1215 ma->ma_valid |= MA_LOV;
1218 if (S_ISLNK(attr->la_mode)) {
1219 struct dt_object *dt = mdd_object_child(son);
1220 const char *target_name = spec->u.sp_symname;
1221 int sym_len = strlen(target_name);
1222 const struct lu_buf *buf;
1225 buf = mdd_buf_get_const(env, target_name, sym_len);
1226 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1227 mdd_object_capa(env, son));
1232 GOTO(cleanup, rc = -EFAULT);
1236 la->la_valid = LA_CTIME | LA_MTIME;
1237 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1241 /* Return attr back. */
1242 rc = mdd_attr_get_internal_locked(env, son, ma);
1245 if (rc && created) {
1249 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1250 S_ISDIR(attr->la_mode),
1251 handle, BYPASS_CAPA);
1253 CERROR("error can not cleanup destroy %d\n",
1257 mdd_write_lock(env, son);
1258 mdd_ref_del_internal(env, son, handle);
1259 mdd_write_unlock(env, son);
1263 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1265 mdd_trans_stop(env, mdd, rc, handle);
1267 if (lmm && !spec->u.sp_ea.no_lov_create)
1268 OBD_FREE(lmm, lmm_size);
1269 /* Finish mdd_lov_create() stuff */
1270 mdd_lov_create_finish(env, mdd, rc);
1271 mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1276 * Get locks on parents in proper order
1277 * RETURN: < 0 - error, rename_order if successful
/*
 * Decide in which order the two parent directories of a rename are locked.
 * Identical parents give MDD_RN_SAME. Otherwise each parent fid is compared
 * with the root fid, and as a last resort mdd_is_parent() is asked whether
 * @tgt_pobj's fid is an ancestor of @src_pobj (with a NULL remote-fid
 * argument).
 */
1285 static int mdd_rename_order(const struct lu_env *env,
1286 struct mdd_device *mdd,
1287 struct mdd_object *src_pobj,
1288 struct mdd_object *tgt_pobj)
1290 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1294 if (src_pobj == tgt_pobj)
1295 RETURN(MDD_RN_SAME);
1297 /* compare the parent-child relationship of src_p and tgt_p */
1298 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1300 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1303 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
/*
 * Pre-checks for mdd_rename(). Tests for a dead source parent and for
 * MAY_WRITE | MAY_EXEC permission on it (the source object itself may be
 * remote, so only its parent is checked). For the target side it runs either
 * mdd_may_create() on the target parent or mdd_may_delete() on @tobj, passing
 * need_check = (src_pobj != tgt_pobj) in both cases, and applies an emptiness
 * test when @tobj is a directory.
 */
1316 static int mdd_rename_sanity_check(const struct lu_env *env,
1317 struct mdd_object *src_pobj,
1318 struct mdd_object *tgt_pobj,
1319 const struct lu_fid *sfid,
1321 struct mdd_object *tobj)
1326 if (mdd_is_dead_obj(src_pobj))
1329 /* The sobj maybe on the remote, check parent permission only here */
1330 rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1331 MAY_WRITE | MAY_EXEC);
1336 rc = mdd_may_create(env, tgt_pobj, NULL,
1337 (src_pobj != tgt_pobj));
1339 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1340 (src_pobj != tgt_pobj));
1342 if (S_ISDIR(mdd_object_type(tobj))
1343 && mdd_dir_is_empty(env, tobj))
/*
 * .mdo_rename handler (see mdd_dir_ops): move the entry @sname in @src_pobj,
 * which refers to fid @lf, to @tname in @tgt_pobj, replacing @tobj if given.
 *
 * Visible sequence:
 *  - ma->ma_attr.la_mode must carry a file type (asserted); is_dir is derived
 *    from it, and the append / immutable flags in ma_attr are tested;
 *  - start an MDD_TXN_RENAME_OP transaction;
 *  - mdd_rename_order() picks the lock order for the two parents; with the
 *    same parent a second pdo lock is taken only when the name hashes of
 *    @sname and @tname differ; a NULL lock handle yields -ENOMEM;
 *  - mdd_rename_sanity_check();
 *  - delete @sname from the source parent, delete @tname from the target
 *    parent (-ENOENT tolerated, since tobj may be remote), insert @tname ->
 *    @lf into the target parent;
 *  - look the source object up with mdd_object_find() and update its ctime;
 *  - if @tobj is non-NULL and exists: under its write lock drop its
 *    reference(s), update its ctime and call mdd_finish_unlink();
 *  - update ctime/mtime on the source parent and, if different, on the
 *    target parent;
 *  - release the pdo locks, stop the transaction and put the source object.
 */
1349 /* src object can be remote that is why we use only fid and type of object */
1350 static int mdd_rename(const struct lu_env *env,
1351 struct md_object *src_pobj, struct md_object *tgt_pobj,
1352 const struct lu_fid *lf, const char *sname,
1353 struct md_object *tobj, const char *tname,
1356 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1357 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1358 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1359 struct mdd_device *mdd = mdo2mdd(src_pobj);
1360 struct mdd_object *mdd_sobj = NULL;
1361 struct mdd_object *mdd_tobj = NULL;
1362 struct dynlock_handle *sdlh, *tdlh;
1363 struct thandle *handle;
1368 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1369 is_dir = S_ISDIR(ma->ma_attr.la_mode);
1370 if (ma->ma_attr.la_valid & LA_FLAGS &&
1371 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1375 mdd_tobj = md2mdd_obj(tobj);
1377 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1378 handle = mdd_trans_start(env, mdd);
1380 RETURN(PTR_ERR(handle));
1382 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1383 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1385 GOTO(cleanup_unlocked, rc);
1387 /* Get locks in determined order */
1388 if (rc == MDD_RN_SAME) {
1389 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1390 /* check hashes to determine do we need one lock or two */
1391 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1392 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1395 } else if (rc == MDD_RN_SRCTGT) {
1396 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1397 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1399 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1400 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1402 if (sdlh == NULL || tdlh == NULL)
1403 GOTO(cleanup, rc = -ENOMEM);
1405 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1406 lf, is_dir, mdd_tobj);
1410 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1411 mdd_object_capa(env, mdd_spobj));
1416 * Here tobj can be remote one, so we do index_delete unconditionally
1417 * and -ENOENT is allowed.
1419 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1420 mdd_object_capa(env, mdd_tpobj));
1421 if (rc != 0 && rc != -ENOENT)
1424 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1425 mdd_object_capa(env, mdd_tpobj));
1430 mdd_sobj = mdd_object_find(env, mdd, lf);
1432 la->la_valid = LA_CTIME;
1434 /* XXX: How to update ctime for remote sobj? */
1435 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1439 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1440 mdd_write_lock(env, mdd_tobj);
1441 mdd_ref_del_internal(env, mdd_tobj, handle);
1443 /* Remove dot reference. */
1445 mdd_ref_del_internal(env, mdd_tobj, handle);
1447 la->la_valid = LA_CTIME;
1448 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1452 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1453 mdd_write_unlock(env, mdd_tobj);
1458 la->la_valid = LA_CTIME | LA_MTIME;
1459 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1463 if (mdd_spobj != mdd_tpobj) {
1464 la->la_valid = LA_CTIME | LA_MTIME;
1465 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1471 if (likely(tdlh) && sdlh != tdlh)
1472 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1474 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1476 mdd_trans_stop(env, mdd, rc, handle);
1478 mdd_object_put(env, mdd_sobj);
/* Directory operation table: binds each md_dir_operations hook to the
 * corresponding mdd_* handler defined in this file. */
1482 struct md_dir_operations mdd_dir_ops = {
1483 .mdo_is_subdir = mdd_is_subdir,
1484 .mdo_lookup = mdd_lookup,
1485 .mdo_create = mdd_create,
1486 .mdo_rename = mdd_rename,
1487 .mdo_link = mdd_link,
1488 .mdo_unlink = mdd_unlink,
1489 .mdo_name_insert = mdd_name_insert,
1490 .mdo_name_remove = mdd_name_remove,
1491 .mdo_rename_tgt = mdd_rename_tgt,
1492 .mdo_create_data = mdd_create_data