1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
/* Directory entry names for the object itself and for its parent. */
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
/*
 * Forward declaration: __mdd_lookup() is defined further down in this file
 * but is already needed by __mdd_lookup_locked().
 */
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
/*
 * Look up @name in directory @pobj through __mdd_lookup() while holding the
 * parent's pdo read lock taken for @name. The lock is released right after
 * the lookup. @fid and @mask are handed to __mdd_lookup() unchanged.
 */
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
/*
 * Lookup entry point, installed as .mdo_lookup in mdd_dir_ops.
 * Performs a locked lookup of @name in @pobj and passes MAY_EXEC as the
 * permission mask for the parent.
 */
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/*
 * Store the fid of @obj's parent in @fid by looking up the dotdot entry
 * inside @obj. A mask of 0 is passed to the lookup.
 */
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use special function, which does not compare version component
89 * of fid. Version component is different for root fids on all MDTs.
/*
 * Return non-zero when @fid has the same sequence and object id as the
 * device root fid (mdd->mdd_root_fid). No other part of the fid is compared.
 */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
/*
 * Ancestry test for @p1 against the fid @lf.
 *
 * Parent fids are obtained with mdd_parent_fid() and compared against the
 * root fid and against @lf. The parent object is then fetched with
 * mdd_object_find(); a NULL result is treated as a remote (cross-ref)
 * parent and yields -EREMOTE, an error pointer yields its PTR_ERR() value.
 */
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
/* The caller must not pass @p1's own fid as @lf. */
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
/* pfid points at the per-thread scratch fid from mdd_env_info(). */
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
125 rc = mdd_parent_fid(env, p1, pfid);
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
/* Drop the reference only when a valid parent object is still held. */
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
/*
 * Installed as .mdo_is_subdir in mdd_dir_ops. The object type of @mo is
 * tested with S_ISDIR first; the ancestry test itself is delegated to
 * mdd_is_parent() with @fid as the candidate ancestor.
 */
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
178 } else if (rc == 1) {
179 /* found @fid is parent */
186 /* Check whether it may create the cobj under the pobj */
/*
 * Conditions examined: whether @cobj (when given) already exists, whether
 * @pobj is a dead object, and MAY_WRITE | MAY_EXEC permission on @pobj.
 * NOTE(review): @need_check presumably gates the permission check - confirm.
 */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188 struct mdd_object *cobj, int need_check)
193 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
196 if (mdd_is_dead_obj(pobj))
200 rc = mdd_permission_internal(env, pobj, NULL,
201 MAY_WRITE | MAY_EXEC, 1);
207 * It's inline, so penalty for filesystems that don't use sticky bit is
/*
 * Sticky-bit test for @cobj inside @pobj.
 *
 * The caller's fsuid (from md_ucred()) is compared with the owner of @cobj
 * and then with the owner of @pobj; S_ISVTX is tested on @pobj's mode. The
 * last fallback is !mdd_capable(uc, CAP_FOWNER).
 */
210 static inline int mdd_is_sticky(const struct lu_env *env,
211 struct mdd_object *pobj,
212 struct mdd_object *cobj)
/* tmp_la is per-thread scratch storage; each mdd_la_get() overwrites it. */
214 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
215 struct md_ucred *uc = md_ucred(env);
218 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
221 } else if (tmp_la->la_uid == uc->mu_fsuid) {
/* From here on tmp_la holds the attributes of the parent, not of @cobj. */
224 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
227 else if (!(tmp_la->la_mode & S_ISVTX))
229 else if (tmp_la->la_uid == uc->mu_fsuid)
232 return !mdd_capable(uc, CAP_FOWNER);
236 /* Check whether it may delete the cobj under the pobj. */
/*
 * Conditions examined: @cobj must exist; @cobj being immutable or
 * append-only; the object type of @cobj versus the @is_dir expectation;
 * @cobj being the root object; @pobj being dead; the sticky-bit rule; and
 * MAY_WRITE | MAY_EXEC permission on @pobj.
 * NOTE(review): @need_check presumably gates the parent checks - confirm.
 */
237 static int mdd_may_delete(const struct lu_env *env,
238 struct mdd_object *pobj,
239 struct mdd_object *cobj,
240 int is_dir, int need_check)
242 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
248 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
251 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
255 if (!S_ISDIR(mdd_object_type(cobj)))
/* Full fid comparison here, unlike mdd_is_root() which skips the version. */
258 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
261 } else if (S_ISDIR(mdd_object_type(cobj))) {
266 if (mdd_is_dead_obj(pobj))
269 if (mdd_is_sticky(env, pobj, cobj))
273 rc = mdd_permission_internal(env, pobj, NULL,
274 MAY_WRITE | MAY_EXEC, 1);
/*
 * Pre-checks for linking @src_obj into directory @tgt_obj: tests whether
 * the source is immutable or append-only, whether it is a directory, and
 * whether a new entry may be created in @tgt_obj (mdd_may_create() with no
 * child object). @src_obj and @tgt_obj must be different objects.
 */
279 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
280 struct mdd_object *src_obj)
285 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
288 if (S_ISDIR(mdd_object_type(src_obj)))
291 LASSERT(src_obj != tgt_obj);
293 rc = mdd_may_create(env, tgt_obj, NULL, 1);
/*
 * Convert @fid with fid_cpu_to_be() into the per-thread mti_fid2 slot and
 * return that slot cast to a dt_rec pointer.
 *
 * The returned pointer refers to per-thread scratch storage, so a later
 * call with the same @env overwrites the previous result.
 */
