1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mdd) routines
7 * Copyright (C) 2006 Cluster File Systems, Inc.
8 * Author: Wang Di <wangdi@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
29 # define EXPORT_SYMTAB
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/module.h>
34 #include <linux/jbd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
46 #include "mdd_internal.h"
/* Names of the self (".") and parent ("..") directory entries. */
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
/*
 * Forward declaration: __mdd_lookup() is called by __mdd_lookup_locked()
 * ahead of its definition later in this file.
 */
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52 const char *name, struct lu_fid* fid, int mask);
/*
 * Locked lookup: takes the pdo read lock for @name on directory @pobj
 * (mdd_pdo_read_lock()), runs __mdd_lookup() with the caller's @fid and
 * @mask, and releases the lock again with mdd_pdo_read_unlock().
 */
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55 const char *name, struct lu_fid* fid, int mask)
57 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58 struct dynlock_handle *dlh;
61 dlh = mdd_pdo_read_lock(env, mdd_obj, name);
64 rc = __mdd_lookup(env, pobj, name, fid, mask);
65 mdd_pdo_read_unlock(env, mdd_obj, dlh);
/*
 * Lookup handler installed as mdd_dir_ops.mdo_lookup: resolves @name in
 * directory @pobj through __mdd_lookup_locked(), asking for MAY_EXEC on the
 * directory.
 */
