1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch the body/data attributes of an existing object from the lower
 * (dt) layer. The object must already exist on disk. */
int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        mdo_data_get(env, obj, data);
/* Read the inode attributes of an existing object into \a la.
 * \a capa is the capability passed down for the lower-layer check. */
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flags into in-memory mod_flags bits. */
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
        /* clear both translated bits, then re-derive them from \a flags */
        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd context stored in \a env; asserts non-NULL. */
struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
/* Wrap an existing memory area into the per-thread lu_buf descriptor. */
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
        buf = &mdd_env_info(env)->mti_buf;
/* Release a buffer allocated by mdd_buf_alloc(), using the allocator
 * (vmalloc vs kmalloc) that matches how it was obtained. No-op when the
 * buffer is NULL or empty. */
void mdd_buf_put(struct lu_buf *buf)
        if (buf == NULL || buf->lb_buf == NULL)
        OBD_VFREE(buf->lb_buf, buf->lb_len);
        OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap read-only memory into mti_buf. */
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
        buf = &mdd_env_info(env)->mti_buf;
        /* const is cast away here; callers must treat lb_buf as read-only */
        buf->lb_buf = (void *)area;
#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */

/* Return the per-thread "big buffer", (re)allocated to hold at least
 * \a len bytes. Sizes up to BUF_VMALLOC_SIZE use kmalloc (OBD_ALLOC),
 * larger requests fall back to vmalloc. Contents are NOT preserved on
 * reallocation (see mdd_buf_grow() for the preserving variant). */
struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        /* existing buffer too small: free it with the matching allocator */
        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                OBD_VFREE(buf->lb_buf, buf->lb_len);
                OBD_FREE(buf->lb_buf, buf->lb_len);
        if (buf->lb_buf == NULL) {
                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
                if (buf->lb_buf == NULL) {
                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
                if (buf->lb_buf == NULL)
/** Increase the size of the \a mti_big_buf.
 * Preserves the old data in the buffer; the old buffer remains unchanged
 * on allocation failure.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;

        LASSERT(len >= oldbuf->lb_len);
        if (len > BUF_VMALLOC_SIZE) {
                OBD_VMALLOC(buf.lb_buf, len);
                OBD_ALLOC(buf.lb_buf, len);

        if (buf.lb_buf == NULL)

        /* copy old content, then release the old allocation */
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        if (oldbuf->lb_vmalloc)
                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);

        /* install the new buffer descriptor in place of the old one */
        memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed per-thread cookie buffer large enough for the current
 * maximum unlink-llog cookie size; the cached buffer is freed and
 * reallocated on demand when the required size grows. */
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
        struct mdd_thread_info *mti = mdd_env_info(env);

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;

        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;

        /* returned buffer is always zeroed for the caller */
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233 struct mdd_device *mdd)
235 struct mdd_thread_info *mti = mdd_env_info(env);
238 max_lmm_size = mdd_lov_mdsize(env, mdd);
239 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240 if (mti->mti_max_lmm)
241 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242 mti->mti_max_lmm = NULL;
243 mti->mti_max_lmm_size = 0;
245 if (unlikely(mti->mti_max_lmm == NULL)) {
246 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247 if (unlikely(mti->mti_max_lmm != NULL))
248 mti->mti_max_lmm_size = max_lmm_size;
250 return mti->mti_max_lmm;
/* lu_device operation: allocate a new mdd object and wire up its md,
 * dir and lu operation vectors. Returns the embedded lu_object. */
struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
        struct mdd_object *mdd_obj;

        OBD_ALLOC_PTR(mdd_obj);
        if (mdd_obj != NULL) {
                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the child object from
 * the next (dt) device in the stack and link it below this object. */
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object *below;
        struct lu_device *under;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);

        lu_object_add(o, below);
/* loo_object_start: for an on-disk object, load its flags from disk. */
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd object allocated in mdd_object_alloc(). */
static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
        struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: one-line debug dump of the mdd object state. */
static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime=%llu, flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
/* lu-level object operations vector for mdd objects. */
static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init = mdd_object_init,
        .loo_object_start = mdd_object_start,
        .loo_object_free = mdd_object_free,
        .loo_object_print = mdd_object_print,
/* Find (or instantiate) the mdd object for FID \a f on device \a d. */
struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337 const char *path, struct lu_fid *fid)
340 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341 struct mdd_object *obj;
342 struct lu_name *lname = &mdd_env_info(env)->mti_name;
347 /* temp buffer for path element */
348 buf = mdd_buf_alloc(env, PATH_MAX);
349 if (buf->lb_buf == NULL)
352 lname->ln_name = name = buf->lb_buf;
353 lname->ln_namelen = 0;
354 *f = mdd->mdd_root_fid;
361 while (*path != '/' && *path != '\0') {
369 /* find obj corresponding to fid */
370 obj = mdd_object_find(env, mdd, f);
372 GOTO(out, rc = -EREMOTE);
374 GOTO(out, rc = -PTR_ERR(obj));
375 /* get child fid from parent and name */
376 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377 mdd_object_put(env, obj);
382 lname->ln_namelen = 0;
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100

/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64 pli_recno;        /**< history point */
        __u64 pli_currec;       /**< current record */
        struct lu_fid pli_fid;  /* fid that \a pli_path resolved to */
        struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object *pli_mdd_obj;         /* object being resolved */
        char *pli_path;         /**< full path */
        int pli_linkno;         /**< which hardlink to follow */
        int pli_fidcount;       /**< number of \a pli_fids */
410 static int mdd_path_current(const struct lu_env *env,
411 struct path_lookup_info *pli)
413 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414 struct mdd_object *mdd_obj;
415 struct lu_buf *buf = NULL;
416 struct link_ea_header *leh;
417 struct link_ea_entry *lee;
418 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
425 ptr = pli->pli_path + pli->pli_pathlen - 1;
428 pli->pli_fidcount = 0;
429 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432 mdd_obj = mdd_object_find(env, mdd,
433 &pli->pli_fids[pli->pli_fidcount]);
435 GOTO(out, rc = -EREMOTE);
437 GOTO(out, rc = -PTR_ERR(mdd_obj));
438 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440 mdd_object_put(env, mdd_obj);
444 /* Do I need to error out here? */
449 /* Get parent fid and object name */
450 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451 buf = mdd_links_get(env, mdd_obj);
452 mdd_read_unlock(env, mdd_obj);
453 mdd_object_put(env, mdd_obj);
455 GOTO(out, rc = PTR_ERR(buf));
458 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461 /* If set, use link #linkno for path lookup, otherwise use
462 link #0. Only do this for the final path element. */
463 if ((pli->pli_fidcount == 0) &&
464 (pli->pli_linkno < leh->leh_reccount)) {
466 for (count = 0; count < pli->pli_linkno; count++) {
467 lee = (struct link_ea_entry *)
468 ((char *)lee + reclen);
469 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471 if (pli->pli_linkno < leh->leh_reccount - 1)
472 /* indicate to user there are more links */
476 /* Pack the name in the end of the buffer */
477 ptr -= tmpname->ln_namelen;
478 if (ptr - 1 <= pli->pli_path)
479 GOTO(out, rc = -EOVERFLOW);
480 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
483 /* Store the parent fid for historic lookup */
484 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485 GOTO(out, rc = -EOVERFLOW);
486 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
489 /* Verify that our path hasn't changed since we started the lookup.
490 Record the current index, and verify the path resolves to the
491 same fid. If it does, then the path is correct as of this index. */
492 spin_lock(&mdd->mdd_cl.mc_lock);
493 pli->pli_currec = mdd->mdd_cl.mc_index;
494 spin_unlock(&mdd->mdd_cl.mc_lock);
495 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498 GOTO (out, rc = -EAGAIN);
500 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503 PFID(&pli->pli_fid));
504 GOTO(out, rc = -EAGAIN);
507 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
511 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path lookup; see the comment in mdd_path(). */
static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
/* Returns the full path to this fid, as of changelog record recno.
 * On return *recno is the changelog index the path is valid at and
 * *linkno the next hardlink index for the caller to iterate. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
        struct path_lookup_info *pli;

        /* the root object has an empty path; short-circuit the lookup */
        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
                *recno = pli->pli_currec;
                /* Return next link index to caller */
                *linkno = pli->pli_linkno;
/* Load la_flags from disk and translate them into mod_flags; also tags
 * directories whose nlink is exactly 1 with MNLINK_OBJ. */
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
        struct lu_attr *la = &mdd_env_info(env)->mti_la;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        mdd_flags_xlate(obj, la->la_flags);
        if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
                obj->mod_flags |= MNLINK_OBJ;
/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
        /* already cached in \a ma: nothing to do */
        if (ma->ma_valid & MA_INODE)

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default LOV striping (V1 magic) taken
 * from the MDS LOV descriptor and set *size to the default EA size.
 * NOTE(review): returns sizeof(struct lov_mds_md) rather than 0 --
 * confirm callers treat the return value as a size, not an errno. */
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        lmm->lmm_magic = LOV_MAGIC_V1;
        lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
        lmm->lmm_pattern = ldesc->ld_pattern;
        lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
        *size = sizeof(struct lov_mds_md);

        RETURN(sizeof(struct lov_mds_md));
/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
        /* already cached: nothing to do */
        if (ma->ma_valid & MA_LOV)

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
        /* no EA on disk, but the default striping was requested */
        if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
                ma->ma_valid |= MA_LOV;
/* Read-locked wrapper around __mdd_lmm_get(). */
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
/* get lmv EA only */
static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
        /* already cached: nothing to do */
        if (ma->ma_valid & MA_LMV)

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
        ma->ma_valid |= MA_LMV;
/* Gather the attributes requested in ma->ma_need: inode attrs, LOV EA
 * (regular files and directories), LMV EA (directories only) and the
 * default POSIX ACL (directories only). */
static int mdd_attr_get_internal(const struct lu_env *env,
                                 struct mdd_object *mdd_obj,
        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* As mdd_attr_get_internal(), but takes the read lock when EA/ACL data
 * (MA_LOV | MA_LMV | MA_ACL_DEF) is requested. */
int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
        int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
/* md_object_operations::moo_attr_get.
 * No permission check is needed.
 */
static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
/* moo_xattr_get: fetch extended attribute \a name into \a buf.
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
/* Read the symlink target via the underlying dt body operations.
 * Permission check is done when open,
 * no need check again.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
/* moo_xattr_list: list all xattr names into \a buf.
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle,
 * deriving the dt object format from \a spec/\a ma and letting the
 * underlying device fill the allocation hint (parent \a p may be NULL). */
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);

                /* non-directory index features force an index object */
                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);

                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
/* Make sure the ctime is increased only.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);

                /* ctime going backwards: drop both time updates */
                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
/* Write \a attr to disk inside \a handle; on a mode change also update
 * the POSIX ACL when \a needacl is set. */
int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Verify ctime monotonicity with mdd_attr_check(), then set the attrs. */
int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
        rc = mdd_attr_check(env, obj, attr);

        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper around mdd_attr_set_internal(); ACL handling is
 * only required when the mode actually changes. */
static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
        needacl = needacl && (attr->la_valid & LA_MODE);

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(). */
int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
        needacl = needacl && (attr->la_valid & LA_MODE);

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        mdd_write_unlock(env, obj);
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942 const struct lu_buf *buf, const char *name,
943 int fl, struct thandle *handle)
945 struct lustre_capa *capa = mdd_object_capa(env, obj);
949 if (buf->lb_buf && buf->lb_len > 0)
950 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951 else if (buf->lb_buf == NULL && buf->lb_len == 0)
952 rc = mdo_xattr_del(env, obj, name, handle, capa);
958 * This gives the same functionality as the code between
959 * sys_chmod and inode_setattr
960 * chown_common and inode_setattr
961 * utimes and inode_setattr
 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