301 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
302 const struct lu_fid *fid)
304 struct mdd_thread_info *info = mdd_env_info(env);
306 fid_cpu_to_be(&info->mti_fid2, fid);
307 return (const struct dt_rec *)&info->mti_fid2;
311 /* insert new index, add reference if isdir, update times */
/*
 * Insert the entry @name -> @lf into the index of directory @pobj.
 *
 * The insert goes through ->dio_insert() of the underlying dt object once
 * dt_try_as_dir() accepts it. A reference is added to @pobj under its write
 * lock, and the whole call is timed under LPROC_MDD_INDEX_INSERT.
 * NOTE(review): the reference add is presumably limited to a successful
 * insert with @is_dir set, as in __mdd_index_delete() - confirm.
 */
312 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
313 const struct lu_fid *lf, const char *name, int is_dir,
314 struct thandle *handle, struct lustre_capa *capa)
316 struct dt_object *next = mdd_object_child(pobj);
317 struct timeval start;
321 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
322 LPROC_MDD_INDEX_INSERT);
323 if (dt_try_as_dir(env, next)) {
324 rc = next->do_index_ops->dio_insert(env, next,
325 __mdd_fid_rec(env, lf),
326 (const struct dt_key *)name,
334 mdd_write_lock(env, pobj);
335 mdd_ref_add_internal(env, pobj, handle);
336 mdd_write_unlock(env, pobj);
339 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
340 LPROC_MDD_INDEX_INSERT);
/*
 * Remove the entry @name from the index of directory @pobj.
 *
 * The delete goes through ->dio_delete() of the underlying dt object once
 * dt_try_as_dir() accepts it. When the delete succeeds and @is_dir is set,
 * one reference is dropped from @pobj under its write lock. The call is
 * timed under LPROC_MDD_INDEX_DELETE.
 */
344 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
345 const char *name, int is_dir, struct thandle *handle,
346 struct lustre_capa *capa)
348 struct dt_object *next = mdd_object_child(pobj);
349 struct timeval start;
353 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
354 LPROC_MDD_INDEX_DELETE);
356 if (dt_try_as_dir(env, next)) {
357 rc = next->do_index_ops->dio_delete(env, next,
358 (struct dt_key *)name,
/* The parent's link count only changes when a directory entry went away. */
360 if (rc == 0 && is_dir) {
361 mdd_write_lock(env, pobj);
362 mdd_ref_del_internal(env, pobj, handle);
363 mdd_write_unlock(env, pobj);
368 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
369 LPROC_MDD_INDEX_DELETE);
/*
 * Insert the entry @name -> @lf into the index of @pobj and nothing more.
 * Unlike __mdd_index_insert(), no reference is added to @pobj and no lproc
 * timing is recorded.
 */
374 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
375 const struct lu_fid *lf, const char *name,
376 struct thandle *handle, struct lustre_capa *capa)
378 struct dt_object *next = mdd_object_child(pobj);
382 if (dt_try_as_dir(env, next)) {
383 rc = next->do_index_ops->dio_insert(env, next,
384 __mdd_fid_rec(env, lf),
385 (const struct dt_key *)name,
/*
 * Hard-link @src_obj into directory @tgt_obj under @name; installed as
 * .mdo_link in mdd_dir_ops.
 *
 * Order of work: start an MDD_TXN_LINK_OP transaction, take the pdo write
 * lock for @name on the target directory, write-lock the source, run
 * mdd_link_sanity_check(), insert the index entry, add a reference to the
 * source, then update the time attributes of both objects. Locks are
 * released in reverse order and the transaction is stopped with the final rc.
 */
393 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
394 struct md_object *src_obj, const char *name,
397 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
398 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
399 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
400 struct mdd_device *mdd = mdo2mdd(src_obj);
401 struct dynlock_handle *dlh;
402 struct thandle *handle;
406 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
407 handle = mdd_trans_start(env, mdd);
409 RETURN(PTR_ERR(handle));
411 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
413 GOTO(out_trans, rc = -ENOMEM);
414 mdd_write_lock(env, mdd_sobj);
416 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
418 GOTO(out_unlock, rc);
/* Name insert only: __mdd_index_insert_only() leaves the target's nlink alone. */
420 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
422 mdd_object_capa(env, mdd_tobj));
424 GOTO(out_unlock, rc);
426 mdd_ref_add_internal(env, mdd_sobj, handle);
/* Target directory: ctime and mtime; the _locked variant is used for it. */
429 la->la_valid = LA_CTIME | LA_MTIME;
430 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
432 GOTO(out_unlock, rc);
/* Source object: ctime only; its write lock is already held above. */
434 la->la_valid = LA_CTIME;
435 rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
438 mdd_write_unlock(env, mdd_sobj);
439 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
441 mdd_trans_stop(env, mdd, rc, handle);
445 /* caller should take a lock before calling */
/*
 * Final step of removing a link to @obj.
 *
 * Attributes are re-read into @ma with mdd_iattr_get(). When that succeeds
 * and the link count is zero, the object is added to the orphan list and
 * flagged ORPHAN_OBJ if the add succeeds; with mod_count == 0 it is killed
 * via mdd_object_kill(). MA_LOV | MA_COOKIE are cleared from ma->ma_valid
 * on the paths shown below.
 */
446 int mdd_finish_unlink(const struct lu_env *env,
447 struct mdd_object *obj, struct md_attr *ma,
453 rc = mdd_iattr_get(env, obj, ma);
454 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
455 /* add new orphan and the object
456 * will be deleted during the object_put() */
457 if (__mdd_orphan_add(env, obj, th) == 0)
458 obj->mod_flags |= ORPHAN_OBJ;
460 if (obj->mod_count == 0)
461 rc = mdd_object_kill(env, obj, ma);
463 /* clear MA_LOV | MA_COOKIE, if we do not
464 * unlink it in case we get it somewhere */
465 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
467 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
473 * Check that @dir contains no entries except (possibly) dot and dotdot.
478 * -ENOTEMPTY not empty
/*
 * Emptiness test built on the index iterator of @dir's underlying dt
 * object: the iterator is created with ->init(), positioned with ->get()
 * on the empty key "", and then advanced with ->next().
 */
482 static int mdd_dir_is_empty(const struct lu_env *env,
483 struct mdd_object *dir)
486 struct dt_object *obj;
487 struct dt_it_ops *iops;
491 obj = mdd_object_child(dir);
492 iops = &obj->do_index_ops->dio_it;
493 it = iops->init(env, obj, 0, BYPASS_CAPA);
495 result = iops->get(env, it, (const void *)"");
/*
 * Advance at most three times, stopping at the first non-zero result
 * from ->next().
 */
498 for (result = 0, i = 0; result == 0 && i < 3; ++i)
499 result = iops->next(env, it);
502 else if (result == +1)
504 } else if (result == 0)
506 * Huh? Index contains no zero key?
/*
 * Pre-checks for unlinking @cobj from @pobj: mdd_may_delete() is called
 * with the directory flag taken from ma->ma_attr.la_mode, and when @cobj is
 * a directory usable as an index, mdd_dir_is_empty() is called on it.
 */
517 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
518 struct mdd_object *cobj, struct md_attr *ma)
520 struct dt_object *dt_cobj = mdd_object_child(cobj);
524 rc = mdd_may_delete(env, pobj, cobj,
525 S_ISDIR(ma->ma_attr.la_mode), 1);
529 if (S_ISDIR(mdd_object_type(cobj))) {
530 if (dt_try_as_dir(env, dt_cobj))
531 rc = mdd_dir_is_empty(env, cobj);
/*
 * Remove entry @name (object @cobj) from directory @pobj; installed as
 * .mdo_unlink in mdd_dir_ops.
 *
 * Order of work: build the log-aware transaction parameters, start the
 * transaction, take the pdo write lock for @name on the parent and the
 * write lock on the child, run mdd_unlink_sanity_check(), delete the index
 * entry, drop child references, update times on parent and child, and
 * finish with mdd_finish_unlink(). Locks are released in reverse order.
 */
539 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
540 struct md_object *cobj, const char *name,
543 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
544 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
545 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
546 struct mdd_device *mdd = mdo2mdd(pobj);
547 struct dynlock_handle *dlh;
548 struct thandle *handle;
552 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
556 handle = mdd_trans_start(env, mdd);
558 RETURN(PTR_ERR(handle));
560 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
562 GOTO(out_trans, rc = -ENOMEM);
563 mdd_write_lock(env, mdd_cobj);
565 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
569 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
570 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
571 mdd_object_capa(env, mdd_pobj));
/*
 * NOTE(review): two reference drops on the child follow; the second is
 * presumably conditional on is_dir (the "." reference) - confirm.
 */
575 mdd_ref_del_internal(env, mdd_cobj, handle);
578 mdd_ref_del_internal(env, mdd_cobj, handle);
/* Parent directory: ctime and mtime. */
582 la->la_valid = LA_CTIME | LA_MTIME;
583 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
/* Child: ctime only; its write lock is already held. */
587 la->la_valid = LA_CTIME;
588 rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
592 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
595 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
596 strlen("unlinked"), "unlinked", 0,
600 mdd_write_unlock(env, mdd_cobj);
601 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
603 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-checks for mdd_name_insert(): tests whether directory @pobj is dead
 * and returns the result of the MAY_WRITE | MAY_EXEC permission check on it.
 */
607 static int mdd_ni_sanity_check(const struct lu_env *env,
608 struct md_object *pobj,
610 const struct lu_fid *fid)
612 struct mdd_object *obj = md2mdd_obj(pobj);
616 if (mdd_is_dead_obj(obj))
619 /* The existence of the name will be checked in _index_insert. */
620 RETURN(mdd_permission_internal(env, obj, NULL,
621 MAY_WRITE | MAY_EXEC, 1));
/*
 * Insert the bare name @name -> @fid into directory @pobj; installed as
 * .mdo_name_insert in mdd_dir_ops.
 *
 * Runs inside an MDD_TXN_INDEX_INSERT_OP transaction, holding the pdo write
 * lock for @name on the directory. The insert goes through
 * __mdd_index_insert() with BYPASS_CAPA, after which the directory times
 * are updated.
 */
627 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
628 const char *name, const struct lu_fid *fid,
631 struct lu_attr *la = &mdd_env_info(env)->mti_la;
632 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
633 struct mdd_device *mdd = mdo2mdd(pobj);
634 struct dynlock_handle *dlh;
635 struct thandle *handle;
639 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
640 handle = mdd_trans_start(env, mdo2mdd(pobj));
642 RETURN(PTR_ERR(handle));
644 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
646 GOTO(out_trans, rc = -ENOMEM);
647 rc = mdd_ni_sanity_check(env, pobj, name, fid);
649 GOTO(out_unlock, rc);
651 rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
652 handle, BYPASS_CAPA);
/*
 * NOTE(review): atime and ctime are updated here, whereas
 * mdd_name_remove() updates ctime and mtime - confirm that the
 * asymmetry is intended.
 */
