1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
45 #include "mdd_internal.h"
/* Names of the self and parent directory entries, followed by a forward
 * declaration of __mdd_lookup(), which is defined later in this file and is
 * wrapped by __mdd_lookup_locked() below. */
47 static const char dot[] = ".";
48 static const char dotdot[] = "..";
50 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
51 const char *name, const struct lu_fid* fid, int mask);
/* Run __mdd_lookup() on @pobj with the same @name, @fid and @mask while the
 * read lock of the corresponding mdd object is held; the lock is released
 * right after the lookup and the lookup result is kept in rc. */
53 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
54 const char *name, const struct lu_fid* fid, int mask)
56 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
59 mdd_read_lock(env, mdd_obj);
60 rc = __mdd_lookup(env, pobj, name, fid, mask);
61 mdd_read_unlock(env, mdd_obj);
/* Lookup method installed as .mdo_lookup in mdd_dir_ops: looks @name up in
 * @pobj through __mdd_lookup_locked(), asking for MAY_EXEC on the parent. */
66 static int mdd_lookup(const struct lu_env *env,
67 struct md_object *pobj, const char *name,
72 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/* Obtain the parent of @obj by looking up the dotdot entry in it under the
 * read lock; the lookup is done with mask 0 and stores its record in @fid.
 * Returns whatever __mdd_lookup_locked() returns. */
77 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
80 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
84 * return 1: if lf is the fid of the ancestor of p1;
87 * return -EREMOTE: if remote object is found, in this
88 * case fid of remote object is saved to @pf;
90 * otherwise: values < 0, errors.
/* Ancestry test for @p1 against the fid @lf. The per-thread mti_fid is used
 * as scratch space for parent fids obtained with mdd_parent_fid(); each one
 * is compared with the root fid and with @lf, and the matching object is
 * fetched with mdd_object_find() (the previously held one is put first).
 * A NULL object is treated as a cross-ref (remote) parent, an IS_ERR object
 * as an error. Any parent still held at the exit label is put.
 * NOTE(review): the remote case assigns positive EREMOTE to rc while one of
 * the comments around this function speaks of -EREMOTE; confirm which sign
 * callers expect before relying on either. */
92 static int mdd_is_parent(const struct lu_env *env,
93 struct mdd_device *mdd,
94 struct mdd_object *p1,
95 const struct lu_fid *lf,
98 struct mdd_object *parent = NULL;
103 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
104 pfid = &mdd_env_info(env)->mti_fid;
106 /* Do not lookup ".." in root, they do not exist there. */
107 if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
111 rc = mdd_parent_fid(env, p1, pfid);
114 if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
116 if (lu_fid_eq(pfid, lf))
119 mdd_object_put(env, parent);
120 parent = mdd_object_find(env, mdd, pfid);
122 /* cross-ref parent */
123 if (parent == NULL) {
126 GOTO(out, rc = EREMOTE);
127 } else if (IS_ERR(parent))
128 GOTO(out, rc = PTR_ERR(parent));
133 if (parent && !IS_ERR(parent))
134 mdd_object_put(env, parent);
139 * No permission check is needed.
141 * returns 1: if fid is ancestor of @mo;
142 * returns 0: if fid is not an ancestor of @mo;
144 * returns EREMOTE if remote object is found, fid of remote object is saved to
147 * returns < 0: if error
/* Method installed as .mdo_is_subdir in mdd_dir_ops. Tests that @mo is a
 * directory and then delegates the ancestry walk to mdd_is_parent(), passing
 * @fid as the fid to search for and sfid as the output fid. */
149 static int mdd_is_subdir(const struct lu_env *env,
150 struct md_object *mo, const struct lu_fid *fid,
153 struct mdd_device *mdd = mdo2mdd(mo);
157 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
160 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
165 /*Check whether it may create the cobj under the pobj*/
/* Creation pre-check for a child under @pobj. The visible tests are: @cobj,
 * when given, is checked for already existing; @pobj is checked for being a
 * dead object; and write plus exec permission on @pobj is requested through
 * mdd_permission_internal(). */
166 static int mdd_may_create(const struct lu_env *env,
167 struct mdd_object *pobj, struct mdd_object *cobj,
173 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
176 if (mdd_is_dead_obj(pobj))
179 /*check pobj may create or not*/
181 rc = mdd_permission_internal(env, pobj,
182 MAY_WRITE | MAY_EXEC);
188 * It's inline, so penalty for filesystems that don't use sticky bit is
/* Sticky-bit test used before deleting @cobj from @pobj. Attributes are read
 * into the per-thread mti_la buffer, first for @cobj and then for @pobj, and
 * compared with the fsuid of the current credentials: the owner uid of each
 * object is matched against mu_fsuid and the S_ISVTX bit of the parent mode
 * is tested. The last fallback returns non-zero when the caller lacks
 * CAP_FOWNER. */
191 static inline int mdd_is_sticky(const struct lu_env *env,
192 struct mdd_object *pobj,
193 struct mdd_object *cobj)
195 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
196 struct md_ucred *uc = md_ucred(env);
199 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
202 } else if (tmp_la->la_uid == uc->mu_fsuid) {
205 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
208 else if (!(tmp_la->la_mode & S_ISVTX))
210 else if (tmp_la->la_uid == uc->mu_fsuid)
213 return !mdd_capable(uc, CAP_FOWNER);
217 /* Check whether it may delete the cobj under the pobj. */
/* Deletion pre-check for @cobj under @pobj. Visible tests, in order: @cobj
 * must exist; it is tested for the immutable and append-only flags; its
 * type is compared with what @is_dir announces and its fid is compared with
 * the root fid; @pobj is tested for being dead and for the sticky-bit rule
 * via mdd_is_sticky(); finally write plus exec permission on @pobj is
 * requested.
 * NOTE(review): how @need_check gates the permission call cannot be seen in
 * the lines available here; confirm against the full function. */
218 static int mdd_may_delete(const struct lu_env *env,
219 struct mdd_object *pobj,
220 struct mdd_object *cobj,
221 int is_dir, int need_check)
223 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
229 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
232 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
236 if (!S_ISDIR(mdd_object_type(cobj)))
239 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
242 } else if (S_ISDIR(mdd_object_type(cobj))) {
247 if (mdd_is_dead_obj(pobj))
250 if (mdd_is_sticky(env, pobj, cobj))
254 rc = mdd_permission_internal(env, pobj,
255 MAY_WRITE | MAY_EXEC);
/* Pre-checks for hard-linking @src_obj into directory @tgt_obj: creation in
 * the target is validated with mdd_may_create() (no child object, last
 * argument 1), then the source is tested for being a directory and for the
 * immutable / append-only flags. */
260 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
261 struct mdd_object *src_obj)
267 rc = mdd_may_create(env, tgt_obj, NULL, 1);
272 if (S_ISDIR(mdd_object_type(src_obj)))
275 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
/* Take the write locks of two objects: @o0 first, then @o1. The caller
 * chooses the order by the order of the arguments. */
281 static void mdd_lock2(const struct lu_env *env,
282 struct mdd_object *o0, struct mdd_object *o1)
284 mdd_write_lock(env, o0);
285 mdd_write_lock(env, o1);
/* Counterpart of mdd_lock2(): drops the write locks in the reverse order,
 * @o1 first and then @o0. */
288 static void mdd_unlock2(const struct lu_env *env,
289 struct mdd_object *o0, struct mdd_object *o1)
291 mdd_write_unlock(env, o1);
292 mdd_write_unlock(env, o0);
295 /* insert new index, add reference if isdir, update times */
/* Insert the entry @name for fid @lf into the index of directory @pobj using
 * the dio_insert method of the underlying dt object, provided that
 * dt_try_as_dir() accepts it. A reference is added to @pobj with
 * mdd_ref_add_internal() (for the @isdir case, per the comment above), and
 * the whole call is timed under LPROC_MDD_INDEX_INSERT.
 * NOTE(review): the attribute-update statements below use ma, mdd_obj and
 * handle, none of which appear in the visible parameter list (the transaction
 * parameter is named th); presumably that part is compiled out - confirm. */
296 static int __mdd_index_insert(const struct lu_env *env,
297 struct mdd_object *pobj, const struct lu_fid *lf,
298 const char *name, int isdir, struct thandle *th,
299 struct lustre_capa *capa)
301 struct dt_object *next = mdd_object_child(pobj);
302 struct timeval start;
306 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
307 LPROC_MDD_INDEX_INSERT);
309 struct lu_attr *la = &mdd_env_info(env)->mti_la;
312 if (dt_try_as_dir(env, next))
313 rc = next->do_index_ops->dio_insert(env, next,
315 (struct dt_key *)name,
322 mdd_ref_add_internal(env, pobj, th);
324 la->la_valid = LA_MTIME|LA_CTIME;
325 la->la_atime = ma->ma_attr.la_atime;
326 la->la_ctime = ma->ma_attr.la_ctime;
327 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
330 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
331 LPROC_MDD_INDEX_INSERT);
/* Remove the entry @name from the index of directory @pobj using the
 * dio_delete method of the underlying dt object, provided that
 * dt_try_as_dir() accepts it. When the delete succeeds and @is_dir is set,
 * one reference is dropped from @pobj. The call is timed under
 * LPROC_MDD_INDEX_DELETE. */
335 static int __mdd_index_delete(const struct lu_env *env,
336 struct mdd_object *pobj, const char *name,
337 int is_dir, struct thandle *handle,
338 struct lustre_capa *capa)
340 struct dt_object *next = mdd_object_child(pobj);
341 struct timeval start;
345 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
346 LPROC_MDD_INDEX_DELETE);
347 if (dt_try_as_dir(env, next)) {
348 rc = next->do_index_ops->dio_delete(env, next,
349 (struct dt_key *)name,
351 if (rc == 0 && is_dir)
352 mdd_ref_del_internal(env, pobj, handle);
355 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
356 LPROC_MDD_INDEX_DELETE);
/* Bare index insertion: adds the entry @name for fid @lf to directory @pobj
 * through dio_insert when dt_try_as_dir() accepts the object. Unlike
 * __mdd_index_insert(), no reference counting or timing is visible here. */
360 static int __mdd_index_insert_only(const struct lu_env *env,
361 struct mdd_object *pobj,
362 const struct lu_fid *lf,
363 const char *name, struct thandle *th,
364 struct lustre_capa *capa)
367 struct dt_object *next = mdd_object_child(pobj);
370 if (dt_try_as_dir(env, next))
371 rc = next->do_index_ops->dio_insert(env, next,
373 (struct dt_key *)name, th, capa);
/* Hard link method installed as .mdo_link: adds @name in directory @tgt_obj
 * pointing at @src_obj. A MDD_TXN_LINK_OP transaction is started, both
 * objects are write-locked (target first), mdd_link_sanity_check() is run,
 * the name is inserted with __mdd_index_insert_only(), a reference is added
 * to the source, and then ctime on the source and ctime plus mtime on the
 * target are set from a copy of ma->ma_attr kept in the per-thread
 * mti_la_for_fix buffer. Locks are dropped and the transaction is stopped
 * with the final rc. */
379 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
380 struct md_object *src_obj, const char *name,
383 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
384 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
385 struct mdd_device *mdd = mdo2mdd(src_obj);
386 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
387 struct thandle *handle;
391 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
392 handle = mdd_trans_start(env, mdd);
394 RETURN(PTR_ERR(handle));
396 mdd_lock2(env, mdd_tobj, mdd_sobj);
398 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
402 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
404 mdd_object_capa(env, mdd_tobj));
406 mdd_ref_add_internal(env, mdd_sobj, handle);
408 *la_copy = ma->ma_attr;
409 la_copy->la_valid = LA_CTIME;
410 rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
414 la_copy->la_valid = LA_CTIME | LA_MTIME;
415 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
418 mdd_unlock2(env, mdd_tobj, mdd_sobj);
419 mdd_trans_stop(env, mdd, rc, handle);
423 /* caller should take a lock before calling */
/* Final step of removing a name from @obj. Attributes are re-read into @ma
 * with mdd_iattr_get(); when that succeeds and the link count is zero the
 * object is added to the orphan list and, if that works, flagged with
 * LU_OBJECT_ORPHAN in its header. When mod_count is zero the object is handed
 * to mdd_object_kill(). */
424 int mdd_finish_unlink(const struct lu_env *env,
425 struct mdd_object *obj, struct md_attr *ma,
431 rc = mdd_iattr_get(env, obj, ma);
432 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
433 /* add new orphan and the object
434 * will be deleted during the object_put() */
435 if (__mdd_orphan_add(env, obj, th) == 0)
436 set_bit(LU_OBJECT_ORPHAN,
437 &mdd2lu_obj(obj)->lo_header->loh_flags);
439 if (obj->mod_count == 0)
440 rc = mdd_object_kill(env, obj, ma);
446 * Check that @dir contains no entries except (possibly) dot and dotdot.
451 * -ENOTEMPTY not empty
/* Emptiness test for directory @dir built on the index iterator of the
 * underlying dt object: the iterator is initialised, positioned with get()
 * on the empty key, and then advanced with next() at most three times while
 * next() keeps returning 0. The outcome of that loop is then translated into
 * the result of the function (see the return values listed above).
 * NOTE(review): the exact mapping statements are not all visible here. */
455 static int mdd_dir_is_empty(const struct lu_env *env,
456 struct mdd_object *dir)
459 struct dt_object *obj;
460 struct dt_it_ops *iops;
464 obj = mdd_object_child(dir);
465 iops = &obj->do_index_ops->dio_it;
466 it = iops->init(env, obj, 0);
468 result = iops->get(env, it, (const void *)"");
471 for (result = 0, i = 0; result == 0 && i < 3; ++i)
472 result = iops->next(env, it);
475 else if (result == +1)
477 } else if (result == 0)
479 * Huh? Index contains no zero key?
/* Pre-checks for unlinking @cobj from @pobj: mdd_may_delete() is called with
 * the directory flag taken from the mode in @ma and with the permission
 * check requested; if @cobj is a directory that dt_try_as_dir() accepts, it
 * must additionally pass mdd_dir_is_empty(). */
490 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
491 struct mdd_object *cobj, struct md_attr *ma)
493 struct dt_object *dt_cobj = mdd_object_child(cobj);
497 rc = mdd_may_delete(env, pobj, cobj,
498 S_ISDIR(ma->ma_attr.la_mode), 1);
502 if (S_ISDIR(mdd_object_type(cobj))) {
503 if (dt_try_as_dir(env, dt_cobj))
504 rc = mdd_dir_is_empty(env, cobj);
/* Unlink method installed as .mdo_unlink: removes @name from directory @pobj
 * and drops the link on @cobj. Transaction parameters are built with
 * mdd_log_txn_param_build() for MDD_TXN_UNLINK_OP, the transaction is
 * started, parent and child are write-locked (parent first) and
 * mdd_unlink_sanity_check() is run. The name is then deleted from the parent
 * index, references on the child are dropped (a second one for the dot entry
 * of a directory, presumably), ctime of the child and ctime plus mtime of the
 * parent are set from a copy of ma->ma_attr, and mdd_finish_unlink() handles
 * the zero-link case. An "unlinked" notification is sent to the OSC export
 * with obd_set_info_async() before locks are dropped and the transaction is
 * stopped. */
512 static int mdd_unlink(const struct lu_env *env,
513 struct md_object *pobj, struct md_object *cobj,
514 const char *name, struct md_attr *ma)
516 struct mdd_device *mdd = mdo2mdd(pobj);
517 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
518 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
519 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
520 struct thandle *handle;
524 rc = mdd_log_txn_param_build(env, mdd_cobj, ma, MDD_TXN_UNLINK_OP);
528 handle = mdd_trans_start(env, mdd);
530 RETURN(PTR_ERR(handle));
532 mdd_lock2(env, mdd_pobj, mdd_cobj);
534 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
538 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
539 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
540 mdd_object_capa(env, mdd_pobj));
544 mdd_ref_del_internal(env, mdd_cobj, handle);
545 *la_copy = ma->ma_attr;
548 mdd_ref_del_internal(env, mdd_cobj, handle);
550 la_copy->la_valid = LA_CTIME;
551 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
556 la_copy->la_valid = LA_CTIME | LA_MTIME;
557 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
561 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
564 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
565 strlen("unlinked"), "unlinked", 0,
568 mdd_unlock2(env, mdd_pobj, mdd_cobj);
569 mdd_trans_stop(env, mdd, rc, handle);
574 * Partial operation. Be aware, this is called with write lock taken, so we use
575 * lockless version of __mdd_lookup() here.
/* Pre-checks for mdd_name_insert(): the parent object is tested for being
 * dead and write plus exec permission on it is requested through
 * mdd_permission_internal(). A lookup of @name with __mdd_lookup() that maps
 * a hit to -EEXIST is also present in the text.
 * NOTE(review): whether that lookup is compiled in cannot be determined from
 * the lines visible here; the existing comment says the existence check is
 * left to the index insert. */