/* Normalize and permission-check the attributes in \a la against the
 * object's current state before they are written: drops stale time
 * updates, enforces chmod/chown/chgrp rules (clearing SUID/SGID where
 * POSIX requires), and validates flag changes. */
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);

        /* Do not permit change file type */
        if (la->la_valid & LA_TYPE)

        /* They should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);

        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for set ctime when rename's source is
                         */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;

        if (la->la_valid == LA_ATIME) {
                /* This is atime only set for read atime update on close. */
                if (la->la_atime <= tmp_la->la_atime +
                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff)
                        la->la_valid &= ~LA_ATIME;

        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))

                /* DIRSYNC is only meaningful on directories */
                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))

        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,

        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                /* Bypass la_valid == LA_MODE,
                 * this is for changing file with SUID or SGID. */
                if ((la->la_valid & ~LA_MODE) &&
                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))

                if (la->la_mode == (umode_t) -1)
                        la->la_mode = tmp_la->la_mode;
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;

                la->la_mode = tmp_la->la_mode;

        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t) -1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                    (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races. This is the behavior we had in
                 * 2.0. The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;

        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t) -1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                    ((la->la_gid != tmp_la->la_gid) &&
                    !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;

        /* For both Size-on-MDS case and truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * For SOM case, it is true, the MAY_WRITE perm has been checked
         * when open, no need check again. For truncate case, it is false,
         * the MAY_WRITE perm should be checked here. */
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge coming
                 * attributes with the set in the inode. BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);

        if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                      (uc->mu_fsuid == tmp_la->la_uid)) &&
                    !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                        rc = mdd_permission_internal_locked(env, obj,

        if (la->la_valid & LA_CTIME) {
                /* The pure setattr, it has the priority over what is
                 * already set, do not drop it if ctime is equal. */
                if (la->la_ctime < tmp_la->la_ctime)
                        la->la_valid &= ~(LA_ATIME | LA_MTIME |
/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;

        /* changelog feature switched off: nothing to record */
        if (!(mdd->mdd_cl.mc_flags & CLM_ON))

        LASSERT(handle != NULL);
        LASSERT(mdd_obj != NULL);

        if ((type == CL_SETATTR) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr.cr_flags = CLF_VERSION;
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_tfid = *tfid;
        rec->cr.cr_namelen = 0;
        /* remember when we last logged this object (dedup check above) */
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle);
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };

        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        /*TODO: add lock here*/
        /* start a log journal handle if needed */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                /* chown/chgrp on a striped file: snapshot the LOV EA so
                 * the change can be journalled like an unlink */
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                        GOTO(cleanup, rc = -ENOMEM);

                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,

        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);

        /* normalize the attributes on a private copy before writing */
        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;

                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qnids, inode_pending, 1, NULL, 0,
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                exp, qnids, block_pending,
                                                LQUOTA_FLAGS_BLK, data, 1);

        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size){
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);

                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)

        if (rc == 0 && ma->ma_valid & MA_LOV) {
                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);

                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);

        /* record a changelog entry for time updates */
        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                /*set obd attr, if needed*/
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
#ifdef HAVE_QUOTA_SUPPORT
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                /* Trigger dqrel/dqacq for original owner and new owner.
                 * If failed, the next call for lquota_chkquota will
                 */
                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/*
 * Set an extended attribute on @obj inside an already-started transaction.
 * Thin locking wrapper: takes the object write lock, delegates to
 * __mdd_xattr_set(), releases the lock.  The caller owns @handle
 * (starts and stops the transaction) and is responsible for any
 * ctime update afterwards.
 */