70 static int mdd_lookup(const struct lu_env *env,
71 struct md_object *pobj, const char *name,
76 rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
/*
 * Reads the fid of @obj's parent by looking up the ".." entry in @obj under
 * the pdo read lock. A permission mask of 0 is passed to the lookup.
 */
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
84 return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
88 * For root fid use special function, which does not compare version component
89 * of fid. Version component is different for root fids on all MDTs.
/*
 * Returns non-zero when @fid has the same sequence and object id as the root
 * fid stored in @mdd (mdd_root_fid). Only fid_seq() and fid_oid() are
 * compared.
 */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
93 return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
98 * return 1: if lf is the fid of the ancestor of p1;
101 * return -EREMOTE: if remote object is found, in this
102 * case fid of remote object is saved to @pf;
104 * otherwise: values < 0, errors.
/*
 * Ancestor check for @p1 against the fid @lf.
 *
 * The caller must not pass @p1's own fid as @lf (asserted below). The parent
 * fid is read with mdd_parent_fid() into the per-thread scratch fid
 * (mti_fid), compared against the root fid and against @lf, and the parent
 * object is then obtained with mdd_object_find(). A NULL result from
 * mdd_object_find() is treated as a remote (cross-ref) parent and yields
 * -EREMOTE; an ERR_PTR result is converted with PTR_ERR().
 *
 * NOTE(review): the loop construct and the remaining return statements are
 * not among the lines visible here; this appears to iterate upwards one
 * parent at a time -- confirm against the full function.
 */
106 static int mdd_is_parent(const struct lu_env *env,
107 struct mdd_device *mdd,
108 struct mdd_object *p1,
109 const struct lu_fid *lf,
112 struct mdd_object *parent = NULL;
117 LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118 pfid = &mdd_env_info(env)->mti_fid;
120 /* Check for root first. */
121 if (mdd_is_root(mdd, mdo2fid(p1)))
125 rc = mdd_parent_fid(env, p1, pfid);
128 if (mdd_is_root(mdd, pfid))
130 if (lu_fid_eq(pfid, lf))
133 mdd_object_put(env, parent);
134 parent = mdd_object_find(env, mdd, pfid);
136 /* cross-ref parent */
137 if (parent == NULL) {
140 GOTO(out, rc = -EREMOTE);
141 } else if (IS_ERR(parent))
142 GOTO(out, rc = PTR_ERR(parent));
/* Drop the reference on the last parent object, if a valid one is held. */
147 if (parent && !IS_ERR(parent))
148 mdd_object_put(env, parent);
153 * No permission check is needed.
155 * returns 1: if fid is ancestor of @mo;
156 * returns 0: if fid is not an ancestor of @mo;
158 * returns EREMOTE if remote object is found, fid of remote object is saved to
161 * returns < 0: if error
/*
 * Handler installed as mdd_dir_ops.mdo_is_subdir. Tests S_ISDIR() on the
 * type of @mo first, then delegates the ancestor check to mdd_is_parent()
 * with @mo as the starting object, @fid as the candidate ancestor and @sfid
 * as the output fid.
 */
163 static int mdd_is_subdir(const struct lu_env *env,
164 struct md_object *mo, const struct lu_fid *fid,
167 struct mdd_device *mdd = mdo2mdd(mo);
171 if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
174 rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
179 /*Check whether it may create the cobj under the pobj*/
/*
 * Checks performed, in order: whether @cobj is given and already exists,
 * whether @pobj is a dead object, and whether the caller holds
 * MAY_WRITE | MAY_EXEC on @pobj (mdd_permission_internal_locked()).
 * NOTE(review): the error codes returned for the first two checks are not
 * visible in these lines.
 */
180 static int mdd_may_create(const struct lu_env *env,
181 struct mdd_object *pobj, struct mdd_object *cobj,
187 if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
190 if (mdd_is_dead_obj(pobj))
193 /*check pobj may create or not*/
195 rc = mdd_permission_internal_locked(env, pobj,
196 MAY_WRITE | MAY_EXEC);
202 * It's inline, so penalty for filesystems that don't use sticky bit is
/*
 * Sticky-bit check used by mdd_may_delete().
 *
 * Compares the caller's fsuid (from md_ucred()) with the owner of @cobj and
 * with the owner of @pobj, tests S_ISVTX in @pobj's mode, and as the last
 * step returns !mdd_capable(uc, CAP_FOWNER).
 *
 * Both attribute reads go into the same per-thread lu_attr buffer (mti_la),
 * so @cobj's attributes are overwritten when @pobj's are fetched. @pobj's
 * attributes are read under its read lock.
 * NOTE(review): the values returned from the intermediate branches are not
 * visible in these lines.
 */
205 static inline int mdd_is_sticky(const struct lu_env *env,
206 struct mdd_object *pobj,
207 struct mdd_object *cobj)
209 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
210 struct md_ucred *uc = md_ucred(env);
213 rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
216 } else if (tmp_la->la_uid == uc->mu_fsuid) {
219 mdd_read_lock(env, pobj);
220 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221 mdd_read_unlock(env, pobj);
224 else if (!(tmp_la->la_mode & S_ISVTX))
226 else if (tmp_la->la_uid == uc->mu_fsuid)
229 return !mdd_capable(uc, CAP_FOWNER);
233 /* Check whether it may delete the cobj under the pobj. */
/*
 * Checks visible here, in order: @cobj must exist; @cobj must not be
 * immutable or append-only; the type of @cobj is tested against @is_dir
 * (including a comparison of @cobj's fid with the root fid); @pobj is tested
 * for being dead; the sticky-bit rule is applied via mdd_is_sticky(); and a
 * permission check is made on @pobj.
 * NOTE(review): the error code for each failing check, the mask passed to
 * the permission check, and how @need_check gates the parent checks are not
 * visible in these lines.
 */
234 static int mdd_may_delete(const struct lu_env *env,
235 struct mdd_object *pobj,
236 struct mdd_object *cobj,
237 int is_dir, int need_check)
239 struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
245 if (!lu_object_exists(&cobj->mod_obj.mo_lu))
248 if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
252 if (!S_ISDIR(mdd_object_type(cobj)))
255 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
258 } else if (S_ISDIR(mdd_object_type(cobj))) {
263 if (mdd_is_dead_obj(pobj))
266 if (mdd_is_sticky(env, pobj, cobj))
270 rc = mdd_permission_internal_locked(env, pobj,
/*
 * Pre-checks for creating a hard link to @src_obj inside directory @tgt_obj:
 * mdd_may_create() on the target directory (no child object, need_check = 1),
 * a directory test on the source, and an immutable / append-only test on the
 * source.
 * NOTE(review): the error codes returned by the failing tests are not
 * visible in these lines.
 */
277 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
278 struct mdd_object *src_obj)
284 rc = mdd_may_create(env, tgt_obj, NULL, 1);
289 if (S_ISDIR(mdd_object_type(src_obj)))
292 if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
/*
 * Converts @fid to big-endian form (fid_cpu_to_be()) and returns it as an
 * index record.
 *
 * The converted fid is stored in the per-thread info of @env (mti_fid2) and
 * the returned pointer refers to that storage, so the record is overwritten
 * by the next call made with the same @env.
 */
298 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
299 const struct lu_fid *fid)
301 struct mdd_thread_info *info = mdd_env_info(env);
303 fid_cpu_to_be(&info->mti_fid2, fid);
304 return (const struct dt_rec *)&info->mti_fid2;
308 /* insert new index, add reference if isdir, update times */
/*
 * Inserts the entry @name -> @lf into directory @pobj through the index
 * operations of the underlying dt object (dio_insert), if dt_try_as_dir()
 * accepts that object. A reference is added to @pobj under its write lock
 * (per the comment above, when @isdir is set). The call is timed under
 * LPROC_MDD_INDEX_INSERT.
 *
 * NOTE(review): the time-update statements below reference `ma`, `mdd_obj`
 * and `handle`, none of which is a parameter or a local declared in the
 * visible lines (the transaction parameter is `th`) -- presumably that part
 * is compiled out in the full file; confirm.
 */
309 static int __mdd_index_insert(const struct lu_env *env,
310 struct mdd_object *pobj, const struct lu_fid *lf,
311 const char *name, int isdir, struct thandle *th,
312 struct lustre_capa *capa)
314 struct dt_object *next = mdd_object_child(pobj);
315 struct timeval start;
319 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
320 LPROC_MDD_INDEX_INSERT);
322 struct lu_attr *la = &mdd_env_info(env)->mti_la;
325 if (dt_try_as_dir(env, next))
326 rc = next->do_index_ops->dio_insert(env, next,
327 __mdd_fid_rec(env, lf),
328 (const struct dt_key *)name,
335 mdd_write_lock(env, pobj);
336 mdd_ref_add_internal(env, pobj, th);
337 mdd_write_unlock(env, pobj);
340 la->la_valid = LA_MTIME|LA_CTIME;
341 la->la_atime = ma->ma_attr.la_atime;
342 la->la_ctime = ma->ma_attr.la_ctime;
343 rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
346 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
347 LPROC_MDD_INDEX_INSERT);
/*
 * Removes the entry @name from directory @pobj through the index operations
 * of the underlying dt object (dio_delete), if dt_try_as_dir() accepts that
 * object. When the delete succeeds and @is_dir is set, one reference is
 * dropped from @pobj under its write lock. The call is timed under
 * LPROC_MDD_INDEX_DELETE.
 */
351 static int __mdd_index_delete(const struct lu_env *env,
352 struct mdd_object *pobj, const char *name,
353 int is_dir, struct thandle *handle,
354 struct lustre_capa *capa)
356 struct dt_object *next = mdd_object_child(pobj);
357 struct timeval start;
361 mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
362 LPROC_MDD_INDEX_DELETE);
363 if (dt_try_as_dir(env, next)) {
364 rc = next->do_index_ops->dio_delete(env, next,
365 (struct dt_key *)name,
367 if (rc == 0 && is_dir) {
368 mdd_write_lock(env, pobj);
369 mdd_ref_del_internal(env, pobj, handle);
370 mdd_write_unlock(env, pobj);
374 mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
375 LPROC_MDD_INDEX_DELETE);
/*
 * Inserts the entry @name -> @lf into directory @pobj via dio_insert, if
 * dt_try_as_dir() accepts the underlying dt object. Unlike
 * __mdd_index_insert(), the visible lines touch neither the reference count
 * of @pobj nor the lproc timers.
 */
379 static int __mdd_index_insert_only(const struct lu_env *env,
380 struct mdd_object *pobj,
381 const struct lu_fid *lf,
382 const char *name, struct thandle *th,
383 struct lustre_capa *capa)
386 struct dt_object *next = mdd_object_child(pobj);
389 if (dt_try_as_dir(env, next))
390 rc = next->do_index_ops->dio_insert(env, next,
391 __mdd_fid_rec(env, lf),
392 (const struct dt_key *)name, th, capa);
/*
 * Handler installed as mdd_dir_ops.mdo_link: adds the name @name in
 * directory @tgt_obj for the existing object @src_obj.
 *
 * Visible sequence: build MDD_TXN_LINK_OP transaction parameters and start
 * the transaction; take the pdo write lock for @name on the target directory
 * and the write lock on the source object; run mdd_link_sanity_check();
 * insert the index entry; add a reference to the source object; set ctime on
 * the source and ctime | mtime on the target directory from a copy of
 * ma->ma_attr; release both locks and stop the transaction.
 * NOTE(review): the error checks and labels between these steps are not
 * visible in these lines.
 */
398 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
399 struct md_object *src_obj, const char *name,
402 struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
403 struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
404 struct mdd_device *mdd = mdo2mdd(src_obj);
405 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
406 struct thandle *handle;
407 struct dynlock_handle *dlh;
411 mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
412 handle = mdd_trans_start(env, mdd);
414 RETURN(PTR_ERR(handle));
416 dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
418 GOTO(out_trans, rc = -ENOMEM);
419 mdd_write_lock(env, mdd_sobj);
421 rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
425 rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
427 mdd_object_capa(env, mdd_tobj));
429 mdd_ref_add_internal(env, mdd_sobj, handle);
/* The source object only gets a new ctime. */
431 *la_copy = ma->ma_attr;
432 la_copy->la_valid = LA_CTIME;
433 rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
/* The target directory gets both ctime and mtime. */
437 la_copy->la_valid = LA_CTIME | LA_MTIME;
438 rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
441 mdd_write_unlock(env, mdd_sobj);
442 mdd_pdo_write_unlock(env, mdd_tobj, dlh);
444 mdd_trans_stop(env, mdd, rc, handle);
448 /* caller should take a lock before calling */
/*
 * Final step of removing a name for @obj: re-reads the attributes into @ma
 * (mdd_iattr_get()) and, when the link count has reached zero, adds the
 * object to the orphan list, marks it with LU_OBJECT_ORPHAN on success, and
 * kills it via mdd_object_kill() if obj->mod_count is 0. MA_LOV and
 * MA_COOKIE are cleared from ma->ma_valid on the paths that do not destroy
 * the object.
 * NOTE(review): the else-branches that select between the two clearing
 * statements are not visible in these lines.
 */
449 int mdd_finish_unlink(const struct lu_env *env,
450 struct mdd_object *obj, struct md_attr *ma,
456 rc = mdd_iattr_get(env, obj, ma);
457 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
458 /* add new orphan and the object
459 * will be deleted during the object_put() */
460 if (__mdd_orphan_add(env, obj, th) == 0)
461 set_bit(LU_OBJECT_ORPHAN,
462 &mdd2lu_obj(obj)->lo_header->loh_flags);
464 if (obj->mod_count == 0)
465 rc = mdd_object_kill(env, obj, ma);
467 /* clear MA_LOV | MA_COOKIE, if we do not
468 * unlink it in case we get it somewhere */
469 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
471 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
477 * Check that @dir contains no entries except (possibly) dot and dotdot.
482 * -ENOTEMPTY not empty
/*
 * Emptiness test for directory @dir, done with the index iterator of the
 * underlying dt object (do_index_ops->dio_it).
 *
 * The iterator is initialised with BYPASS_CAPA and positioned at the empty
 * key "". next() is then called at most three times, stopping early as soon
 * as it returns a non-zero result.
 * NOTE(review): the mapping from iterator results to the function's return
 * value, and the iterator release calls, are not visible in these lines.
 */
486 static int mdd_dir_is_empty(const struct lu_env *env,
487 struct mdd_object *dir)
490 struct dt_object *obj;
491 struct dt_it_ops *iops;
495 obj = mdd_object_child(dir);
496 iops = &obj->do_index_ops->dio_it;
497 it = iops->init(env, obj, 0, BYPASS_CAPA);
499 result = iops->get(env, it, (const void *)"");
502 for (result = 0, i = 0; result == 0 && i < 3; ++i)
503 result = iops->next(env, it);
506 else if (result == +1)
508 } else if (result == 0)
510 * Huh? Index contains no zero key?
/*
 * Pre-checks for unlinking @cobj from directory @pobj: mdd_may_delete() with
 * the directory flag taken from ma->ma_attr.la_mode and need_check = 1, then,
 * for a directory child that dt_try_as_dir() accepts, the emptiness test
 * mdd_dir_is_empty().
 */
521 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
522 struct mdd_object *cobj, struct md_attr *ma)
524 struct dt_object *dt_cobj = mdd_object_child(cobj);
528 rc = mdd_may_delete(env, pobj, cobj,
529 S_ISDIR(ma->ma_attr.la_mode), 1);
533 if (S_ISDIR(mdd_object_type(cobj))) {
534 if (dt_try_as_dir(env, dt_cobj))
535 rc = mdd_dir_is_empty(env, cobj);
/*
 * Handler installed as mdd_dir_ops.mdo_unlink: removes the name @name of
 * object @cobj from directory @pobj.
 *
 * Visible sequence: build MDD_TXN_UNLINK_OP transaction parameters and start
 * the transaction; take the pdo write lock for @name on the parent and the
 * write lock on the child; run mdd_unlink_sanity_check(); delete the index
 * entry; drop reference(s) on the child; set ctime on the child and
 * ctime | mtime on the parent from a copy of ma->ma_attr; call
 * mdd_finish_unlink(); send the "unlinked" key through obd_set_info_async();
 * release both locks and stop the transaction.
 * NOTE(review): mdd_ref_del_internal() appears twice on the child; the
 * condition guarding the second call is not visible here -- presumably it
 * applies to directories only; confirm.
 */
543 static int mdd_unlink(const struct lu_env *env,
544 struct md_object *pobj, struct md_object *cobj,
545 const char *name, struct md_attr *ma)
547 struct mdd_device *mdd = mdo2mdd(pobj);
548 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
549 struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
550 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
551 struct thandle *handle;
552 struct dynlock_handle *dlh;
556 rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
560 handle = mdd_trans_start(env, mdd);
562 RETURN(PTR_ERR(handle));
564 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
566 GOTO(out_trans, rc = -ENOMEM);
567 mdd_write_lock(env, mdd_cobj);
569 rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
573 is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
574 rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
575 mdd_object_capa(env, mdd_pobj));
579 mdd_ref_del_internal(env, mdd_cobj, handle);
580 *la_copy = ma->ma_attr;
583 mdd_ref_del_internal(env, mdd_cobj, handle);
585 la_copy->la_valid = LA_CTIME;
586 rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
591 la_copy->la_valid = LA_CTIME | LA_MTIME;
592 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
596 rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
599 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
600 strlen("unlinked"), "unlinked", 0,
603 mdd_write_unlock(env, mdd_cobj);
604 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
606 mdd_trans_stop(env, mdd, rc, handle);
611 * Partial operation. Be aware, this is called with write lock taken, so we use
612 * lockless version of __mdd_lookup() here.
/*
 * Pre-checks for mdd_name_insert(): tests @pobj for being dead, then either
 * looks the name up with the lock-free __mdd_lookup() (yielding -EEXIST when
 * the lookup itself succeeds) or checks MAY_WRITE | MAY_EXEC on @pobj.
 * NOTE(review): which of the two paths is actually taken is decided by lines
 * not visible here. Also, @fid is const in this signature while
 * __mdd_lookup() takes a non-const fid and stores the lookup result in it --
 * confirm that the lookup call is really compiled in.
 */
614 static int mdd_ni_sanity_check(const struct lu_env *env,
615 struct md_object *pobj,
617 const struct lu_fid *fid)
619 struct mdd_object *obj = md2mdd_obj(pobj);
626 if (mdd_is_dead_obj(obj))
629 /* The existence of the name will be checked in _index_insert. */
631 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
633 RETURN(rc ? : -EEXIST);
637 RETURN(mdd_permission_internal_locked(env, obj,
638 MAY_WRITE | MAY_EXEC));
/*
 * Handler installed as mdd_dir_ops.mdo_name_insert: inserts the entry
 * @name -> @fid into directory @pobj without touching the target object.
 *
 * Visible sequence: build MDD_TXN_INDEX_INSERT_OP transaction parameters and
 * start the transaction; take the pdo write lock for @name; run
 * mdd_ni_sanity_check(); insert via __mdd_index_insert(); release the lock
 * and stop the transaction.
 */
641 static int mdd_name_insert(const struct lu_env *env,
642 struct md_object *pobj,
643 const char *name, const struct lu_fid *fid,
646 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
647 struct mdd_device *mdd = mdo2mdd(pobj);
648 struct thandle *handle;
649 struct dynlock_handle *dlh;
653 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
654 handle = mdd_trans_start(env, mdo2mdd(pobj));
656 RETURN(PTR_ERR(handle));
658 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
660 GOTO(out_trans, rc = -ENOMEM);
661 rc = mdd_ni_sanity_check(env, pobj, name, fid);
663 GOTO(out_unlock, rc);
665 rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
669 mdd_pdo_write_unlock(env, mdd_obj, dlh);
671 mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
676 * Be aware, this is called with write lock taken, so we use lockless version
677 * of __mdd_lookup() here.
/*
 * Pre-checks for mdd_name_remove(): tests @pobj for being dead, then either
 * looks the name up with the lock-free __mdd_lookup() (result fid goes into
 * the per-thread scratch fid mti_fid) or checks MAY_WRITE | MAY_EXEC on
 * @pobj.
 * NOTE(review): which of the two paths is actually taken is decided by lines
 * not visible here.
 */
679 static int mdd_nr_sanity_check(const struct lu_env *env,
680 struct md_object *pobj,
683 struct mdd_object *obj = md2mdd_obj(pobj);
685 struct mdd_thread_info *info = mdd_env_info(env);
686 struct lu_fid *fid = &info->mti_fid;
692 if (mdd_is_dead_obj(obj))
695 /* The existence of the name will be checked in _index_delete. */
697 rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
700 RETURN(mdd_permission_internal_locked(env, obj,
701 MAY_WRITE | MAY_EXEC));
/*
 * Handler installed as mdd_dir_ops.mdo_name_remove: removes the entry @name
 * from directory @pobj without touching the object it refers to.
 *
 * Visible sequence: build MDD_TXN_INDEX_DELETE_OP transaction parameters and
 * start the transaction; take the pdo write lock for @name; run
 * mdd_nr_sanity_check(); delete via __mdd_index_delete() (which also drops a
 * parent reference when @is_dir is set); release the lock and stop the
 * transaction.
 */
704 static int mdd_name_remove(const struct lu_env *env,
705 struct md_object *pobj,
706 const char *name, int is_dir)
708 struct mdd_device *mdd = mdo2mdd(pobj);
709 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
710 struct thandle *handle;
711 struct dynlock_handle *dlh;
715 mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
716 handle = mdd_trans_start(env, mdd);
718 RETURN(PTR_ERR(handle));
720 dlh = mdd_pdo_write_lock(env, mdd_obj, name);
722 GOTO(out_trans, rc = -ENOMEM);
723 rc = mdd_nr_sanity_check(env, pobj, name);
725 GOTO(out_unlock, rc);
727 rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
731 mdd_pdo_write_unlock(env, mdd_obj, dlh);
733 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Pre-checks for mdd_rename_tgt(): the target directory @tgt_pobj is tested
 * for being dead; the source type is taken from ma->ma_attr.la_mode; then
 * either mdd_may_delete() is run on @tobj (followed by an emptiness test when
 * @tobj is a directory) or mdd_may_create() is run on the target directory.
 * NOTE(review): the condition choosing between the delete and the create
 * checks is not visible here -- presumably it is whether @tobj exists;
 * confirm.
 */
736 static int mdd_rt_sanity_check(const struct lu_env *env,
737 struct mdd_object *tgt_pobj,
738 struct mdd_object *tobj,
739 const struct lu_fid *sfid,
740 const char *name, struct md_attr *ma)
746 if (mdd_is_dead_obj(tgt_pobj))
749 src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
751 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
752 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
753 mdd_dir_is_empty(env, tobj))
756 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
/*
 * Handler installed as mdd_dir_ops.mdo_rename_tgt: makes @name in directory
 * @pobj refer to the fid @lf, replacing the entry for @tobj.
 *
 * Visible sequence: build MDD_TXN_RENAME_TGT_OP transaction parameters and
 * start the transaction; take the pdo write lock for @name on the directory
 * and the write lock on @tobj; run mdd_rt_sanity_check(); delete the old
 * index entry and insert the new one (both without parent link-count
 * changes); drop one reference on @tobj if it exists; release both locks and
 * stop the transaction.
 * NOTE(review): mdd_write_lock() is applied to mdd_tobj before the
 * `tobj && ...` test further down; whether a NULL @tobj can reach the lock
 * call depends on lines not visible here.
 */
762 static int mdd_rename_tgt(const struct lu_env *env,
763 struct md_object *pobj, struct md_object *tobj,
764 const struct lu_fid *lf, const char *name,
767 struct mdd_device *mdd = mdo2mdd(pobj);
768 struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
769 struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
770 struct thandle *handle;
771 struct dynlock_handle *dlh;
775 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
776 handle = mdd_trans_start(env, mdd);
778 RETURN(PTR_ERR(handle));
780 dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
782 GOTO(out_trans, rc = -ENOMEM);
784 mdd_write_lock(env, mdd_tobj);
786 /*TODO rename sanity checking*/
787 rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
791 /* if rename_tgt is called then we should just re-insert name with
792 * correct fid, no need to dec/inc parent nlink if obj is dir */
793 rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
797 rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
802 if (tobj && lu_object_exists(&tobj->mo_lu))
803 mdd_ref_del_internal(env, mdd_tobj, handle);
806 mdd_write_unlock(env, mdd_tobj);
807 mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
809 mdd_trans_stop(env, mdd, rc, handle);
814 * The permission has been checked when obj created,
815 * no need check again.
/*
 * Pre-checks for mdd_create_data(): rejects a NULL or dead @obj, and checks
 * MAY_WRITE on @obj under its read lock.
 * NOTE(review): the error code for the NULL/dead case and whether the
 * permission check is compiled in are not visible in these lines.
 */
817 static int mdd_cd_sanity_check(const struct lu_env *env,
818 struct mdd_object *obj)
824 if (!obj || mdd_is_dead_obj(obj))
828 mdd_read_lock(env, obj);
829 rc = mdd_permission_internal(env, obj, MAY_WRITE);
830 mdd_read_unlock(env, obj);
/*
 * Handler installed as mdd_dir_ops.mdo_create_data: attaches LOV striping
 * data to the already existing object @cobj.
 *
 * Visible sequence: mdd_cd_sanity_check() on the child; a test of
 * spec->sp_cr_flags for MDS_OPEN_DELAY_CREATE or a missing FMODE_WRITE;
 * mdd_lov_create() before the transaction is started; then, inside an
 * MDD_TXN_CREATE_DATA_OP transaction, mdd_lov_set_md() with either the EA
 * supplied in @spec (when spec->u.sp_ea.no_lov_create is set) or the freshly
 * created lmm; the attributes are read back into @ma; finally
 * mdd_lov_create_finish(), transaction stop and OBD_FREE() of lmm.
 * NOTE(review): the action taken when the sp_cr_flags test matches is not
 * visible in these lines.
 */
837 static int mdd_create_data(const struct lu_env *env,
838 struct md_object *pobj, struct md_object *cobj,
839 const struct md_create_spec *spec,
842 struct mdd_device *mdd = mdo2mdd(cobj);
843 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
844 struct mdd_object *son = md2mdd_obj(cobj);
845 struct lu_attr *attr = &ma->ma_attr;
846 struct lov_mds_md *lmm = NULL;
848 struct thandle *handle;
852 rc = mdd_cd_sanity_check(env, son);
856 if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
857 !(spec->sp_cr_flags & FMODE_WRITE))
859 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
864 mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
865 handle = mdd_trans_start(env, mdd);
867 RETURN(rc = PTR_ERR(handle));
870 * XXX: Setting the lov ea is not locked but setting the attr is locked?
873 /* Replay creates already have objects */
874 if (spec->u.sp_ea.no_lov_create) {
875 CDEBUG(D_INFO, "we already have lov ea\n");
876 rc = mdd_lov_set_md(env, mdd_pobj, son,
877 (struct lov_mds_md *)spec->u.sp_ea.eadata,
878 spec->u.sp_ea.eadatalen, handle, 0);
880 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
881 lmm_size, handle, 0);
884 rc = mdd_attr_get_internal_locked(env, son, ma);
886 /* Finish mdd_lov_create() stuff. */
887 mdd_lov_create_finish(env, mdd, rc);
888 mdd_trans_stop(env, mdd, rc, handle);
890 OBD_FREE(lmm, lmm_size);
/*
 * Lock-free lookup of @name in directory @pobj; the resulting fid is written
 * to @fid.
 *
 * Visible sequence: the directory is tested for being dead and for
 * existence (an error is logged about the object being located on a remote
 * server); permission is checked with mdd_exec_permission_lite() when @mask
 * is exactly MAY_EXEC and with mdd_permission_internal_locked() otherwise;
 * if the object is a directory that dt_try_as_dir() accepts, dio_lookup()
 * stores the record directly into @fid, which is then converted in place
 * from big-endian to CPU order. The call is timed under LPROC_MDD_LOOKUP.
 * NOTE(review): the conditions under which the remote-object error is logged
 * and the error codes of the failing checks are not visible in these lines.
 */
895 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
896 const char *name, struct lu_fid* fid, int mask)
898 struct mdd_object *mdd_obj = md2mdd_obj(pobj);
899 struct dt_object *dir = mdd_object_child(mdd_obj);
900 struct dt_rec *rec = (struct dt_rec *)fid;
901 const struct dt_key *key = (const struct dt_key *)name;
902 struct timeval start;
906 mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
907 if (mdd_is_dead_obj(mdd_obj))
910 rc = lu_object_exists(mdd2lu_obj(mdd_obj));
914 CERROR("Object "DFID" locates on remote server\n",
915 PFID(mdo2fid(mdd_obj)));
920 if (mask == MAY_EXEC)
921 rc = mdd_exec_permission_lite(env, mdd_obj);
924 rc = mdd_permission_internal_locked(env, mdd_obj, mask);
928 if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
929 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
930 mdd_object_capa(env, mdd_obj));
932 fid_be_to_cpu(fid, fid);
936 mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
/*
 * Initialises a freshly created object @child: applies the attributes from
 * ma->ma_attr and, for a directory, adds a reference to the child and
 * inserts the "." entry (pointing at the child itself) and an entry for
 * @pfid, the parent fid. The cleanup statements at the end delete the "."
 * entry again, log an error mentioning dotdot creation if that deletion
 * fails, and drop the reference.
 * NOTE(review): the conditions guarding the cleanup statements are not
 * visible in these lines.
 */
940 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
941 struct mdd_object *child, struct md_attr *ma,
942 struct thandle *handle)
947 /* update attributes for child.
949 * (1) the valid bits should be converted between Lustre and Linux;
950 * (2) maybe, the child attributes should be set in OSD when creation.
953 rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
957 if (S_ISDIR(ma->ma_attr.la_mode)) {
958 /* add . and .. for newly created dir */
959 mdd_ref_add_internal(env, child, handle);
960 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
961 dot, handle, BYPASS_CAPA);
963 rc = __mdd_index_insert_only(env, child, pfid,
969 rc2 = __mdd_index_delete(env, child, dot, 0,
970 handle, BYPASS_CAPA);
972 CERROR("Failure to cleanup after dotdot"
973 " creation: %d (%d)\n", rc2, rc);
975 mdd_ref_del_internal(env, child, handle);
/*
 * Pre-checks for mdd_create().
 *
 * Visible sequence: the parent @pobj is tested for being dead; @name is
 * looked up under the pdo read lock with MAY_WRITE | MAY_EXEC, yielding
 * -EEXIST when the lookup itself succeeds; the parent's attributes are read
 * under its read lock; if the parent has S_ISGID set, the new object inherits
 * the parent's gid and, for directories, S_ISGID itself (LA_MODE is marked
 * valid); finally the file type in ma->ma_attr.la_mode is dispatched on.
 * Uses the per-thread scratch buffers mti_la and mti_fid.
 * NOTE(review): the cases of the final switch are not visible in these lines.
 */
982 static int mdd_create_sanity_check(const struct lu_env *env,
983 struct md_object *pobj,
984 const char *name, struct md_attr *ma)
986 struct mdd_thread_info *info = mdd_env_info(env);
987 struct lu_attr *la = &info->mti_la;
988 struct lu_fid *fid = &info->mti_fid;
989 struct mdd_object *obj = md2mdd_obj(pobj);
994 if (mdd_is_dead_obj(obj))
998 * Check if the name already exist, though it will be checked
999 * in _index_insert also, for avoiding rolling back if exists
1002 rc = __mdd_lookup_locked(env, pobj, name, fid,
1003 MAY_WRITE | MAY_EXEC);
1005 RETURN(rc ? : -EEXIST);
1008 mdd_read_lock(env, obj);
1009 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1010 mdd_read_unlock(env, obj);
1014 if (la->la_mode & S_ISGID) {
1015 ma->ma_attr.la_gid = la->la_gid;
1016 if (S_ISDIR(ma->ma_attr.la_mode)) {
1017 ma->ma_attr.la_mode |= S_ISGID;
1018 ma->ma_attr.la_valid |= LA_MODE;
1022 switch (ma->ma_attr.la_mode & S_IFMT) {
1040 * Create object and insert it into namespace.
/*
 * Handler installed as mdd_dir_ops.mdo_create: creates the object @child and
 * inserts it under @name in directory @pobj.
 *
 * Visible sequence:
 *  - mdd_create_sanity_check() on the parent and name;
 *  - for regular files, mdd_lov_create() is called before the transaction is
 *    started;
 *  - the transaction is started with MDD_TXN_MKDIR_OP parameters and the pdo
 *    write lock for @name is taken on the parent;
 *  - under the child's write lock: mdd_object_create_internal(), ACL
 *    initialisation from the parent (CONFIG_FS_POSIX_ACL only) and
 *    mdd_object_initialize();
 *  - __mdd_index_insert() adds the name to the parent;
 *  - mdd_lov_set_md() stores the striping data (taken from @spec when
 *    spec->u.sp_ea.no_lov_create is set) and, when present, a copy is placed
 *    in ma->ma_lmm with MA_LOV marked valid;
 *  - for symlinks the target name from spec->u.sp_symname is written as the
 *    object body;
 *  - the parent gets new ctime | mtime and the child's attributes are read
 *    back into @ma;
 *  - on failure after creation, the index entry is deleted again and a
 *    reference on the child is dropped;
 *  - lmm is freed unless it came from @spec, then the lock is released, the
 *    transaction stopped and the LPROC_MDD_CREATE timer ended.
 * NOTE(review): the error checks, labels and the updates of `created` and
 * `inserted` between these steps are not visible in these lines.
 */
1042 static int mdd_create(const struct lu_env *env,
1043 struct md_object *pobj, const char *name,
1044 struct md_object *child,
1045 struct md_create_spec *spec,
1048 struct mdd_device *mdd = mdo2mdd(pobj);
1049 struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1050 struct mdd_object *son = md2mdd_obj(child);
1051 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1052 struct lu_attr *attr = &ma->ma_attr;
1053 struct lov_mds_md *lmm = NULL;
1054 struct thandle *handle;
1055 int rc, created = 0, inserted = 0, lmm_size = 0;
1056 struct dynlock_handle *dlh;
1057 struct timeval start;
1060 mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1062 * Two operations have to be performed:
1064 * - allocation of new object (->do_create()), and
1066 * - insertion into parent index (->dio_insert()).
1068 * Due to locking, operation order is not important, when both are
1069 * successful, *but* error handling cases are quite different:
1071 * - if insertion is done first, and following object creation fails,
1072 * insertion has to be rolled back, but this operation might fail
1073 * also leaving us with dangling index entry.
1075 * - if creation is done first, is has to be undone if insertion
1076 * fails, leaving us with leaked space, which is neither good, nor
1079 * It seems that creation-first is simplest solution, but it is
1080 * sub-optimal in the frequent
1085 * case, because second mkdir is bound to create object, only to
1086 * destroy it immediately.
1088 * To avoid this follow local file systems that do double lookup:
1090 * 0. lookup -> -EEXIST (mdd_create_sanity_check())
1092 * 1. create (mdd_object_create_internal())
1094 * 2. insert (__mdd_index_insert(), lookup again)
1097 /* sanity checks before big job */
1098 rc = mdd_create_sanity_check(env, pobj, name, ma);
1102 /* no RPC inside the transaction, so OST objects should be created at
1104 if (S_ISREG(attr->la_mode)) {
1105 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1111 mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1112 handle = mdd_trans_start(env, mdd);
1114 RETURN(PTR_ERR(handle));
1116 dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1118 GOTO(out_trans, rc = -ENOMEM);
1121 * XXX check that link can be added to the parent in mkdir case.
1124 mdd_write_lock(env, son);
1125 rc = mdd_object_create_internal(env, son, ma, handle);
1127 mdd_write_unlock(env, son);
1133 #ifdef CONFIG_FS_POSIX_ACL
1134 mdd_read_lock(env, mdd_pobj);
1135 rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1136 mdd_read_unlock(env, mdd_pobj);
1138 mdd_write_unlock(env, son);
1141 ma->ma_attr.la_valid |= LA_MODE;
1145 rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1147 mdd_write_unlock(env, son);
1150 * Object has no links, so it will be destroyed when last
1151 * reference is released. (XXX not now.)
1155 rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1156 name, S_ISDIR(attr->la_mode), handle,
1157 mdd_object_capa(env, mdd_pobj));
1163 /* replay creates has objects already */
1164 if (spec->u.sp_ea.no_lov_create) {
1165 CDEBUG(D_INFO, "we already have lov ea\n");
1166 LASSERT(lmm == NULL);
1167 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1168 lmm_size = spec->u.sp_ea.eadatalen;
1170 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1172 CERROR("error on stripe info copy %d \n", rc);
1175 if (lmm && lmm_size > 0) {
1176 /* set Lov here, do not get lmm again later */
1177 memcpy(ma->ma_lmm, lmm, lmm_size);
1178 ma->ma_lmm_size = lmm_size;
1179 ma->ma_valid |= MA_LOV;
1182 if (S_ISLNK(attr->la_mode)) {
1183 struct dt_object *dt = mdd_object_child(son);
1184 const char *target_name = spec->u.sp_symname;
1185 int sym_len = strlen(target_name);
1186 const struct lu_buf *buf;
1189 buf = mdd_buf_get_const(env, target_name, sym_len);
1190 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1191 mdd_object_capa(env, son));
/* Update ctime and mtime of the parent directory. */
1198 *la_copy = ma->ma_attr;
1199 la_copy->la_valid = LA_CTIME | LA_MTIME;
1200 rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
1204 /* return attr back */
1205 rc = mdd_attr_get_internal_locked(env, son, ma);
1207 if (rc && created) {
1211 rc2 = __mdd_index_delete(env, mdd_pobj, name,
1212 S_ISDIR(attr->la_mode),
1213 handle, BYPASS_CAPA);
1215 CERROR("error can not cleanup destroy %d\n",
1219 mdd_write_lock(env, son);
1220 mdd_ref_del_internal(env, son, handle);
1221 mdd_write_unlock(env, son);
1224 /* finish mdd_lov_create() stuff */
1225 mdd_lov_create_finish(env, mdd, rc);
/* lmm taken from @spec is owned by the caller and must not be freed here. */
1226 if (lmm && !spec->u.sp_ea.no_lov_create)
1227 OBD_FREE(lmm, lmm_size);
1228 mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1230 mdd_trans_stop(env, mdd, rc, handle);
1231 mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1236 * Get locks on parents in proper order
1237 * RETURN: < 0 - error, rename_order if successful
/*
 * Decides the order in which the two parent directories of a rename are
 * locked. Identical parents yield MDD_RN_SAME. Otherwise each parent's fid
 * is compared with the root fid and, failing that, mdd_is_parent() is asked
 * whether @tgt_pobj is an ancestor of @src_pobj (no output fid is requested:
 * NULL is passed).
 * NOTE(review): the values chosen in the root and mdd_is_parent() branches
 * are not visible in these lines.
 */
1245 static int mdd_rename_order(const struct lu_env *env,
1246 struct mdd_device *mdd,
1247 struct mdd_object *src_pobj,
1248 struct mdd_object *tgt_pobj)
1250 /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1254 if (src_pobj == tgt_pobj)
1255 RETURN(MDD_RN_SAME);
1257 /* compare the parent child relationship of src_p&tgt_p */
1258 if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1260 } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1263 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
/*
 * Pre-checks for mdd_rename().
 *
 * Visible sequence: the source parent is tested for being dead and checked
 * for MAY_WRITE | MAY_EXEC; then either mdd_may_create() is run on the
 * target parent, or -- under @tobj's read lock -- mdd_may_delete() is run on
 * @tobj followed by an emptiness test when @tobj is a directory. The
 * need_check argument of both helpers is (src_pobj != tgt_pobj), so the
 * target parent's own checks are skipped when both parents are the same
 * object.
 * NOTE(review): the condition choosing between the create and the delete
 * branch is not visible here -- presumably it is whether @tobj exists;
 * confirm.
 */
1276 static int mdd_rename_sanity_check(const struct lu_env *env,
1277 struct mdd_object *src_pobj,
1278 struct mdd_object *tgt_pobj,
1279 const struct lu_fid *sfid,
1281 struct mdd_object *tobj)
1286 if (mdd_is_dead_obj(src_pobj))
1289 /* The sobj maybe on the remote, check parent permission only here */
1290 rc = mdd_permission_internal_locked(env, src_pobj,
1291 MAY_WRITE | MAY_EXEC);
1296 rc = mdd_may_create(env, tgt_pobj, NULL,
1297 (src_pobj != tgt_pobj));
1299 mdd_read_lock(env, tobj);
1300 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1301 (src_pobj != tgt_pobj));
1303 if (S_ISDIR(mdd_object_type(tobj))
1304 && mdd_dir_is_empty(env, tobj))
1306 mdd_read_unlock(env, tobj);
1311 /* src object can be remote that is why we use only fid and type of object */
/*
 * Handler installed as mdd_dir_ops.mdo_rename: moves the entry @sname in
 * @src_pobj (referring to fid @lf) to @tname in @tgt_pobj, replacing @tobj
 * if it exists.
 *
 * Visible sequence:
 *  - the source type is taken from ma->ma_attr.la_mode, which must carry a
 *    file-type bit (asserted); append-only / immutable flags in ma->ma_attr
 *    are tested;
 *  - the transaction is started with MDD_TXN_RENAME_OP parameters;
 *  - mdd_rename_order() decides the locking order and the pdo write locks
 *    are taken accordingly; with identical parents a second lock is taken
 *    only when the two names hash differently;
 *  - mdd_rename_sanity_check();
 *  - the source entry is deleted, the target entry is deleted (-ENOENT is
 *    tolerated) and the new target entry is inserted;
 *  - the source object is looked up by fid and gets a new ctime;
 *  - an existing target object loses its reference(s), gets a new ctime and
 *    goes through mdd_finish_unlink() under its write lock;
 *  - both parents get new ctime | mtime (the target parent only when it
 *    differs from the source parent);
 *  - locks are released, the transaction is stopped and the source object
 *    reference is put.
 * NOTE(review): the result of mdd_object_find() is used by the ctime update
 * and by mdd_object_put(); any NULL / ERR_PTR guard sits on lines not visible
 * here -- confirm it exists. Error checks and labels between the steps are
 * likewise not visible.
 */
1312 static int mdd_rename(const struct lu_env *env,
1313 struct md_object *src_pobj, struct md_object *tgt_pobj,
1314 const struct lu_fid *lf, const char *sname,
1315 struct md_object *tobj, const char *tname,
1318 struct mdd_device *mdd = mdo2mdd(src_pobj);
1319 struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1320 struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1321 struct mdd_object *mdd_sobj = NULL;
1322 struct mdd_object *mdd_tobj = NULL;
1323 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1324 struct dynlock_handle *sdlh, *tdlh;
1325 struct thandle *handle;
1330 LASSERT(ma->ma_attr.la_mode & S_IFMT);
1331 is_dir = S_ISDIR(ma->ma_attr.la_mode);
1332 if (ma->ma_attr.la_valid & LA_FLAGS &&
1333 ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1337 mdd_tobj = md2mdd_obj(tobj);
1339 mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1340 handle = mdd_trans_start(env, mdd);
1342 RETURN(PTR_ERR(handle));
1344 /* FIXME: Should consider tobj and sobj too in rename_lock. */
1345 rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1347 GOTO(cleanup_unlocked, rc);
1349 /* get locks in determined order */
1350 if (rc == MDD_RN_SAME) {
1351 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1352 /* check hashes to determine do we need one lock or two */
1353 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1354 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1357 } else if (rc == MDD_RN_SRCTGT) {
1358 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1359 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1361 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1362 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1364 if (sdlh == NULL || tdlh == NULL)
1365 GOTO(cleanup, rc = -ENOMEM);
1367 rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1368 lf, is_dir, mdd_tobj);
1372 rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1373 mdd_object_capa(env, mdd_spobj));
1378 * Here tobj can be remote one, so we do index_delete unconditionally
1379 * and -ENOENT is allowed.
1381 rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1382 mdd_object_capa(env, mdd_tpobj));
1383 if (rc != 0 && rc != -ENOENT)
1386 rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1387 mdd_object_capa(env, mdd_tpobj));
1391 mdd_sobj = mdd_object_find(env, mdd, lf);
1392 *la_copy = ma->ma_attr;
1393 la_copy->la_valid = LA_CTIME;
1395 /*XXX: how to update ctime for remote sobj? */
1396 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
1401 if (tobj && lu_object_exists(&tobj->mo_lu)) {
1402 mdd_write_lock(env, mdd_tobj);
1403 mdd_ref_del_internal(env, mdd_tobj, handle);
1404 /* remove dot reference */
1406 mdd_ref_del_internal(env, mdd_tobj, handle);
1408 la_copy->la_valid = LA_CTIME;
1409 rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
1413 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1414 mdd_write_unlock(env, mdd_tobj);
1419 la_copy->la_valid = LA_CTIME | LA_MTIME;
1420 rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
1424 if (mdd_spobj != mdd_tpobj) {
1425 la_copy->la_valid = LA_CTIME | LA_MTIME;
1426 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
/* Release the target lock only if it is a distinct lock from the source. */
1431 if (likely(tdlh) && sdlh != tdlh)
1432 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1434 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1436 mdd_trans_stop(env, mdd, rc, handle);
1438 mdd_object_put(env, mdd_sobj);
/*
 * Directory operation table of the mdd layer: binds each md_dir_operations
 * method to its implementation in this file.
 */
1442 struct md_dir_operations mdd_dir_ops = {
1443 .mdo_is_subdir = mdd_is_subdir,
1444 .mdo_lookup = mdd_lookup,
1445 .mdo_create = mdd_create,
1446 .mdo_rename = mdd_rename,
1447 .mdo_link = mdd_link,
1448 .mdo_unlink = mdd_unlink,
1449 .mdo_name_insert = mdd_name_insert,
1450 .mdo_name_remove = mdd_name_remove,
1451 .mdo_rename_tgt = mdd_rename_tgt,
1452 .mdo_create_data = mdd_create_data