577 static int mdd_ni_sanity_check(const struct lu_env *env,
578 struct md_object *pobj,
580 const struct lu_fid *fid)
582 struct mdd_object *obj = md2mdd_obj(pobj);
589 if (mdd_is_dead_obj(obj))
592 /* The exist of the name will be checked in _index_insert. */
594 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
596 RETURN(rc ? : -EEXIST);
600 RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
/* Method installed as .mdo_name_insert: adds the entry @name for @fid to
 * directory @pobj without touching the named object. Runs inside a
 * MDD_TXN_INDEX_INSERT_OP transaction with the parent write-locked;
 * mdd_ni_sanity_check() is run first and a failure jumps to out_unlock,
 * otherwise __mdd_index_insert() does the work. The lock is released and the
 * transaction stopped with the final rc. */
603 static int mdd_name_insert(const struct lu_env *env,
604 struct md_object *pobj,
605 const char *name, const struct lu_fid *fid,
608 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
609 struct mdd_device *mdd = mdo2mdd(pobj);
610 struct thandle *handle;
614 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
615 handle = mdd_trans_start(env, mdo2mdd(pobj));
617 RETURN(PTR_ERR(handle));
619 mdd_write_lock(env, mdd_obj);
620 rc = mdd_ni_sanity_check(env, pobj, name, fid);
622 GOTO(out_unlock, rc);
624 rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
628 mdd_write_unlock(env, mdd_obj);
630 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
635 * Be aware, this is called with write lock taken, so we use lockless version
636 * of __mdd_lookup() here.
/* Pre-checks for mdd_name_remove(): the parent object is tested for being
 * dead and write plus exec permission on it is requested through
 * mdd_permission_internal(). A lookup of @name into the per-thread mti_fid is
 * also present in the text.
 * NOTE(review): whether that lookup is compiled in cannot be determined from
 * the lines visible here; the existing comment says the existence check is
 * left to the index delete. */
638 static int mdd_nr_sanity_check(const struct lu_env *env,
639 struct md_object *pobj,
642 struct mdd_object *obj = md2mdd_obj(pobj);
644 struct mdd_thread_info *info = mdd_env_info(env);
645 struct lu_fid *fid = &info->mti_fid;
651 if (mdd_is_dead_obj(obj))
654 /* The exist of the name will be checked in _index_delete. */
656 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
659 RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
/* Method installed as .mdo_name_remove: deletes the entry @name from
 * directory @pobj without touching the named object. Runs inside a
 * MDD_TXN_INDEX_DELETE_OP transaction with the parent write-locked;
 * mdd_nr_sanity_check() is run first and a failure jumps to out_unlock,
 * otherwise __mdd_index_delete() does the work with @is_dir passed through.
 * The lock is released and the transaction stopped with the final rc. */
662 static int mdd_name_remove(const struct lu_env *env,
663 struct md_object *pobj,
664 const char *name, int is_dir)
666 struct mdd_device *mdd = mdo2mdd(pobj);
667 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
668 struct thandle *handle;
672 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
673 handle = mdd_trans_start(env, mdd);
675 RETURN(PTR_ERR(handle));
677 mdd_write_lock(env, mdd_obj);
678 rc = mdd_nr_sanity_check(env, pobj, name);
680 GOTO(out_unlock, rc);
682 rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
686 mdd_write_unlock(env, mdd_obj);
688 mdd_trans_stop(env, mdd, rc, handle);
/* Pre-checks for mdd_rename_tgt(). The target parent is tested for being
 * dead, and the directory flag of the source is taken from the mode in @ma.
 * Two checks follow: mdd_may_delete() on @tobj combined with an
 * mdd_dir_is_empty() test when @tobj is a directory, and mdd_may_create() in
 * the target parent - presumably the former when @tobj is given and the
 * latter otherwise; the selecting condition is not visible here. */
691 static int mdd_rt_sanity_check(const struct lu_env *env,
692 struct mdd_object *tgt_pobj,
693 struct mdd_object *tobj,
694 const struct lu_fid *sfid,
695 const char *name, struct md_attr *ma)
701 if (mdd_is_dead_obj(tgt_pobj))
704 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
706 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
707 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
708 mdd_dir_is_empty(env, tobj))
711 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/* Method installed as .mdo_rename_tgt: makes @name in directory @pobj refer
 * to fid @lf. Inside a MDD_TXN_RENAME_TGT_OP transaction the parent is
 * write-locked, either together with @tobj through mdd_lock2() or alone
 * (presumably depending on whether @tobj is given). After
 * mdd_rt_sanity_check(), the old entry is deleted and the name re-inserted
 * with the new fid without adjusting the parent link count, and a reference
 * is dropped from @tobj when it exists. The matching unlock is performed and
 * the transaction stopped with the final rc. */
717 static int mdd_rename_tgt(const struct lu_env *env,
718 struct md_object *pobj, struct md_object *tobj,
719 const struct lu_fid *lf, const char *name,
722 struct mdd_device *mdd = mdo2mdd(pobj);
723 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
724 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
725 struct thandle *handle;
729 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
730 handle = mdd_trans_start(env, mdd);
732 RETURN(PTR_ERR(handle));
735 mdd_lock2(env, mdd_tpobj, mdd_tobj);
737 mdd_write_lock(env, mdd_tpobj);
739 /*TODO rename sanity checking*/
740 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
744 /* if rename_tgt is called then we should just re-insert name with
745 * correct fid, no need to dec/inc parent nlink if obj is dir */
746 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
750 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
755 if (tobj && lu_object_exists(&tobj->mo_lu))
756 mdd_ref_del_internal(env, mdd_tobj, handle);
759 mdd_unlock2(env, mdd_tpobj, mdd_tobj);
761 mdd_write_unlock(env, mdd_tpobj);
762 mdd_trans_stop(env, mdd, rc, handle);
767 * The permission has been checked when obj created,
768 * no need check again.
/* Pre-check for mdd_create_data(): @obj must be non-NULL and not a dead
 * object. A MAY_WRITE permission check under the read lock is also present
 * in the text.
 * NOTE(review): the comment above says permission needs no re-check, so that
 * part is presumably compiled out - confirm against the full function. */
770 static int mdd_cd_sanity_check(const struct lu_env *env,
771 struct mdd_object *obj)
777 if (!obj || mdd_is_dead_obj(obj))
781 mdd_read_lock(env, obj);
782 rc = mdd_permission_internal(env, obj, MAY_WRITE);
783 mdd_read_unlock(env, obj);
/* Method installed as .mdo_create_data: attaches striping data to the
 * already existing object @cobj. After mdd_cd_sanity_check(), the create
 * flags in @spec are tested for MDS_OPEN_DELAY_CREATE or a missing
 * FMODE_WRITE. Otherwise mdd_lov_create() prepares the striping descriptor
 * before the MDD_TXN_CREATE_DATA_OP transaction is started. Inside the
 * transaction the descriptor is stored with mdd_lov_set_md(): taken from the
 * EA carried in @spec when no_lov_create is set (replay), or from the freshly
 * built lmm otherwise. Attributes are then re-read into @ma,
 * mdd_lov_create_finish() is called, the transaction is stopped and the lmm
 * buffer is freed. The parent object may be NULL (see the XXX note on its
 * declaration). */
790 static int mdd_create_data(const struct lu_env *env,
791 struct md_object *pobj, struct md_object *cobj,
792 const struct md_create_spec *spec,
795 struct mdd_device *mdd = mdo2mdd(cobj);
796 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
797 struct mdd_object *son = md2mdd_obj(cobj);
798 struct lu_attr *attr = &ma->ma_attr;
799 struct lov_mds_md *lmm = NULL;
801 struct thandle *handle;
805 rc = mdd_cd_sanity_check(env, son);
809 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
810 !(spec->sp_cr_flags & FMODE_WRITE))
812 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
817 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
818 handle = mdd_trans_start(env, mdd);
820 RETURN(rc = PTR_ERR(handle));
823 * XXX: Setting the lov ea is not locked but setting the attr is locked?
826 /* Replay creates has objects already */
827 if (spec->u.sp_ea.no_lov_create) {
828 CDEBUG(D_INFO, "we already have lov ea\n");
829 rc = mdd_lov_set_md(env, mdd_pobj, son,
830 (struct lov_mds_md *)spec->u.sp_ea.eadata,
831 spec->u.sp_ea.eadatalen, handle, 0);
833 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
834 lmm_size, handle, 0);
837 rc = mdd_attr_get_internal_locked(env, son, ma);
839 /* Finish mdd_lov_create() stuff. */
840 mdd_lov_create_finish(env, mdd, rc);
841 mdd_trans_stop(env, mdd, rc, handle);
843 OBD_FREE(lmm, lmm_size);
/* Lookup of @name in directory @pobj that takes no object lock itself; use
 * __mdd_lookup_locked() when the caller does not already hold the lock.
 * The parent is tested for being dead and for existing locally (an error is
 * logged for an object located on a remote server). Permission is then
 * checked: a mask of exactly MAY_EXEC goes through
 * mdd_exec_permission_lite(), any other mask through
 * mdd_permission_internal(). If the parent is a directory that
 * dt_try_as_dir() accepts, dio_lookup() is called with @name as the key and
 * @fid, cast to a dt_rec, as the record buffer. The call is timed under
 * LPROC_MDD_LOOKUP. */
848 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
849 const char *name, const struct lu_fid* fid, int mask)
851 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
852 struct dt_object *dir = mdd_object_child(mdd_obj);
853 struct dt_rec *rec = (struct dt_rec *)fid;
854 const struct dt_key *key = (const struct dt_key *)name;
855 struct timeval start;
859 mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
860 if (mdd_is_dead_obj(mdd_obj))
863 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
867 CERROR("Object "DFID" locates on remote server\n",
868 PFID(mdo2fid(mdd_obj)));
873 if (mask == MAY_EXEC)
874 rc = mdd_exec_permission_lite(env, mdd_obj);
877 rc = mdd_permission_internal(env, mdd_obj, mask);
881 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
882 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
883 mdd_object_capa(env, mdd_obj));
887 mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/* Initialise a freshly created @child: its attributes are set from
 * ma->ma_attr, and for a directory a self reference is added and the dot
 * entry (own fid) and dotdot entry (@pfid) are inserted. If a later step
 * fails, the dot entry is deleted again - a failure of that cleanup is
 * logged - and the extra reference is dropped. */