1360 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1361 const struct lu_buf *buf, const char *name, int fl,
1362 struct thandle *handle)
1367 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1368 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1369 mdd_write_unlock(env, obj);
/*
 * Permission check before modifying extended attributes:
 * - reject immutable or append-only objects;
 * - only the file owner (fsuid == la_uid) or a caller with
 *   CFS_CAP_FOWNER may change xattrs.
 */
1374 static int mdd_xattr_sanity_check(const struct lu_env *env,
1375 struct mdd_object *obj)
1377 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1378 struct md_ucred *uc = md_ucred(env);
1382 if (mdd_is_immutable(obj) || mdd_is_append(obj))
/* fetch current attributes to compare ownership against the caller */
1385 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1389 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1390 !mdd_capable(uc, CFS_CAP_FOWNER))
1397 * The caller should guarantee to update the object ctime
1398 * after xattr_set if needed.
/*
 * Set an extended attribute @name on @obj: sanity-check permissions,
 * build and start a transaction, perform the set under the object
 * write lock (via mdd_xattr_set_txn), record a changelog entry for
 * user.* xattrs, and stop the transaction.
 */
1400 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1401 const struct lu_buf *buf, const char *name,
1404 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1405 struct mdd_device *mdd = mdo2mdd(obj);
1406 struct thandle *handle;
1410 rc = mdd_xattr_sanity_check(env, mdd_obj);
1414 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1415 /* security-related changes may require sync */
1416 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1417 mdd->mdd_sync_permission == 1)
1418 txn_param_sync(&mdd_env_info(env)->mti_param);
1420 handle = mdd_trans_start(env, mdd);
1422 RETURN(PTR_ERR(handle));
1424 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1426 /* Only record user xattr changes */
1427 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1428 (strncmp("user.", name, 5) == 0))
1429 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1431 mdd_trans_stop(env, mdd, rc, handle);
1437 * The caller should guarantee to update the object ctime
1438 * after xattr_set if needed.
/*
 * Delete extended attribute @name from @obj: sanity-check permissions,
 * start a transaction, remove the xattr under the object write lock,
 * record a changelog entry for user.* xattr changes, and stop the
 * transaction.
 *
 * FIX: the changelog filter used (strncmp("user.", name, 5) != 0),
 * which logged every xattr EXCEPT user.* ones — the inverse of both
 * the comment above it and the matching test in mdd_xattr_set().
 * Changed to == 0 so only user.* xattr deletions are recorded,
 * consistent with the set path.
 */
1440 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1443 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1444 struct mdd_device *mdd = mdo2mdd(obj);
1445 struct thandle *handle;
1449 rc = mdd_xattr_sanity_check(env, mdd_obj);
1453 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1454 handle = mdd_trans_start(env, mdd);
1456 RETURN(PTR_ERR(handle));
1458 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1459 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1460 mdd_object_capa(env, mdd_obj));
1461 mdd_write_unlock(env, mdd_obj);
1463 /* Only record user xattr changes */
1464 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1465 (strncmp("user.", name, 5) == 0))
1466 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1469 mdd_trans_stop(env, mdd, rc, handle);
1474 /* partial unlink */
/*
 * Drop a link on @obj without touching any directory entry (the name
 * removal is done elsewhere): decrement nlink (twice for directories,
 * to account for the "." self-link), set ctime from the client-supplied
 * attributes, run mdd_finish_unlink() to orphan/destroy as needed, and
 * release quota for the owner when the last link and last open go away.
 */