654 la->la_ctime = la->la_atime = CURRENT_SECONDS;
655 la->la_valid = LA_ATIME | LA_CTIME;
656 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
659 mdd_pdo_write_unlock(env, mdd_obj, dlh);
661 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
/*
 * Pre-checks for mdd_name_remove(): warns when directory @pobj is dead and
 * runs the MAY_WRITE | MAY_EXEC permission check on it.
 */
665 static int mdd_nr_sanity_check(const struct lu_env *env,
666 struct md_object *pobj,
669 struct mdd_object *obj = md2mdd_obj(pobj);
674 if (mdd_is_dead_obj(obj)) {
675 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
679 /* Name presence will be checked in _index_delete. */
680 rc = mdd_permission_internal(env, obj, NULL, MAY_WRITE | MAY_EXEC, 1);
/*
 * Remove the bare name @name from directory @pobj; installed as
 * .mdo_name_remove in mdd_dir_ops.
 *
 * Runs inside an MDD_TXN_INDEX_DELETE_OP transaction, holding the pdo write
 * lock for @name on the directory. The delete goes through
 * __mdd_index_delete() with BYPASS_CAPA, after which the directory's ctime
 * and mtime are set to CURRENT_SECONDS.
 */
687 static int mdd_name_remove(const struct lu_env *env,
688 struct md_object *pobj,
689 const char *name, int is_dir)
691 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
692 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
693 struct mdd_device *mdd = mdo2mdd(pobj);
694 struct dynlock_handle *dlh;
695 struct thandle *handle;
699 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
700 handle = mdd_trans_start(env, mdd);
702 RETURN(PTR_ERR(handle));
704 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
706 GOTO(out_trans, rc = -ENOMEM);
707 rc = mdd_nr_sanity_check(env, pobj, name);
709 GOTO(out_unlock, rc);
711 rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
712 handle, BYPASS_CAPA);
714 GOTO(out_unlock, rc);
716 la->la_ctime = la->la_mtime = CURRENT_SECONDS;
717 la->la_valid = LA_CTIME | LA_MTIME;
718 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
721 mdd_pdo_write_unlock(env, mdd_obj, dlh);
723 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-checks for mdd_rename_tgt(): tests whether @tgt_pobj is dead, then
 * uses mdd_may_delete() plus an emptiness test for a directory @tobj, or
 * mdd_may_create() on @tgt_pobj. The directory flag handed to
 * mdd_may_delete() comes from ma->ma_attr.la_mode.
 * NOTE(review): the delete and create paths are presumably selected by
 * whether @tobj exists - confirm.
 */
727 static int mdd_rt_sanity_check(const struct lu_env *env,
728 struct mdd_object *tgt_pobj,
729 struct mdd_object *tobj,
730 const struct lu_fid *sfid,
731 const char *name, struct md_attr *ma)
737 if (mdd_is_dead_obj(tgt_pobj))
740 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
742 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
743 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
744 mdd_dir_is_empty(env, tobj))
747 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/*
 * Target-side half of a rename; installed as .mdo_rename_tgt in
 * mdd_dir_ops. Points entry @name in directory @pobj at fid @lf.
 *
 * Inside an MDD_TXN_RENAME_TGT_OP transaction and under the pdo write lock
 * for @name, the old entry is deleted and re-inserted with @lf, the
 * directory times are updated, and when @tobj exists it loses a reference
 * and has its ctime updated.
 */