891 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
892 struct mdd_object *child, struct md_attr *ma,
893 struct thandle *handle)
898 /* update attributes for child.
900 * (1) the valid bits should be converted between Lustre and Linux;
901 * (2) maybe, the child attributes should be set in OSD when creation.
904 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
908 if (S_ISDIR(ma->ma_attr.la_mode)) {
909 /* add . and .. for newly created dir */
910 mdd_ref_add_internal(env, child, handle);
911 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
912 dot, handle, BYPASS_CAPA);
914 rc = __mdd_index_insert_only(env, child, pfid,
920 rc2 = __mdd_index_delete(env, child, dot, 0,
921 handle, BYPASS_CAPA);
923 CERROR("Failure to cleanup after dotdot"
924 " creation: %d (%d)\n", rc2, rc);
926 mdd_ref_del_internal(env, child, handle);
/* Pre-checks for mdd_create(). The parent is tested for being dead, and
 * @name is looked up under the read lock with a write plus exec mask: any
 * result other than -ENOENT ends the check, a successful lookup being turned
 * into -EEXIST. The parent attributes are then read under the read lock; if
 * the parent mode has S_ISGID set, the new object inherits the parent gid
 * and a new directory additionally gets S_ISGID in its mode. Finally the
 * file type bits of the requested mode are dispatched on. */
933 static int mdd_create_sanity_check(const struct lu_env *env,
934 struct md_object *pobj,
935 const char *name, struct md_attr *ma)
937 struct mdd_thread_info *info = mdd_env_info(env);
938 struct lu_attr *la = &info->mti_la;
939 struct lu_fid *fid = &info->mti_fid;
940 struct mdd_object *obj = md2mdd_obj(pobj);
945 if (mdd_is_dead_obj(obj))
949 * Check if the name already exist, though it will be checked
950 * in _index_insert also, for avoiding rolling back if exists
953 rc = __mdd_lookup_locked(env, pobj, name, fid,
954 MAY_WRITE | MAY_EXEC);
956 RETURN(rc ? : -EEXIST);
959 mdd_read_lock(env, obj);
960 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
961 mdd_read_unlock(env, obj);
965 if (la->la_mode & S_ISGID) {
966 ma->ma_attr.la_gid = la->la_gid;
967 if (S_ISDIR(ma->ma_attr.la_mode)) {
968 ma->ma_attr.la_mode |= S_ISGID;
969 ma->ma_attr.la_valid |= LA_MODE;
973 switch (ma->ma_attr.la_mode & S_IFMT) {
991 * Create object and insert it into namespace.
/* Create method installed as .mdo_create: creates @child and inserts it as
 * @name into directory @pobj. Order of work: mdd_create_sanity_check(); for
 * regular files mdd_lov_create() before the transaction; then, with the
 * parent write-locked inside the transaction, object creation, ACL setup
 * (when CONFIG_FS_POSIX_ACL is set), mdd_object_initialize(), insertion of
 * the name, storing of the striping descriptor, writing of a symlink target,
 * and the ctime plus mtime update on the parent. Attributes of the new object
 * are returned in @ma. On failure after creation, the inserted name is
 * removed again and the reference on the new object is dropped. */
993 static int mdd_create(const struct lu_env *env,
994 struct md_object *pobj, const char *name,
995 struct md_object *child,
996 struct md_create_spec *spec,
999 struct mdd_device *mdd = mdo2mdd(pobj);
1000 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1001 struct mdd_object *son = md2mdd_obj(child);
1002 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1003 struct lu_attr *attr = &ma->ma_attr;
1004 struct lov_mds_md *lmm = NULL;
1005 struct thandle *handle;
1006 int rc, created = 0, inserted = 0, lmm_size = 0;
1007 struct timeval start;
1010 mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1012 * Two operations have to be performed:
1014 * - allocation of new object (->do_create()), and
1016 * - insertion into parent index (->dio_insert()).
1018 * Due to locking, operation order is not important, when both are
1019 * successful, *but* error handling cases are quite different:
1021 * - if insertion is done first, and following object creation fails,
1022 * insertion has to be rolled back, but this operation might fail
1023 * also leaving us with dangling index entry.
1025 * - if creation is done first, it has to be undone if insertion
1026 * fails, leaving us with leaked space, which is neither good, nor
1029 * It seems that creation-first is simplest solution, but it is
1030 * sub-optimal in the frequent
1035 * case, because second mkdir is bound to create object, only to
1036 * destroy it immediately.
1038 * To avoid this follow local file systems that do double lookup:
1040 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1042 * 1. create (mdd_object_create_internal())
1044 * 2. insert (__mdd_index_insert(), lookup again)
1047 /* sanity checks before big job */
1048 rc = mdd_create_sanity_check(env, pobj, name, ma);
1052 /* no RPC inside the transaction, so OST objects should be created at
1054 if (S_ISREG(attr->la_mode)) {
1055 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1061 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1062 handle = mdd_trans_start(env, mdd);
1064 RETURN(PTR_ERR(handle));
1066 mdd_write_lock(env, mdd_pobj);
1069 * XXX check that link can be added to the parent in mkdir case.
1072 mdd_write_lock(env, son);
1073 rc = mdd_object_create_internal(env, son, ma, handle);
1075 mdd_write_unlock(env, son);
1081 #ifdef CONFIG_FS_POSIX_ACL
1082 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1084 mdd_write_unlock(env, son);
1087 ma->ma_attr.la_valid |= LA_MODE;
1091 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1093 mdd_write_unlock(env, son);
1096 * Object has no links, so it will be destroyed when last
1097 * reference is released. (XXX not now.)
1101 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1102 name, S_ISDIR(attr->la_mode), handle,
1103 mdd_object_capa(env, mdd_pobj));
1109 /* replay creates has objects already */
1110 if (spec->u.sp_ea.no_lov_create) {
1111 CDEBUG(D_INFO, "we already have lov ea\n");
/* On replay the descriptor comes from spec, so nothing may have been
 * built locally: lmm is overwritten on the next line, and a non-NULL
 * value would be dropped here without being freed. */
1112 LASSERT(lmm == NULL);
1113 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1114 lmm_size = spec->u.sp_ea.eadatalen;
1116 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1118 CERROR("error on stripe info copy %d \n", rc);
1121 if (lmm && lmm_size > 0) {
1122 /* set Lov here, do not get lmm again later */
1123 memcpy(ma->ma_lmm, lmm, lmm_size);
1124 ma->ma_lmm_size = lmm_size;
1125 ma->ma_valid |= MA_LOV;
1128 if (S_ISLNK(attr->la_mode)) {
1129 struct dt_object *dt = mdd_object_child(son);
1130 const char *target_name = spec->u.sp_symname;
1131 int sym_len = strlen(target_name);
1132 const struct lu_buf *buf;
1135 buf = mdd_buf_get_const(env, target_name, sym_len);
1136 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1137 mdd_object_capa(env, son));
1144 *la_copy = ma->ma_attr;
1145 la_copy->la_valid = LA_CTIME | LA_MTIME;
1146 rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
1150 /* return attr back */
1151 rc = mdd_attr_get_internal_locked(env, son, ma);
1153 if (rc && created) {
1157 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1158 S_ISDIR(attr->la_mode),
1159 handle, BYPASS_CAPA);
1161 CERROR("error can not cleanup destroy %d\n",
1165 mdd_write_lock(env, son);
1166 mdd_ref_del_internal(env, son, handle);
1167 mdd_write_unlock(env, son);
1170 /* finish mdd_lov_create() stuff */
1171 mdd_lov_create_finish(env, mdd, rc);
1173 OBD_FREE(lmm, lmm_size);
1174 mdd_write_unlock(env, mdd_pobj);
1175 mdd_trans_stop(env, mdd, rc, handle);
1176 mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
/* Write-lock the two parent directories of a rename in an order derived from
 * their position in the tree. Identical parents are locked once. If one of
 * them is the filesystem root it is locked first. Otherwise mdd_is_parent()
 * is asked whether the target parent is an ancestor of the source parent,
 * and the two mdd_lock2() calls below cover the two possible orders -
 * presumably target first when it is an ancestor, source first otherwise;
 * the selecting lines are not visible here. */
1180 static int mdd_rename_lock(const struct lu_env *env,
1181 struct mdd_device *mdd,
1182 struct mdd_object *src_pobj,
1183 struct mdd_object *tgt_pobj)
1188 if (src_pobj == tgt_pobj) {
1189 mdd_write_lock(env, src_pobj);
1193 /* compared the parent child relationship of src_p&tgt_p */
1194 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1195 mdd_lock2(env, src_pobj, tgt_pobj);
1197 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1198 mdd_lock2(env, tgt_pobj, src_pobj);
1202 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1207 mdd_lock2(env, tgt_pobj, src_pobj);
1211 mdd_lock2(env, src_pobj, tgt_pobj);
/* Release the parent locks taken by mdd_rename_lock(): the source parent is
 * always unlocked, the target parent only when it is a different object. */
1215 static void mdd_rename_unlock(const struct lu_env *env,
1216 struct mdd_object *src_pobj,
1217 struct mdd_object *tgt_pobj)
1219 mdd_write_unlock(env, src_pobj);
1220 if (src_pobj != tgt_pobj)
1221 mdd_write_unlock(env, tgt_pobj);
/* Pre-checks for mdd_rename(). The source parent is tested for being dead
 * and for write plus exec permission; the source object itself is not
 * examined because it may be remote. For the target side two checks are
 * present: mdd_may_create() in the target parent, and - under the read lock
 * of @tobj - mdd_may_delete() followed by an mdd_dir_is_empty() test when
 * @tobj is a directory. Presumably the former applies when there is no
 * @tobj and the latter when there is; the selecting condition is not
 * visible here. The last argument of both calls is non-zero only when the
 * two parents differ. */
1224 static int mdd_rename_sanity_check(const struct lu_env *env,
1225 struct mdd_object *src_pobj,
1226 struct mdd_object *tgt_pobj,
1227 const struct lu_fid *sfid,
1229 struct mdd_object *tobj)
1234 if (mdd_is_dead_obj(src_pobj))
1237 /* The sobj maybe on the remote, check parent permission only here */
1238 rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
1243 rc = mdd_may_create(env, tgt_pobj, NULL,
1244 (src_pobj != tgt_pobj));
1246 mdd_read_lock(env, tobj);
1247 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1248 (src_pobj != tgt_pobj));
1250 if (S_ISDIR(mdd_object_type(tobj))
1251 && mdd_dir_is_empty(env, tobj))
1253 mdd_read_unlock(env, tobj);
1258 /* src object can be remote that is why we use only fid and type of object */
/* Rename method installed as .mdo_rename: moves the entry @sname in
 * @src_pobj, which refers to fid @lf, to @tname in @tgt_pobj, replacing
 * @tobj if one is given. The file type in @ma must be set; it decides whether
 * the source is a directory, and the append-only / immutable flags in @ma
 * are tested up front. Inside a MDD_TXN_RENAME_OP transaction the parents
 * are locked with mdd_rename_lock() and mdd_rename_sanity_check() is run.
 * The source name is deleted, the target name is deleted (-ENOENT is
 * tolerated because the target may be remote) and re-inserted with @lf.
 * ctime is then set on the source object found by fid, the link count of an
 * existing target object is dropped (once more for a directory, for its dot
 * entry) and mdd_finish_unlink() is run on it, and ctime plus mtime are set
 * on the source parent and, when different, on the target parent. Locks are
 * released, the transaction stopped and the source object reference put. */
1259 static int mdd_rename(const struct lu_env *env,
1260 struct md_object *src_pobj, struct md_object *tgt_pobj,
1261 const struct lu_fid *lf, const char *sname,
1262 struct md_object *tobj, const char *tname,
1265 struct mdd_device *mdd = mdo2mdd(src_pobj);
1266 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1267 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1268 struct mdd_object *mdd_sobj = NULL;
1269 struct mdd_object *mdd_tobj = NULL;
1270 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1271 struct thandle *handle;
1276 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1277 is_dir = S_ISDIR(ma->ma_attr.la_mode);
1278 if (ma->ma_attr.la_valid & LA_FLAGS &&
1279 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1283 mdd_tobj = md2mdd_obj(tobj);
1285 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1286 handle = mdd_trans_start(env, mdd);
1288 RETURN(PTR_ERR(handle));
1290 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1291 rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
1293 GOTO(cleanup_unlocked, rc);
1295 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1296 lf, is_dir, mdd_tobj);
1300 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1301 mdd_object_capa(env, mdd_spobj));
1306 * Here tobj can be remote one, so we do index_delete unconditionally
1307 * and -ENOENT is allowed.
1309 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1310 mdd_object_capa(env, mdd_tpobj));
1311 if (rc != 0 && rc != -ENOENT)
1314 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1315 mdd_object_capa(env, mdd_tpobj));
1319 mdd_sobj = mdd_object_find(env, mdd, lf);
1320 *la_copy = ma->ma_attr;
1321 la_copy->la_valid = LA_CTIME;
1323 /*XXX: how to update ctime for remote sobj? */
1324 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
1328 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1329 mdd_write_lock(env, mdd_tobj);
1330 mdd_ref_del_internal(env, mdd_tobj, handle);
1331 /* remove dot reference */
1333 mdd_ref_del_internal(env, mdd_tobj, handle);
1335 la_copy->la_valid = LA_CTIME;
1336 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1340 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1341 mdd_write_unlock(env, mdd_tobj);
1346 la_copy->la_valid = LA_CTIME | LA_MTIME;
1347 rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
1351 if (mdd_spobj != mdd_tpobj) {
1352 la_copy->la_valid = LA_CTIME | LA_MTIME;
1353 rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
1357 mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
1359 mdd_trans_stop(env, mdd, rc, handle);
1361 mdd_object_put(env, mdd_sobj);
/* Directory operations table of the mdd layer: binds each md_dir_operations
 * method to its implementation in this file. */
1365 struct md_dir_operations mdd_dir_ops = {
1366 .mdo_is_subdir = mdd_is_subdir,
1367 .mdo_lookup = mdd_lookup,
1368 .mdo_create = mdd_create,
1369 .mdo_rename = mdd_rename,
1370 .mdo_link = mdd_link,
1371 .mdo_unlink = mdd_unlink,
1372 .mdo_name_insert = mdd_name_insert,
1373 .mdo_name_remove = mdd_name_remove,
1374 .mdo_rename_tgt = mdd_rename_tgt,
1375 .mdo_create_data = mdd_create_data