1475 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1478 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1479 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1480 struct mdd_device *mdd = mdo2mdd(obj);
1481 struct thandle *handle;
1482 #ifdef HAVE_QUOTA_SUPPORT
1483 struct obd_device *obd = mdd->mdd_obd_dev;
1484 struct mds_obd *mds = &obd->u.mds;
1485 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1492 * Check -ENOENT early here because we need to get object type
1493 * to calculate credits before transaction start
1495 if (!mdd_object_exists(mdd_obj))
1498 LASSERT(mdd_object_exists(mdd_obj) > 0);
1500 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1504 handle = mdd_trans_start(env, mdd);
1508 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1510 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* drop one link; directories lose a second one for "." below */
1514 __mdd_ref_del(env, mdd_obj, handle, 0);
1516 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1518 __mdd_ref_del(env, mdd_obj, handle, 1);
/* ctime must come from the client so clocks stay consistent */
1521 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1522 la_copy->la_ctime = ma->ma_attr.la_ctime;
1524 la_copy->la_valid = LA_CTIME;
1525 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1529 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1530 #ifdef HAVE_QUOTA_SUPPORT
/* last link gone and nobody has it open: release the owner's quota */
1531 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1532 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1533 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1534 mdd_quota_wrapper(&ma->ma_attr, qids);
1541 mdd_write_unlock(env, mdd_obj);
1542 mdd_trans_stop(env, mdd, rc, handle);
1543 #ifdef HAVE_QUOTA_SUPPORT
1545 /* Trigger dqrel on the owner of child. If failed,
1546 * the next call for lquota_chkquota will process it */
1547 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1553 /* partial operation */
/*
 * Validate the file type requested for a partial object create:
 * switches on the S_IFMT bits of the requested mode.  Only the
 * switch head is visible here; the per-type cases are elided.
 */