753 static int mdd_rename_tgt(const struct lu_env *env,
754 struct md_object *pobj, struct md_object *tobj,
755 const struct lu_fid *lf, const char *name,
758 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
759 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
760 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
761 struct mdd_device *mdd = mdo2mdd(pobj);
762 struct dynlock_handle *dlh;
763 struct thandle *handle;
767 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
768 handle = mdd_trans_start(env, mdd);
770 RETURN(PTR_ERR(handle));
772 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
774 GOTO(out_trans, rc = -ENOMEM);
776 mdd_write_lock(env, mdd_tobj);
778 /* XXX: Rename sanity checking. */
779 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
784 * If rename_tgt is called then we should just re-insert name with
785 * correct fid, no need to dec/inc parent nlink if obj is dir.
/* is_dir is passed as 0 so that the parent's reference count is untouched. */
787 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
791 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
797 la->la_valid = LA_CTIME | LA_MTIME;
798 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
802 if (tobj && lu_object_exists(&tobj->mo_lu)) {
803 mdd_ref_del_internal(env, mdd_tobj, handle);
804 la->la_valid = LA_CTIME;
805 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
810 mdd_write_unlock(env, mdd_tobj);
811 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
813 mdd_trans_stop(env, mdd, rc, handle);
818 * The permission has been checked when obj created, no need check again.
/*
 * Sanity check used by mdd_create_data(): tests whether @obj is NULL or a
 * dead object.
 */
820 static int mdd_cd_sanity_check(const struct lu_env *env,
821 struct mdd_object *obj)
826 if (!obj || mdd_is_dead_obj(obj))
/*
 * Attach striping data (lov ea) to the existing object @cobj; installed as
 * .mdo_create_data in mdd_dir_ops.
 *
 * After mdd_cd_sanity_check(), the open flags in @spec are tested for
 * MDS_OPEN_DELAY_CREATE and a missing FMODE_WRITE, mdd_lov_create() is
 * called, and an MDD_TXN_CREATE_DATA_OP transaction is started. The lov ea
 * is stored with mdd_lov_set_md(), taken from @spec when
 * spec->u.sp_ea.no_lov_create is set and from mdd_lov_create() otherwise.
 * Attributes are then re-read into @ma.
 */
833 static int mdd_create_data(const struct lu_env *env,
834 struct md_object *pobj, struct md_object *cobj,
835 const struct md_create_spec *spec,
838 struct mdd_device *mdd = mdo2mdd(cobj);
839 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
840 struct mdd_object *son = md2mdd_obj(cobj);
841 struct lu_attr *attr = &ma->ma_attr;
842 struct lov_mds_md *lmm = NULL;
844 struct thandle *handle;
848 rc = mdd_cd_sanity_check(env, son);
852 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
853 !(spec->sp_cr_flags & FMODE_WRITE))
855 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
860 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
861 handle = mdd_trans_start(env, mdd);
863 RETURN(rc = PTR_ERR(handle));
866 * XXX: Setting the lov ea is not locked but setting the attr is locked?
867 * Should this be fixed?
870 /* Replayed creates already have objects. */
871 if (spec->u.sp_ea.no_lov_create) {
872 CDEBUG(D_INFO, "we already have lov ea\n");
873 rc = mdd_lov_set_md(env, mdd_pobj, son,
874 (struct lov_mds_md *)spec->u.sp_ea.eadata,
875 spec->u.sp_ea.eadatalen, handle, 0);
877 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
878 lmm_size, handle, 0);
881 rc = mdd_attr_get_internal_locked(env, son, ma);
883 /* Finish mdd_lov_create() stuff. */
884 mdd_lov_create_finish(env, mdd, rc);
885 mdd_trans_stop(env, mdd, rc, handle);
887 OBD_FREE(lmm, lmm_size);
/*
 * Unlocked lookup of @name in directory @pobj, storing the result in @fid.
 *
 * Checks that the directory is not dead and that it exists, then runs the
 * permission check with @mask. When the object is a directory usable as an
 * index, ->dio_lookup() writes the record straight into @fid, which is then
 * converted in place with fid_be_to_cpu(). The call is timed under
 * LPROC_MDD_LOOKUP.
 */
892 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
893 const char *name, struct lu_fid* fid, int mask)
895 const struct dt_key *key = (const struct dt_key *)name;
896 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
897 struct dt_object *dir = mdd_object_child(mdd_obj);
/* rec aliases @fid, so the index lookup fills the caller's fid directly. */
898 struct dt_rec *rec = (struct dt_rec *)fid;
899 struct timeval start;
903 mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
904 if (mdd_is_dead_obj(mdd_obj))
907 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
911 CERROR("Object "DFID" locates on remote server\n",
912 PFID(mdo2fid(mdd_obj)));
916 rc = mdd_permission_internal(env, mdd_obj, NULL, mask, 1);
920 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
921 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
922 mdd_object_capa(env, mdd_obj));
/* In-place conversion: source and destination are the same fid. */
924 fid_be_to_cpu(fid, fid);
928 mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/*
 * Initialize the freshly created object @child.
 *
 * The attributes in @ma are applied with mdd_attr_set_internal(). For a
 * directory, a reference is added and the dot entry (the child's own fid)
 * and the dotdot entry (@pfid) are inserted. The cleanup code shown deletes
 * the dot entry again and drops the added reference.
 */
932 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
933 struct mdd_object *child, struct md_attr *ma,
934 struct thandle *handle)
940 * Update attributes for child.
943 * (1) the valid bits should be converted between Lustre and Linux;
944 * (2) maybe, the child attributes should be set in OSD when creation.
947 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
951 if (S_ISDIR(ma->ma_attr.la_mode)) {
952 /* Add "." and ".." for newly created dir */
953 mdd_ref_add_internal(env, child, handle);
954 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
955 dot, handle, BYPASS_CAPA);
957 rc = __mdd_index_insert_only(env, child, pfid,
/* Cleanup: remove the dot entry; is_dir is 0, so no nlink change in the delete. */
963 rc2 = __mdd_index_delete(env, child, dot, 0,
964 handle, BYPASS_CAPA);
966 CERROR("Failure to cleanup after dotdot"
967 " creation: %d (%d)\n", rc2, rc);
969 mdd_ref_del_internal(env, child, handle);
/*
 * Pre-checks for mdd_create().
 *
 * Tests whether the parent @pobj is dead, looks @name up under the pdo read
 * lock to detect an existing entry (-EEXIST), checks MAY_WRITE on the
 * parent, and reads the parent's attributes. When the parent has S_ISGID
 * set, the new object inherits the parent's gid, and a new directory also
 * gets S_ISGID in its mode. The function ends with a switch on the file
 * type bits of the requested mode.
 */
976 static int mdd_create_sanity_check(const struct lu_env *env,
977 struct md_object *pobj,
982 struct mdd_thread_info *info = mdd_env_info(env);
983 struct lu_attr *la = &info->mti_la;
984 struct lu_fid *fid = &info->mti_fid;
985 struct mdd_object *obj = md2mdd_obj(pobj);
990 if (mdd_is_dead_obj(obj))
994 * In some cases this lookup is not needed - we know before that if name
999 * Check if the name already exist, though it will be checked in
1000 * _index_insert also, for avoiding rolling back if exists
1003 rc = __mdd_lookup_locked(env, pobj, name, fid,
1004 MAY_WRITE | MAY_EXEC);
/* A non-zero rc is returned as is; rc == 0 is turned into -EEXIST. */
1006 RETURN(rc ? : -EEXIST);
1009 * Check if has WRITE permission for the parent.
1011 rc = mdd_permission_internal(env, obj, NULL, MAY_WRITE, 1);
1017 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
/* Set-gid parent: inherit its gid, and propagate S_ISGID to new directories. */
1021 if (la->la_mode & S_ISGID) {
1022 ma->ma_attr.la_gid = la->la_gid;
1023 if (S_ISDIR(ma->ma_attr.la_mode)) {
1024 ma->ma_attr.la_mode |= S_ISGID;
1025 ma->ma_attr.la_valid |= LA_MODE;
1029 switch (ma->ma_attr.la_mode & S_IFMT) {
1047 * Create object and insert it into namespace.
/*
 * Create object @child and insert it into directory @pobj as @name;
 * installed as .mdo_create in mdd_dir_ops.
 *
 * Order of work: mdd_create_sanity_check(); mdd_lov_create() for regular
 * files, before the transaction starts; start the transaction and take the
 * pdo write lock for @name; create the object under its write lock; ACL
 * init when CONFIG_FS_POSIX_ACL is defined; mdd_object_initialize(); insert
 * the name into the parent index; store the lov ea; write the symlink
 * target for S_ISLNK; update the parent's times; read the attributes back
 * into @ma. The cleanup code undoes the index insert and drops the object
 * reference. The call is timed under LPROC_MDD_CREATE.
 */
1049 static int mdd_create(const struct lu_env *env,
1050 struct md_object *pobj, const char *name,
1051 struct md_object *child,
1052 struct md_create_spec *spec,
1055 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1056 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1057 struct mdd_object *son = md2mdd_obj(child);
1058 struct mdd_device *mdd = mdo2mdd(pobj);
1059 struct lu_attr *attr = &ma->ma_attr;
1060 struct lov_mds_md *lmm = NULL;
1061 struct thandle *handle;
1062 int rc, created = 0, inserted = 0, lmm_size = 0;
1063 struct dynlock_handle *dlh;
1064 struct timeval start;
1067 mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1070 * Two operations have to be performed:
1072 * - allocation of new object (->do_create()), and
1074 * - insertion into parent index (->dio_insert()).
1076 * Due to locking, operation order is not important, when both are
1077 * successful, *but* error handling cases are quite different:
1079 * - if insertion is done first, and following object creation fails,
1080 * insertion has to be rolled back, but this operation might fail
1081 * also leaving us with dangling index entry.
1083 * - if creation is done first, is has to be undone if insertion
1084 * fails, leaving us with leaked space, which is neither good, nor
1087 * It seems that creation-first is simplest solution, but it is
1088 * sub-optimal in the frequent
1093 * case, because second mkdir is bound to create object, only to
1094 * destroy it immediately.
1096 * To avoid this follow local file systems that do double lookup:
1098 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1100 * 1. create (mdd_object_create_internal())
1102 * 2. insert (__mdd_index_insert(), lookup again)
1105 /* Sanity checks before big job. */
1106 rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1111 * No RPC inside the transaction, so OST objects should be created at
1114 if (S_ISREG(attr->la_mode)) {
1115 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1121 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1122 handle = mdd_trans_start(env, mdd);
1124 GOTO(out_free, rc = PTR_ERR(handle));
1126 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1128 GOTO(out_trans, rc = -ENOMEM);
1131 * XXX: Check that link can be added to the parent in mkdir case.
1134 mdd_write_lock(env, son);
1135 rc = mdd_object_create_internal(env, son, ma, handle);
1137 mdd_write_unlock(env, son);
1143 #ifdef CONFIG_FS_POSIX_ACL
1144 mdd_read_lock(env, mdd_pobj);
1145 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1146 mdd_read_unlock(env, mdd_pobj);
1148 mdd_write_unlock(env, son);
1151 ma->ma_attr.la_valid |= LA_MODE;
1155 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1157 mdd_write_unlock(env, son);
1160 * Object has no links, so it will be destroyed when last
1161 * reference is released. (XXX not now.)
1165 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1166 name, S_ISDIR(attr->la_mode), handle,
1167 mdd_object_capa(env, mdd_pobj));
1174 /* Replayed creates already have objects. */
1175 if (spec->u.sp_ea.no_lov_create) {
1176 CDEBUG(D_INFO, "we already have lov ea\n");
1177 LASSERT(lmm == NULL);
1178 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1179 lmm_size = spec->u.sp_ea.eadatalen;
1181 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1183 CERROR("error on stripe info copy %d \n", rc);
1186 if (lmm && lmm_size > 0) {
1187 /* Set Lov here, do not get lmm again later */
/*
 * NOTE(review): lmm_size bytes are copied into ma->ma_lmm with no size
 * check visible here - confirm the caller's buffer is large enough.
 */
1188 memcpy(ma->ma_lmm, lmm, lmm_size);
1189 ma->ma_lmm_size = lmm_size;
1190 ma->ma_valid |= MA_LOV;
1193 if (S_ISLNK(attr->la_mode)) {
1194 struct dt_object *dt = mdd_object_child(son);
1195 const char *target_name = spec->u.sp_symname;
/* The symlink body is written without the terminating NUL (strlen bytes). */
1196 int sym_len = strlen(target_name);
1197 const struct lu_buf *buf;
1200 buf = mdd_buf_get_const(env, target_name, sym_len);
1201 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1202 mdd_object_capa(env, son));
1207 GOTO(cleanup, rc = -EFAULT);
1211 la->la_valid = LA_CTIME | LA_MTIME;
1212 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1216 /* Return attr back. */
1217 rc = mdd_attr_get_internal_locked(env, son, ma);
1220 if (rc && created) {
1224 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1225 S_ISDIR(attr->la_mode),
1226 handle, BYPASS_CAPA);
1228 CERROR("error can not cleanup destroy %d\n",
1232 mdd_write_lock(env, son);
1233 mdd_ref_del_internal(env, son, handle);
1234 mdd_write_unlock(env, son);
1238 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1240 mdd_trans_stop(env, mdd, rc, handle);
/*
 * With no_lov_create set, lmm aliases spec->u.sp_ea.eadata (assigned
 * above), so the free is skipped in that case.
 */
1242 if (lmm && !spec->u.sp_ea.no_lov_create)
1243 OBD_FREE(lmm, lmm_size);
1244 /* Finish mdd_lov_create() stuff */
1245 mdd_lov_create_finish(env, mdd, rc);
1246 mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1251 * Get locks on parents in proper order
1252 * RETURN: < 0 - error, rename_order if successful
/*
 * Determine the order in which the two rename parents are to be locked.
 *
 * Identical parents give MDD_RN_SAME. Otherwise each parent is compared
 * with the root fid, and failing that, mdd_is_parent() tests whether
 * @tgt_pobj's fid is an ancestor of @src_pobj.
 */
1260 static int mdd_rename_order(const struct lu_env *env,
1261 struct mdd_device *mdd,
1262 struct mdd_object *src_pobj,
1263 struct mdd_object *tgt_pobj)
1265 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1269 if (src_pobj == tgt_pobj)
1270 RETURN(MDD_RN_SAME);
1272 /* Compare the parent/child relationship of src_p and tgt_p. */
1273 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1275 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
/* NULL is passed for the out-fid that mdd_is_parent() fills for a remote parent. */
1278 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
/*
 * Pre-checks for mdd_rename(): tests whether @src_pobj is dead, checks
 * MAY_WRITE | MAY_EXEC on @src_pobj, then uses mdd_may_create() on
 * @tgt_pobj, or mdd_may_delete() plus an emptiness test for a directory
 * @tobj. The need_check argument of both calls is set only when source and
 * target parents differ.
 * NOTE(review): the create and delete paths are presumably selected by
 * whether @tobj exists - confirm.
 */
1291 static int mdd_rename_sanity_check(const struct lu_env *env,
1292 struct mdd_object *src_pobj,
1293 struct mdd_object *tgt_pobj,
1294 const struct lu_fid *sfid,
1296 struct mdd_object *tobj)
1301 if (mdd_is_dead_obj(src_pobj))
1304 /* The sobj maybe on the remote, check parent permission only here */
1305 rc = mdd_permission_internal(env, src_pobj, NULL,
1306 MAY_WRITE | MAY_EXEC, 1);
1311 rc = mdd_may_create(env, tgt_pobj, NULL,
1312 (src_pobj != tgt_pobj));
1314 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1315 (src_pobj != tgt_pobj));
1317 if (S_ISDIR(mdd_object_type(tobj))
1318 && mdd_dir_is_empty(env, tobj))
1324 /* src object can be remote that is why we use only fid and type of object */
/*
 * Rename @sname in @src_pobj to @tname in @tgt_pobj; installed as
 * .mdo_rename in mdd_dir_ops. The source object is identified only by its
 * fid @lf and by the type in ma->ma_attr.la_mode.
 *
 * Inside an MDD_TXN_RENAME_OP transaction the parents' pdo name locks are
 * taken in the order given by mdd_rename_order(). After
 * mdd_rename_sanity_check(), the source name is deleted, the target name
 * is deleted (-ENOENT tolerated) and re-inserted with @lf, the source
 * object's ctime is updated, an existing target object loses its
 * references and goes through mdd_finish_unlink(), and both parents get
 * new ctime and mtime.
 */
1325 static int mdd_rename(const struct lu_env *env,
1326 struct md_object *src_pobj, struct md_object *tgt_pobj,
1327 const struct lu_fid *lf, const char *sname,
1328 struct md_object *tobj, const char *tname,
1331 struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
1332 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1333 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1334 struct mdd_device *mdd = mdo2mdd(src_pobj);
1335 struct mdd_object *mdd_sobj = NULL;
1336 struct mdd_object *mdd_tobj = NULL;
1337 struct dynlock_handle *sdlh, *tdlh;
1338 struct thandle *handle;
/* The caller must supply the source's file type in ma->ma_attr.la_mode. */
1343 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1344 is_dir = S_ISDIR(ma->ma_attr.la_mode);
1345 if (ma->ma_attr.la_valid & LA_FLAGS &&
1346 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1350 mdd_tobj = md2mdd_obj(tobj);
1352 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1353 handle = mdd_trans_start(env, mdd);
1355 RETURN(PTR_ERR(handle));
1357 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1358 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1360 GOTO(cleanup_unlocked, rc);
1362 /* Get locks in determined order */
1363 if (rc == MDD_RN_SAME) {
1364 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1365 /* check hashes to determine do we need one lock or two */
1366 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1367 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1370 } else if (rc == MDD_RN_SRCTGT) {
1371 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1372 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1374 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1375 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1377 if (sdlh == NULL || tdlh == NULL)
1378 GOTO(cleanup, rc = -ENOMEM);
1380 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1381 lf, is_dir, mdd_tobj);
1385 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1386 mdd_object_capa(env, mdd_spobj));
1391 * Here tobj can be remote one, so we do index_delete unconditionally
1392 * and -ENOENT is allowed.
1394 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1395 mdd_object_capa(env, mdd_tpobj));
1396 if (rc != 0 && rc != -ENOENT)
1399 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1400 mdd_object_capa(env, mdd_tpobj));
/*
 * NOTE(review): mdd_object_find() can return NULL (cross-ref object) or
 * an error pointer, as handled in mdd_is_parent() - confirm the result
 * is checked before it is used and put below.
 */
1405 mdd_sobj = mdd_object_find(env, mdd, lf);
1407 la->la_valid = LA_CTIME;
1409 /* XXX: How to update ctime for remote sobj? */
1410 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1414 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1415 mdd_write_lock(env, mdd_tobj);
1416 mdd_ref_del_internal(env, mdd_tobj, handle);
1418 /* Remove dot reference. */
1420 mdd_ref_del_internal(env, mdd_tobj, handle);
1422 la->la_valid = LA_CTIME;
1423 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1427 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1428 mdd_write_unlock(env, mdd_tobj);
1433 la->la_valid = LA_CTIME | LA_MTIME;
1434 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
/* The target parent is only updated separately when it is a different object. */
1438 if (mdd_spobj != mdd_tpobj) {
1439 la->la_valid = LA_CTIME | LA_MTIME;
1440 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
/*
 * Release the target name lock only when one was taken and it is
 * distinct from the source name lock.
 */
1446 if (likely(tdlh) && sdlh != tdlh)
1447 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1449 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1451 mdd_trans_stop(env, mdd, rc, handle);
1453 mdd_object_put(env, mdd_sobj);
/*
 * Directory operation table of the mdd layer: binds each md_dir_operations
 * method to its implementation in this file.
 */
1457 struct md_dir_operations mdd_dir_ops = {
1458 .mdo_is_subdir = mdd_is_subdir,
1459 .mdo_lookup = mdd_lookup,
1460 .mdo_create = mdd_create,
1461 .mdo_rename = mdd_rename,
1462 .mdo_link = mdd_link,
1463 .mdo_unlink = mdd_unlink,
1464 .mdo_name_insert = mdd_name_insert,
1465 .mdo_name_remove = mdd_name_remove,
1466 .mdo_rename_tgt = mdd_rename_tgt,
1467 .mdo_create_data = mdd_create_data