1554 static int mdd_oc_sanity_check(const struct lu_env *env,
1555 struct mdd_object *obj,
1561 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Partial object create (CMD path): allocate quota for the child,
 * create the object body in a transaction, optionally store the LMV
 * slave EA or a remote ACL from the creation spec, initialize the
 * object with its parent fid, and read back the resulting attributes.
 * Quota reservations are committed/adjusted after the transaction.
 */
1578 static int mdd_object_create(const struct lu_env *env,
1579 struct md_object *obj,
1580 const struct md_op_spec *spec,
1584 struct mdd_device *mdd = mdo2mdd(obj);
1585 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1586 const struct lu_fid *pfid = spec->u.sp_pfid;
1587 struct thandle *handle;
1588 #ifdef HAVE_QUOTA_SUPPORT
1589 struct obd_device *obd = mdd->mdd_obd_dev;
1590 struct obd_export *exp = md_quota(env)->mq_exp;
1591 struct mds_obd *mds = &obd->u.mds;
1592 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1593 int quota_opc = 0, block_count = 0;
1594 int inode_pending[MAXQUOTAS] = { 0, 0 };
1595 int block_pending[MAXQUOTAS] = { 0, 0 };
/* reserve inode (and, per-type, block) quota before the transaction */
1600 #ifdef HAVE_QUOTA_SUPPORT
1601 if (mds->mds_quota) {
1602 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1603 mdd_quota_wrapper(&ma->ma_attr, qids);
1604 /* get file quota for child */
1605 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1606 qids, inode_pending, 1, NULL, 0,
1608 switch (ma->ma_attr.la_mode & S_IFMT) {
1617 /* get block quota for child */
1619 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1620 qids, block_pending, block_count,
1621 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1625 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1626 handle = mdd_trans_start(env, mdd);
1628 GOTO(out_pending, rc = PTR_ERR(handle));
1630 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1631 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1635 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1639 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1640 /* If creating the slave object, set slave EA here. */
1641 int lmv_size = spec->u.sp_ea.eadatalen;
1642 struct lmv_stripe_md *lmv;
1644 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1645 LASSERT(lmv != NULL && lmv_size > 0);
1647 rc = __mdd_xattr_set(env, mdd_obj,
1648 mdd_buf_get_const(env, lmv, lmv_size),
1649 XATTR_NAME_LMV, 0, handle);
1653 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1656 #ifdef CONFIG_FS_POSIX_ACL
/* remote ACL supplied by the client: initialize it and let it
 * possibly rewrite the create mode */
1657 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1658 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1660 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1661 buf->lb_len = spec->u.sp_ea.eadatalen;
1662 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1663 rc = __mdd_acl_init(env, mdd_obj, buf,
1664 &ma->ma_attr.la_mode,
1669 ma->ma_attr.la_valid |= LA_MODE;
1672 pfid = spec->u.sp_ea.fid;
1675 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* return the attributes actually stored on disk to the caller */
1681 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1682 mdd_write_unlock(env, mdd_obj);
1684 mdd_trans_stop(env, mdd, rc, handle);
1686 #ifdef HAVE_QUOTA_SUPPORT
1688 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1690 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1692 /* Trigger dqacq on the owner of child. If failed,
1693 * the next call for lquota_chkquota will process it. */
1694 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Partial link: add one link on @obj (the directory-entry insertion is
 * done elsewhere).  Bumps nlink under the write lock after the link
 * sanity check, then sets ctime from the client-supplied attributes.
 */
1702 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1703 const struct md_attr *ma)
1705 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1706 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1707 struct mdd_device *mdd = mdo2mdd(obj);
1708 struct thandle *handle;
1712 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1713 handle = mdd_trans_start(env, mdd);
1717 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1718 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1720 __mdd_ref_add(env, mdd_obj, handle);
1721 mdd_write_unlock(env, mdd_obj);
1723 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1724 la_copy->la_ctime = ma->ma_attr.la_ctime;
1726 la_copy->la_valid = LA_CTIME;
1727 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): passes literal 0 instead of rc to mdd_trans_stop(),
 * unlike sibling functions — confirm whether intentional */
1730 mdd_trans_stop(env, mdd, 0, handle);
1736 * do NOT or the MAY_*'s, you'll get the weakest
/*
 * Translate MDS open flags into a MAY_* access-mode mask used for the
 * permission check.  Returns 0 (no check needed) for the NFSD
 * owner-override case described below.
 */
1738 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1742 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1743 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1744 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1745 * owner can write to a file even if it is marked readonly to hide
1746 * its brokenness. (bug 5781) */
1747 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1748 struct md_ucred *uc = md_ucred(env);
1750 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1751 (la->la_uid == uc->mu_fsuid))
/* map read / write-like / exec open flags to MAY_* bits */
1755 if (flags & FMODE_READ)
1757 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1759 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open request against the object's current attributes:
 * rejects dead objects and symlinks, forbids opening directories for
 * write, runs the regular permission check unless MDS_OPEN_CREATED,
 * strips truncate for special files, enforces append-only semantics,
 * and applies the O_NOATIME owner/capability rule.
 */
1764 static int mdd_open_sanity_check(const struct lu_env *env,
1765 struct mdd_object *obj, int flag)
1767 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1772 if (mdd_is_dead_obj(obj))
1775 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1779 if (S_ISLNK(tmp_la->la_mode))
1782 mode = accmode(env, tmp_la, flag);
1784 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* just-created objects already passed the create permission check */
1787 if (!(flag & MDS_OPEN_CREATED)) {
1788 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncate is meaningless on FIFOs, sockets and device nodes */
1793 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1794 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1795 flag &= ~MDS_OPEN_TRUNC;
1797 /* For writing append-only file must open it with append mode. */
1798 if (mdd_is_append(obj)) {
1799 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1801 if (flag & MDS_OPEN_TRUNC)
1807 * Note: the O_NOATIME flag is not packed by the client.
1809 if (flag & O_NOATIME) {
1810 struct md_ucred *uc = md_ucred(env);
1812 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1813 (uc->mu_valid == UCRED_NEW)) &&
1814 (uc->mu_fsuid != tmp_la->la_uid) &&
1815 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Open @obj: run the open sanity check under the write lock and, on
 * success, bump the open count (mod_count) used by close/unlink to
 * decide when the object may be destroyed.
 */
1823 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1826 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1829 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1831 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1833 mdd_obj->mod_count++;
1835 mdd_write_unlock(env, mdd_obj);
1839 /* return md_attr back,
1840 * if it is last unlink then return lov ea + llog cookie*/
/*
 * Prepare destruction of a to-be-unlinked object: for regular files,
 * fetch the LOV EA and, when present, write the unlink llog record so
 * the caller gets the cookies needed to destroy the OST objects.
 */
1841 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1847 if (S_ISREG(mdd_object_type(obj))) {
1848 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1849 * Caller must be ready for that. */
1851 rc = __mdd_lmm_get(env, obj, ma);
1852 if ((ma->ma_valid & MA_LOV))
1853 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1860 * No permission check is needed.
/*
 * Close @obj: drop the open count; when the last opener goes away and
 * the object is an orphan (unlinked while open), remove it from the
 * orphan index and arrange destruction of its OST objects — either
 * directly (MDS_CLOSE_CLEANUP) or by returning LOV EA + llog cookies
 * via mdd_object_kill().  Quota for the owner is released afterwards.
 */
1862 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1865 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1866 struct mdd_device *mdd = mdo2mdd(obj);
1867 struct thandle *handle;
1871 #ifdef HAVE_QUOTA_SUPPORT
1872 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1873 struct mds_obd *mds = &obd->u.mds;
1874 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1879 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1882 handle = mdd_trans_start(env, mdo2mdd(obj));
1884 RETURN(PTR_ERR(handle));
1886 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1887 /* release open count */
1888 mdd_obj->mod_count --;
1890 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1891 /* remove link to object from orphan index */
1892 rc = __mdd_orphan_del(env, mdd_obj, handle);
1894 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1895 "list, OSS objects to be destroyed.\n",
1896 PFID(mdd_object_fid(mdd_obj)));
1898 CERROR("Object "DFID" can not be deleted from orphan "
1899 "list, maybe cause OST objects can not be "
1900 "destroyed (err: %d).\n",
1901 PFID(mdd_object_fid(mdd_obj)), rc);
1902 /* If object was not deleted from orphan list, do not
1903 * destroy OSS objects, which will be done when next
1909 rc = mdd_iattr_get(env, mdd_obj, ma);
1910 /* Object maybe not in orphan list originally, it is rare case for
1911 * mdd_finish_unlink() failure. */
1912 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1913 #ifdef HAVE_QUOTA_SUPPORT
1914 if (mds->mds_quota) {
1915 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1916 mdd_quota_wrapper(&ma->ma_attr, qids);
1919 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1920 if (ma->ma_valid & MA_FLAGS &&
1921 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1922 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1924 rc = mdd_object_kill(env, mdd_obj, ma);
1930 CERROR("Error when prepare to delete Object "DFID" , "
1931 "which will cause OST objects can not be "
1932 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* nothing to destroy: make sure caller ignores LOV/cookie fields */
1938 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1940 mdd_write_unlock(env, mdd_obj);
1941 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1942 #ifdef HAVE_QUOTA_SUPPORT
1944 /* Trigger dqrel on the owner of child. If failed,
1945 * the next call for lquota_chkquota will process it */
1946 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1953 * Permission check is done when open,
1954 * no need check again.
/*
 * Readpage is only valid on directories whose underlying dt object
 * supports the index (iterator) interface.
 */
1956 static int mdd_readpage_sanity_check(const struct lu_env *env,
1957 struct mdd_object *obj)
1959 struct dt_object *next = mdd_object_child(obj);
1963 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one page (@area, @nob bytes) with lu_dirent records pulled from
 * directory iterator @it.  Writes the lu_dirpage header first, then
 * packs entries until the page is full or the iterator is exhausted.
 * @first marks the first page of the read (its hash seeds *start);
 * *last tracks the most recently packed entry so a record that does
 * not fit can extend the previous one's reclen to cover the tail.
 */
1971 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1972 int first, void *area, int nob,
1973 const struct dt_it_ops *iops, struct dt_it *it,
1974 __u64 *start, __u64 *end,
1975 struct lu_dirent **last, __u32 attr)
1979 struct lu_dirent *ent;
/* reserve space for the page header before packing entries */
1982 memset(area, 0, sizeof (struct lu_dirpage));
1983 area += sizeof (struct lu_dirpage);
1984 nob -= sizeof (struct lu_dirpage);
1992 len = iops->key_size(env, it);
1994 /* IAM iterator can return record with zero len. */
1998 hash = iops->store(env, it);
1999 if (unlikely(first)) {
2004 /* calculate max space required for lu_dirent */
2005 recsize = lu_dirent_calc_size(len, attr);
2007 if (nob >= recsize) {
2008 result = iops->rec(env, it, ent, attr);
2009 if (result == -ESTALE)
2014 /* osd might not able to pack all attributes,
2015 * so recheck rec length */
2016 recsize = le16_to_cpu(ent->lde_reclen);
2019 * record doesn't fit into page, enlarge previous one.
2022 (*last)->lde_reclen =
2023 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2032 ent = (void *)ent + recsize;
2036 result = iops->next(env, it);
2037 if (result == -ESTALE)
2039 } while (result == 0);
/*
 * Core of readdir: walk the directory index with the dt iterator and
 * fill the pages described by @rdpg starting from hash rdpg->rp_hash.
 * Each page is built by mdd_dir_page_build(); when the read reaches
 * end-of-directory the first page's hash range is closed with
 * DIR_END_OFF (or flagged LDF_EMPTY if nothing was packed).
 */
2046 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2047 const struct lu_rdpg *rdpg)
2050 struct dt_object *next = mdd_object_child(obj);
2051 const struct dt_it_ops *iops;
2053 struct lu_dirent *last = NULL;
2054 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2061 LASSERT(rdpg->rp_pages != NULL);
2062 LASSERT(next->do_index_ops != NULL);
2064 if (rdpg->rp_count <= 0)
2068 * iterate through directory and fill pages from @rdpg
2070 iops = &next->do_index_ops->dio_it;
2071 it = iops->init(env, next, mdd_object_capa(env, obj));
2075 rc = iops->load(env, it, rdpg->rp_hash);
2079 * Iterator didn't find record with exactly the key requested.
2081 * It is currently either
2083 * - positioned above record with key less than
2084 * requested---skip it.
2086 * - or not positioned at all (is in IAM_IT_SKEWED
2087 * state)---position it on the next item.
2089 rc = iops->next(env, it);
2094 * At this point and across for-loop:
2096 * rc == 0 -> ok, proceed.
2097 * rc > 0 -> end of directory.
2100 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2101 i++, nob -= CFS_PAGE_SIZE) {
2102 LASSERT(i < rdpg->rp_npages);
2103 pg = rdpg->rp_pages[i];
2104 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2105 min_t(int, nob, CFS_PAGE_SIZE), iops,
2106 it, &hash_start, &hash_end, &last,
2108 if (rc != 0 || i == rdpg->rp_npages - 1) {
/* terminate the entry chain on the final packed page */
2110 last->lde_reclen = 0;
2118 hash_end = DIR_END_OFF;
/* stamp the covered hash range into the first page's header */
2122 struct lu_dirpage *dp;
2124 dp = cfs_kmap(rdpg->rp_pages[0]);
2125 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2126 dp->ldp_hash_end = cpu_to_le64(hash_end);
2129 * No pages were processed, mark this.
2131 dp->ldp_flags |= LDF_EMPTY;
2133 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2134 cfs_kunmap(rdpg->rp_pages[0]);
2137 iops->fini(env, it);
/*
 * Readdir entry point: under the read lock, check the object is a
 * directory; if it was unlinked meanwhile (dead), return a single
 * empty page covering [rp_hash, DIR_END_OFF] so the client sees no
 * entries; otherwise delegate to __mdd_readpage().
 */
2142 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2143 const struct lu_rdpg *rdpg)
2145 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2149 LASSERT(mdd_object_exists(mdd_obj));
2151 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2152 rc = mdd_readpage_sanity_check(env, mdd_obj);
2154 GOTO(out_unlock, rc);
2156 if (mdd_is_dead_obj(mdd_obj)) {
2158 struct lu_dirpage *dp;
2161 * According to POSIX, please do not return any entry to client:
2162 * even dot and dotdot should not be returned.
2164 CWARN("readdir from dead object: "DFID"\n",
2165 PFID(mdd_object_fid(mdd_obj)));
2167 if (rdpg->rp_count <= 0)
2168 GOTO(out_unlock, rc = -EFAULT);
2169 LASSERT(rdpg->rp_pages != NULL);
/* hand back one zeroed page flagged empty, spanning the whole range */
2171 pg = rdpg->rp_pages[0];
2172 dp = (struct lu_dirpage*)cfs_kmap(pg);
2173 memset(dp, 0 , sizeof(struct lu_dirpage));
2174 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2175 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2176 dp->ldp_flags |= LDF_EMPTY;
2177 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2179 GOTO(out_unlock, rc = 0);
2182 rc = __mdd_readpage(env, mdd_obj, rdpg);
2186 mdd_read_unlock(env, mdd_obj);
/*
 * Sync @obj to stable storage by delegating to the underlying dt
 * object's do_object_sync method.
 */
2190 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2192 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2193 struct dt_object *next;
2195 LASSERT(mdd_object_exists(mdd_obj));
2196 next = mdd_object_child(mdd_obj);
2197 return next->do_ops->do_object_sync(env, next);
/*
 * Read the object version (used for version-based recovery) from the
 * underlying dt object.
 */
2200 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2201 struct md_object *obj)
2203 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2205 LASSERT(mdd_object_exists(mdd_obj));
2206 return do_version_get(env, mdd_object_child(mdd_obj));
/*
 * Store object version @version on the underlying dt object
 * (counterpart of mdd_version_get above).
 */
2209 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2210 dt_obj_version_t version)
2212 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2214 LASSERT(mdd_object_exists(mdd_obj));
2215 return do_version_set(env, mdd_object_child(mdd_obj), version);
2218 const struct md_object_operations mdd_obj_ops = {
2219 .moo_permission = mdd_permission,
2220 .moo_attr_get = mdd_attr_get,
2221 .moo_attr_set = mdd_attr_set,
2222 .moo_xattr_get = mdd_xattr_get,
2223 .moo_xattr_set = mdd_xattr_set,
2224 .moo_xattr_list = mdd_xattr_list,
2225 .moo_xattr_del = mdd_xattr_del,
2226 .moo_object_create = mdd_object_create,
2227 .moo_ref_add = mdd_ref_add,
2228 .moo_ref_del = mdd_ref_del,
2229 .moo_open = mdd_open,
2230 .moo_close = mdd_close,
2231 .moo_readpage = mdd_readpage,
2232 .moo_readlink = mdd_readlink,
2233 .moo_capa_get = mdd_capa_get,
2234 .moo_object_sync = mdd_object_sync,
2235 .moo_version_get = mdd_version_get,
2236 .moo_version_set = mdd_version_set,
2237 .moo_path = mdd_path,