1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <linux/jbd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
62 #include "mdd_internal.h"
64 static const struct lu_object_operations mdd_lu_obj_ops;
66 static int mdd_xattr_get(const struct lu_env *env,
67 struct md_object *obj, struct lu_buf *buf,
/* Fetch the attributes of \a obj into \a la, authorized by \a capa.
 * The object must exist (asserted); the read is delegated to the
 * underlying dt layer via mdo_attr_get(). */
70 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
71 struct lu_attr *la, struct lustre_capa *capa)
73 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74 PFID(mdd_object_fid(obj)));
75 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits of \a obj.  APPEND_OBJ/IMMUTE_OBJ are cleared first so
 * the result reflects exactly what \a flags says. */
78 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
80 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
82 if (flags & LUSTRE_APPEND_FL)
83 obj->mod_flags |= APPEND_OBJ;
85 if (flags & LUSTRE_IMMUTABLE_FL)
86 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to \a env via
 * mdd_thread_key.  The key is expected to be registered, so the
 * context value must be non-NULL (asserted). */
89 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
91 struct mdd_thread_info *info;
93 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
94 LASSERT(info != NULL);
/* Wrap caller-owned memory \a area of \a len bytes in the per-thread
 * mti_buf descriptor; no allocation is performed here. */
98 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
102 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory owned by \a buf; a NULL or empty buffer is a no-op.
 * NOTE(review): selection between OBD_VFREE and OBD_FREE presumably keys
 * off how the buffer was allocated (cf. lb_vmalloc use elsewhere in this
 * file) — confirm against the full source. */
110 void mdd_buf_put(struct lu_buf *buf)
112 if (buf == NULL || buf->lb_buf == NULL)
113 OBD_VFREE(buf->lb_buf, buf->lb_len);
115 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory \a area in the
 * per-thread mti_buf.  The const qualifier is dropped on lb_buf because
 * struct lu_buf has a single non-const pointer field. */
119 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
120 const void *area, ssize_t len)
124 buf = &mdd_env_info(env)->mti_buf;
125 buf->lb_buf = (void *)area;
130 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the per-thread mti_big_buf guaranteed to hold at least \a len
 * bytes.  A smaller existing buffer is freed first; requests up to
 * BUF_VMALLOC_SIZE are served by OBD_ALLOC, with OBD_VMALLOC as the
 * fallback for larger ones.  Caller must check lb_buf for NULL. */
131 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
133 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
135 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
137 OBD_VFREE(buf->lb_buf, buf->lb_len);
139 OBD_FREE(buf->lb_buf, buf->lb_len);
142 if (buf->lb_buf == NULL) {
144 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
145 OBD_ALLOC(buf->lb_buf, buf->lb_len);
148 if (buf->lb_buf == NULL) {
149 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
152 if (buf->lb_buf == NULL)
158 /** Increase the size of the \a mti_big_buf.
159 * preserves old data in buffer
160 * old buffer remains unchanged on error
161 * \retval 0 or -ENOMEM
163 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
165 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
168 LASSERT(len >= oldbuf->lb_len);
/* large requests go through vmalloc, small ones through kmalloc */
169 if (len > BUF_VMALLOC_SIZE) {
170 OBD_VMALLOC(buf.lb_buf, len);
173 OBD_ALLOC(buf.lb_buf, len);
176 if (buf.lb_buf == NULL)
/* preserve old contents, then release the old allocation */
180 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
182 if (oldbuf->lb_vmalloc)
183 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
185 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
/* swap the new descriptor into place; mti_big_buf now owns buf.lb_buf */
187 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, zeroed and large enough for
 * the current mdd_lov_cookiesize().  The cached buffer is freed and
 * re-allocated when the required size grows.  Returns NULL on allocation
 * failure. */
192 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
193 struct mdd_device *mdd)
195 struct mdd_thread_info *mti = mdd_env_info(env);
198 max_cookie_size = mdd_lov_cookiesize(env, mdd);
199 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
200 if (mti->mti_max_cookie)
201 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
202 mti->mti_max_cookie = NULL;
203 mti->mti_max_cookie_size = 0;
205 if (unlikely(mti->mti_max_cookie == NULL)) {
206 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
207 if (likely(mti->mti_max_cookie != NULL))
208 mti->mti_max_cookie_size = max_cookie_size;
210 if (likely(mti->mti_max_cookie != NULL))
211 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212 return mti->mti_max_cookie;
/* Return the per-thread LOV EA buffer, large enough for the current
 * mdd_lov_mdsize(); grown on demand like mdd_max_cookie_get().  Returns
 * NULL on allocation failure.  Unlike the cookie variant, the buffer is
 * not zeroed here. */
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216 struct mdd_device *mdd)
218 struct mdd_thread_info *mti = mdd_env_info(env);
221 max_lmm_size = mdd_lov_mdsize(env, mdd);
222 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223 if (mti->mti_max_lmm)
224 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225 mti->mti_max_lmm = NULL;
226 mti->mti_max_lmm_size = 0;
228 if (unlikely(mti->mti_max_lmm == NULL)) {
229 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
/* NOTE(review): 'unlikely' on the success path looks inverted —
 * mdd_max_cookie_get() uses 'likely' for the same test.  Harmless
 * (branch hint only), but probably a typo; verify upstream. */
230 if (unlikely(mti->mti_max_lmm != NULL))
231 mti->mti_max_lmm_size = max_lmm_size;
233 return mti->mti_max_lmm;
/* lu_device_operations: allocate and minimally initialize a new
 * mdd_object, wiring its md_object ops (mdd_obj_ops/mdd_dir_ops) and its
 * lu_object ops (mdd_lu_obj_ops).  Open count starts at zero. */
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237 const struct lu_object_header *hdr,
240 struct mdd_object *mdd_obj;
242 OBD_ALLOC_PTR(mdd_obj);
243 if (mdd_obj != NULL) {
246 o = mdd2lu_obj(mdd_obj);
247 lu_object_init(o, NULL, d);
248 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250 mdd_obj->mod_count = 0;
251 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the underlying dt
 * object from the child device and stack it below \a o in the same
 * object header.  Also resets the changelog timestamp and initializes
 * the pdir lock of the new mdd object. */
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259 const struct lu_object_conf *_)
261 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262 struct mdd_object *mdd_obj = lu2mdd_obj(o);
263 struct lu_object *below;
264 struct lu_device *under;
267 mdd_obj->mod_cltime = 0;
268 under = &d->mdd_child->dd_lu_dev;
269 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270 mdd_pdlock_init(mdd_obj);
274 lu_object_add(o, below);
/* lu_object_operations::loo_object_start: once the object is known to
 * exist on disk, load its flag bits into mod_flags. */
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 if (lu_object_exists(o))
282 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object_operations::loo_object_free: tear down and free the
 * mdd_object allocated by mdd_object_alloc(). */
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object_operations::loo_object_print: one-line debug dump of the
 * object through printer \a p. */
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296 lu_printer_t p, const struct lu_object *o)
298 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
/* lu_object operation vector for mdd objects; see the individual
 * handlers above. */
301 static const struct lu_object_operations mdd_lu_obj_ops = {
302 .loo_object_init = mdd_object_init,
303 .loo_object_start = mdd_object_start,
304 .loo_object_free = mdd_object_free,
305 .loo_object_print = mdd_object_print,
/* Look up (or create in cache) the mdd object for FID \a f on device
 * \a d.  Thin wrapper over md_object_find_slice(); may return an
 * ERR_PTR, which callers must check. */
308 struct mdd_object *mdd_object_find(const struct lu_env *env,
309 struct mdd_device *d,
310 const struct lu_fid *f)
312 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve \a path component by component into \a fid, starting from the
 * filesystem root FID (mdd_root_fid) and calling mdd_lookup() on each
 * element.  A remote (non-local) parent yields -EREMOTE.  A per-thread
 * PATH_MAX buffer holds the current component name. */
315 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
316 const char *path, struct lu_fid *fid)
319 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
320 struct mdd_object *obj;
321 struct lu_name *lname = &mdd_env_info(env)->mti_name;
326 /* temp buffer for path element */
327 buf = mdd_buf_alloc(env, PATH_MAX);
328 if (buf->lb_buf == NULL)
331 lname->ln_name = name = buf->lb_buf;
332 lname->ln_namelen = 0;
333 *f = mdd->mdd_root_fid;
/* scan one '/'-terminated component into the temp name buffer */
340 while (*path != '/' && *path != '\0') {
348 /* find obj corresponding to fid */
349 obj = mdd_object_find(env, mdd, f);
351 GOTO(out, rc = -EREMOTE);
353 GOTO(out, rc = -PTR_ERR(obj));
354 /* get child fid from parent and name */
355 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
356 mdd_object_put(env, obj);
361 lname->ln_namelen = 0;
370 /** The maximum depth that fid2path() will search.
371 * This is limited only because we want to store the fids for
372 * historical path lookup purposes.
374 #define MAX_PATH_DEPTH 100
376 /** mdd_path() lookup structure. */
377 struct path_lookup_info {
378 __u64 pli_recno; /**< history point */
379 struct lu_fid pli_fid; /**< fid the rebuilt path resolves to; used to re-check */
380 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
381 struct mdd_object *pli_mdd_obj; /**< object whose path is wanted */
382 char *pli_path; /**< full path */
384 int pli_linkno; /**< which hardlink to follow */
385 int pli_fidcount; /**< number of \a pli_fids */
/* Build the *current* pathname of pli->pli_mdd_obj by walking parent
 * link EAs up to the filesystem root, packing each name backwards from
 * the end of pli->pli_path.  The chain of parent FIDs is recorded in
 * pli_fids[] for later historical lookup.  Afterwards the path is
 * re-resolved with mdd_path2fid(); a mismatch (concurrent rename) yields
 * -EAGAIN so the caller can retry.  -EOVERFLOW if the path buffer or
 * MAX_PATH_DEPTH is exceeded, -EREMOTE if a parent is not local. */
388 static int mdd_path_current(const struct lu_env *env,
389 struct path_lookup_info *pli)
391 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
392 struct mdd_object *mdd_obj;
393 struct lu_buf *buf = NULL;
394 struct link_ea_header *leh;
395 struct link_ea_entry *lee;
396 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
397 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* names are packed right-to-left starting at the end of the buffer */
403 ptr = pli->pli_path + pli->pli_pathlen - 1;
406 pli->pli_fidcount = 0;
407 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
409 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
410 mdd_obj = mdd_object_find(env, mdd,
411 &pli->pli_fids[pli->pli_fidcount]);
413 GOTO(out, rc = -EREMOTE);
415 GOTO(out, rc = -PTR_ERR(mdd_obj));
416 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
418 mdd_object_put(env, mdd_obj);
422 /* Do I need to error out here? */
427 /* Get parent fid and object name */
428 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
429 buf = mdd_links_get(env, mdd_obj);
430 mdd_read_unlock(env, mdd_obj);
431 mdd_object_put(env, mdd_obj);
433 GOTO(out, rc = PTR_ERR(buf));
436 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
437 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
439 /* If set, use link #linkno for path lookup, otherwise use
440 link #0. Only do this for the final path element. */
441 if ((pli->pli_fidcount == 0) &&
442 (pli->pli_linkno < leh->leh_reccount)) {
444 for (count = 0; count < pli->pli_linkno; count++) {
445 lee = (struct link_ea_entry *)
446 ((char *)lee + reclen);
447 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
449 if (pli->pli_linkno < leh->leh_reccount - 1)
450 /* indicate to user there are more links */
454 /* Pack the name in the end of the buffer */
455 ptr -= tmpname->ln_namelen;
456 if (ptr - 1 <= pli->pli_path)
457 GOTO(out, rc = -EOVERFLOW);
458 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen)
461 /* Store the parent fid for historic lookup */
462 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
463 GOTO(out, rc = -EOVERFLOW);
464 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467 /* Verify that our path hasn't changed since we started the lookup */
468 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
471 GOTO (out, rc = -EAGAIN);
473 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
474 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
475 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
476 PFID(&pli->pli_fid));
477 GOTO(out, rc = -EAGAIN);
/* shift the finished path to the start of the caller's buffer */
480 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
484 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
485 /* if we vmalloced a large buffer drop it */
491 /* Returns the full path to this fid, as of changelog record recno. */
/* md_object op: return the full path of \a obj in \a path (\a pathlen
 * bytes), as of changelog record \a recno; *linkno selects which
 * hardlink to follow and is updated with the next link index on return.
 * Retries mdd_path_current() a bounded number of times on -EAGAIN to
 * tolerate concurrent renames.  Historical (recno-based) lookup is
 * compiled out below. */
492 static int mdd_path(const struct lu_env *env, struct md_object *obj,
493 char *path, int pathlen, __u64 recno, int *linkno)
495 struct path_lookup_info *pli;
/* the root has no name; nothing to build */
503 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
513 pli->pli_mdd_obj = md2mdd_obj(obj);
514 pli->pli_recno = recno;
515 pli->pli_path = path;
516 pli->pli_pathlen = pathlen;
517 pli->pli_linkno = *linkno;
519 /* Retry multiple times in case file is being moved */
520 while (tries-- && rc == -EAGAIN)
521 rc = mdd_path_current(env, pli);
523 #if 0 /* We need old path names only for replication */
524 /* For historical path lookup, the current links may not have existed
525 * at "recno" time. We must switch over to earlier links/parents
526 * by using the changelog records. If the earlier parent doesn't
527 * exist, we must search back through the changelog to reconstruct
528 * its parents, then check if it exists, etc.
529 * We may ignore this problem for the initial implementation and
530 * state that an "original" hardlink must still exist for us to find
531 * historic path name. */
532 if (pli->pli_recno != -1)
533 rc = mdd_path_historic(env, pli);
536 /* return next link index to caller */
537 *linkno = pli->pli_linkno;
/* Load the object's attribute flags from disk and cache them in
 * mod_flags via mdd_flags_xlate().  A directory with a single nlink is
 * additionally marked MNLINK_OBJ. */
544 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
546 struct lu_attr *la = &mdd_env_info(env)->mti_la;
550 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
552 mdd_flags_xlate(obj, la->la_flags);
553 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
554 obj->mod_flags |= MNLINK_OBJ;
559 /* get only inode attributes */
/* Fill ma->ma_attr with the object's inode attributes and set MA_INODE
 * in ma_valid.  Short-circuits if MA_INODE is already valid. */
560 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
566 if (ma->ma_valid & MA_INODE)
569 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
570 mdd_object_capa(env, mdd_obj));
572 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default LOV striping (from the MDS
 * lov descriptor) and set *size to sizeof(struct lov_mds_md).  Used when
 * a caller asked for MA_LOV_DEF and the object has no striping of its
 * own.  Returns the default EA size. */
576 static int mdd_get_default_md(struct mdd_object *mdd_obj,
577 struct lov_mds_md *lmm, int *size)
579 struct lov_desc *ldesc;
580 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
583 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
584 LASSERT(ldesc != NULL);
589 lmm->lmm_magic = LOV_MAGIC_V1;
590 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
591 lmm->lmm_pattern = ldesc->ld_pattern;
592 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
593 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
594 *size = sizeof(struct lov_mds_md);
596 RETURN(sizeof(struct lov_mds_md));
599 /* get lov EA only */
/* Read the LOV EA into ma->ma_lmm (lock must already be held by the
 * caller).  Falls back to the filesystem default striping when the
 * object has none and MA_LOV_DEF was requested.  Sets MA_LOV on
 * success; no-op if MA_LOV already valid. */
600 static int __mdd_lmm_get(const struct lu_env *env,
601 struct mdd_object *mdd_obj, struct md_attr *ma)
606 if (ma->ma_valid & MA_LOV)
609 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
612 if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
613 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
618 ma->ma_valid |= MA_LOV;
/* Locked wrapper around __mdd_lmm_get(): takes the object read lock for
 * the duration of the LOV EA read. */
624 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
630 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
631 rc = __mdd_lmm_get(env, mdd_obj, ma);
632 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-dir) EA into ma->ma_lmv under a lock held by
 * the caller; sets MA_LMV on success, no-op if already valid. */
637 static int __mdd_lmv_get(const struct lu_env *env,
638 struct mdd_object *mdd_obj, struct md_attr *ma)
643 if (ma->ma_valid & MA_LMV)
646 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
649 ma->ma_valid |= MA_LMV;
/* Gather everything requested in ma->ma_need: inode attributes, LOV EA
 * (regular files and directories), LMV EA (directories), and the
 * default ACL (directories, POSIX-ACL builds).  Stops at the first
 * error; ma_valid reflects what was filled. */
655 static int mdd_attr_get_internal(const struct lu_env *env,
656 struct mdd_object *mdd_obj,
662 if (ma->ma_need & MA_INODE)
663 rc = mdd_iattr_get(env, mdd_obj, ma);
665 if (rc == 0 && ma->ma_need & MA_LOV) {
666 if (S_ISREG(mdd_object_type(mdd_obj)) ||
667 S_ISDIR(mdd_object_type(mdd_obj)))
668 rc = __mdd_lmm_get(env, mdd_obj, ma);
670 if (rc == 0 && ma->ma_need & MA_LMV) {
671 if (S_ISDIR(mdd_object_type(mdd_obj)))
672 rc = __mdd_lmv_get(env, mdd_obj, ma);
674 #ifdef CONFIG_FS_POSIX_ACL
675 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
676 if (S_ISDIR(mdd_object_type(mdd_obj)))
677 rc = mdd_def_acl_get(env, mdd_obj, ma);
680 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* As mdd_attr_get_internal(), but takes the object read lock when EAs
 * (LOV/LMV/default ACL) are requested; plain inode attrs need no lock. */
685 int mdd_attr_get_internal_locked(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
689 int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
692 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
693 rc = mdd_attr_get_internal(env, mdd_obj, ma);
695 mdd_read_unlock(env, mdd_obj);
700 * No permission check is needed.
/* md_object op: getattr entry point; delegates to the locked internal
 * helper.  No permission check is needed here (see comment above). */
702 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
705 struct mdd_object *mdd_obj = md2mdd_obj(obj);
709 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
714 * No permission check is needed.
/* md_object op: read extended attribute \a name into \a buf under the
 * object read lock.  The object must exist. */
716 static int mdd_xattr_get(const struct lu_env *env,
717 struct md_object *obj, struct lu_buf *buf,
720 struct mdd_object *mdd_obj = md2mdd_obj(obj);
725 LASSERT(mdd_object_exists(mdd_obj));
727 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
728 rc = mdo_xattr_get(env, mdd_obj, buf, name,
729 mdd_object_capa(env, mdd_obj));
730 mdd_read_unlock(env, mdd_obj);
736 * Permission check is done when open,
737 * no need check again.
/* md_object op: read a symlink target into \a buf by doing a body read
 * on the underlying dt object, under the object read lock.  Permission
 * was checked at open time (see comment above). */
739 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
742 struct mdd_object *mdd_obj = md2mdd_obj(obj);
743 struct dt_object *next;
748 LASSERT(mdd_object_exists(mdd_obj));
750 next = mdd_object_child(mdd_obj);
751 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
752 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
753 mdd_object_capa(env, mdd_obj));
754 mdd_read_unlock(env, mdd_obj);
759 * No permission check is needed.
/* md_object op: list all extended attribute names into \a buf under the
 * object read lock. */
761 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
764 struct mdd_object *mdd_obj = md2mdd_obj(obj);
769 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
770 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
771 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for \a c inside transaction \a handle.
 * The dt object format is chosen from spec->sp_feat: a non-directory
 * index feature selects DFT_INDEX, otherwise the format follows the
 * requested file mode.  The allocation hint is initialized by the
 * underlying device, using parent \a p when given.  On success the
 * object is guaranteed to exist (asserted via ergo()). */
776 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
777 struct mdd_object *c, struct md_attr *ma,
778 struct thandle *handle,
779 const struct md_op_spec *spec)
781 struct lu_attr *attr = &ma->ma_attr;
782 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
783 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
784 const struct dt_index_features *feat = spec->sp_feat;
788 if (!mdd_object_exists(c)) {
789 struct dt_object *next = mdd_object_child(c);
792 if (feat != &dt_directory_features && feat != NULL)
793 dof->dof_type = DFT_INDEX;
795 dof->dof_type = dt_mode_to_dft(attr->la_mode);
797 dof->u.dof_idx.di_feat = feat;
799 /* @hint will be initialized by underlying device. */
800 next->do_ops->do_ah_init(env, hint,
801 p ? mdd_object_child(p) : NULL,
802 attr->la_mode & S_IFMT);
804 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
805 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
813 * Make sure the ctime is increased only.
/* Enforce monotonic ctime: compare the incoming ctime against the
 * object's current one and drop LA_MTIME/LA_CTIME from la_valid when the
 * update would move time backwards (or is a redundant ctime-only set). */
815 static inline int mdd_attr_check(const struct lu_env *env,
816 struct mdd_object *obj,
817 struct lu_attr *attr)
819 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
823 if (attr->la_valid & LA_CTIME) {
824 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
828 if (attr->la_ctime < tmp_la->la_ctime)
829 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
830 else if (attr->la_valid == LA_CTIME &&
831 attr->la_ctime == tmp_la->la_ctime)
832 attr->la_valid &= ~LA_CTIME;
/* Write \a attr to disk within \a handle; when the mode changed and
 * \a needacl is set, also propagate the new mode into the object's
 * POSIX ACL via mdd_acl_chmod(). */
837 int mdd_attr_set_internal(const struct lu_env *env,
838 struct mdd_object *obj,
839 struct lu_attr *attr,
840 struct thandle *handle,
846 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
847 #ifdef CONFIG_FS_POSIX_ACL
848 if (!rc && (attr->la_valid & LA_MODE) && needacl)
849 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Apply mdd_attr_check() (ctime monotonicity) before writing the
 * attributes with mdd_attr_set_internal(). */
854 int mdd_attr_check_set_internal(const struct lu_env *env,
855 struct mdd_object *obj,
856 struct lu_attr *attr,
857 struct thandle *handle,
863 rc = mdd_attr_check(env, obj, attr);
868 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Locked wrapper around mdd_attr_set_internal(): takes the object write
 * lock; the ACL-rewrite path is only relevant when the mode changes. */
872 static int mdd_attr_set_internal_locked(const struct lu_env *env,
873 struct mdd_object *obj,
874 struct lu_attr *attr,
875 struct thandle *handle,
881 needacl = needacl && (attr->la_valid & LA_MODE);
883 mdd_write_lock(env, obj, MOR_TGT_CHILD);
884 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
886 mdd_write_unlock(env, obj);
/* Locked wrapper around mdd_attr_check_set_internal(); same locking and
 * ACL policy as mdd_attr_set_internal_locked(). */
890 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
891 struct mdd_object *obj,
892 struct lu_attr *attr,
893 struct thandle *handle,
899 needacl = needacl && (attr->la_valid & LA_MODE);
901 mdd_write_lock(env, obj, MOR_TGT_CHILD);
902 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
904 mdd_write_unlock(env, obj);
/* Set or delete extended attribute \a name inside \a handle (no
 * locking): a non-empty buffer sets the xattr, a NULL/zero-length
 * buffer deletes it. */
908 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
909 const struct lu_buf *buf, const char *name,
910 int fl, struct thandle *handle)
912 struct lustre_capa *capa = mdd_object_capa(env, obj);
916 if (buf->lb_buf && buf->lb_len > 0)
917 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
918 else if (buf->lb_buf == NULL && buf->lb_len == 0)
919 rc = mdo_xattr_del(env, obj, name, handle, capa);
925 * This gives the same functionality as the code between
926 * sys_chmod and inode_setattr
927 * chown_common and inode_setattr
928 * utimes and inode_setattr
929 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
/* Validate and normalize an incoming setattr request \a la against the
 * object's current attributes and the caller's credentials, mirroring
 * the kernel's inode_change_ok()/setattr logic: rejects type/nlink/rdev
 * changes, drops redundant time updates, checks FOWNER/CHOWN/FSETID/
 * LINUX_IMMUTABLE capabilities, clears SUID/SGID on chown/chgrp of
 * non-directories, and handles the Size-on-MDS vs. truncate split.
 * On return la->la_valid describes only the changes still to apply. */
931 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
932 struct lu_attr *la, const struct md_attr *ma)
934 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
935 struct md_ucred *uc = md_ucred(env);
942 /* Do not permit change file type */
943 if (la->la_valid & LA_TYPE)
946 /* They should not be processed by setattr */
947 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
950 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
954 if (la->la_valid == LA_CTIME) {
955 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
956 /* This is only for set ctime when rename's source is
958 rc = mdd_may_delete(env, NULL, obj,
959 (struct md_attr *)ma, 1, 0);
960 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
961 la->la_valid &= ~LA_CTIME;
965 if (la->la_valid == LA_ATIME) {
966 /* This is atime only set for read atime update on close. */
967 if (la->la_atime <= tmp_la->la_atime +
968 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
969 la->la_valid &= ~LA_ATIME;
973 /* Check if flags change. */
974 if (la->la_valid & LA_FLAGS) {
975 unsigned int oldflags = 0;
976 unsigned int newflags = la->la_flags &
977 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
979 if ((uc->mu_fsuid != tmp_la->la_uid) &&
980 !mdd_capable(uc, CFS_CAP_FOWNER))
983 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
984 * only be changed by the relevant capability. */
985 if (mdd_is_immutable(obj))
986 oldflags |= LUSTRE_IMMUTABLE_FL;
987 if (mdd_is_append(obj))
988 oldflags |= LUSTRE_APPEND_FL;
989 if ((oldflags ^ newflags) &&
990 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
993 if (!S_ISDIR(tmp_la->la_mode))
994 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
997 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
998 (la->la_valid & ~LA_FLAGS) &&
999 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1002 /* Check for setting the obj time. */
1003 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1004 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1005 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1006 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1007 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1015 /* Make sure a caller can chmod. */
1016 if (la->la_valid & LA_MODE) {
1017 /* Bypass la_valid == LA_MODE,
1018 * this is for changing file with SUID or SGID. */
1019 if ((la->la_valid & ~LA_MODE) &&
1020 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1021 (uc->mu_fsuid != tmp_la->la_uid) &&
1022 !mdd_capable(uc, CFS_CAP_FOWNER))
1025 if (la->la_mode == (umode_t) -1)
1026 la->la_mode = tmp_la->la_mode;
1028 la->la_mode = (la->la_mode & S_IALLUGO) |
1029 (tmp_la->la_mode & ~S_IALLUGO);
1031 /* Also check the setgid bit! */
1032 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1033 la->la_gid : tmp_la->la_gid) &&
1034 !mdd_capable(uc, CFS_CAP_FSETID))
1035 la->la_mode &= ~S_ISGID;
1037 la->la_mode = tmp_la->la_mode;
1040 /* Make sure a caller can chown. */
1041 if (la->la_valid & LA_UID) {
1042 if (la->la_uid == (uid_t) -1)
1043 la->la_uid = tmp_la->la_uid;
1044 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1045 (la->la_uid != tmp_la->la_uid)) &&
1046 !mdd_capable(uc, CFS_CAP_CHOWN))
1049 /* If the user or group of a non-directory has been
1050 * changed by a non-root user, remove the setuid bit.
1051 * 19981026 David C Niemi <niemi@tux.org>
1053 * Changed this to apply to all users, including root,
1054 * to avoid some races. This is the behavior we had in
1055 * 2.0. The check for non-root was definitely wrong
1056 * for 2.2 anyway, as it should have been using
1057 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1058 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1059 !S_ISDIR(tmp_la->la_mode)) {
1060 la->la_mode &= ~S_ISUID;
1061 la->la_valid |= LA_MODE;
1065 /* Make sure caller can chgrp. */
1066 if (la->la_valid & LA_GID) {
1067 if (la->la_gid == (gid_t) -1)
1068 la->la_gid = tmp_la->la_gid;
1069 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1070 ((la->la_gid != tmp_la->la_gid) &&
1071 !lustre_in_group_p(uc, la->la_gid))) &&
1072 !mdd_capable(uc, CFS_CAP_CHOWN))
1075 /* Likewise, if the user or group of a non-directory
1076 * has been changed by a non-root user, remove the
1077 * setgid bit UNLESS there is no group execute bit
1078 * (this would be a file marked for mandatory
1079 * locking). 19981026 David C Niemi <niemi@tux.org>
1081 * Removed the fsuid check (see the comment above) --
1083 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1084 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1085 la->la_mode &= ~S_ISGID;
1086 la->la_valid |= LA_MODE;
1090 /* For both Size-on-MDS case and truncate case,
1091 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1092 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1093 * For SOM case, it is true, the MAY_WRITE perm has been checked
1094 * when open, no need check again. For truncate case, it is false,
1095 * the MAY_WRITE perm should be checked here. */
1096 if (ma->ma_attr_flags & MDS_SOM) {
1097 /* For the "Size-on-MDS" setattr update, merge coming
1098 * attributes with the set in the inode. BUG 10641 */
1099 if ((la->la_valid & LA_ATIME) &&
1100 (la->la_atime <= tmp_la->la_atime))
1101 la->la_valid &= ~LA_ATIME;
1103 /* OST attributes do not have a priority over MDS attributes,
1104 * so drop times if ctime is equal. */
1105 if ((la->la_valid & LA_CTIME) &&
1106 (la->la_ctime <= tmp_la->la_ctime))
1107 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1109 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1110 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1111 (uc->mu_fsuid == tmp_la->la_uid)) &&
1112 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1113 rc = mdd_permission_internal_locked(env, obj,
1120 if (la->la_valid & LA_CTIME) {
1121 /* The pure setattr, it has the priority over what is
1122 * already set, do not drop it if ctime is equal. */
1123 if (la->la_ctime < tmp_la->la_ctime)
1124 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1132 /** Store a data change changelog record
1133 * If this fails, we must fail the whole transaction; we don't
1134 * want the change to commit without the log entry.
1135 * \param mdd_obj - mdd_object of change
1136 * \param handle - transaction handle
1138 static int mdd_changelog_data_store(const struct lu_env *env,
1139 struct mdd_device *mdd,
1140 enum changelog_rec_type type,
1141 struct mdd_object *mdd_obj,
1142 struct thandle *handle)
1144 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1145 struct llog_changelog_rec *rec;
/* changelogs disabled: nothing to record */
1150 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1153 LASSERT(handle != NULL);
1154 LASSERT(mdd_obj != NULL);
/* coalesce repeated SETATTR records: skip if this object was already
 * logged since the changelog was (re)started */
1156 if ((type == CL_SETATTR) &&
1157 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1158 /* Don't need multiple updates in this log */
1159 /* Don't check under lock - no big deal if we get an extra
1164 reclen = llog_data_len(sizeof(*rec));
1165 buf = mdd_buf_alloc(env, reclen);
1166 if (buf->lb_buf == NULL)
1168 rec = (struct llog_changelog_rec *)buf->lb_buf;
1170 rec->cr_flags = CLF_VERSION;
1171 rec->cr_type = (__u32)type;
1172 rec->cr_tfid = *tfid;
1173 rec->cr_namelen = 0;
/* remember when this object was last logged (for the coalescing above) */
1174 mdd_obj->mod_cltime = cfs_time_current_64();
1176 rc = mdd_changelog_llog_write(mdd, rec, handle);
1178 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1179 rc, type, PFID(tfid));
1186 /* set attr and LOV EA at once, return updated attr */
/* md_object op: set inode attributes and/or LOV EA in one transaction.
 * Flow: build txn params, start the transaction; for a chown/chgrp of a
 * regular file fetch the current LOV EA so the OST objects can be
 * updated asynchronously afterwards; normalize the request with
 * mdd_fix_attr(); do quota bookkeeping when enabled; apply the attrs
 * (flag changes also refresh the cached mod_flags); journal chown in
 * the llog; optionally set a new LOV EA; emit a SETATTR changelog
 * record for time changes; stop the transaction and, on success,
 * propagate uid/gid to the OSTs via mdd_lov_setattr_async(). */
1187 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1188 const struct md_attr *ma)
1190 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1191 struct mdd_device *mdd = mdo2mdd(obj);
1192 struct thandle *handle;
1193 struct lov_mds_md *lmm = NULL;
1194 struct llog_cookie *logcookies = NULL;
1195 int rc, lmm_size = 0, cookie_size = 0;
1196 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1197 #ifdef HAVE_QUOTA_SUPPORT
1198 struct obd_device *obd = mdd->mdd_obd_dev;
1199 struct mds_obd *mds = &obd->u.mds;
1200 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1201 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1202 int quota_opc = 0, block_count = 0;
1203 int inode_pending = 0, block_pending = 0;
1207 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1208 MDD_TXN_ATTR_SET_OP);
1209 handle = mdd_trans_start(env, mdd);
1211 RETURN(PTR_ERR(handle));
1212 /*TODO: add lock here*/
1213 /* start a log journal handle if needed */
1214 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1215 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1216 lmm_size = mdd_lov_mdsize(env, mdd);
1217 lmm = mdd_max_lmm_get(env, mdd);
1219 GOTO(cleanup, rc = -ENOMEM);
1221 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1228 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1229 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1230 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* work on a copy so the caller's md_attr stays untouched */
1232 *la_copy = ma->ma_attr;
1233 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1237 #ifdef HAVE_QUOTA_SUPPORT
1238 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1239 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1241 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1243 quota_opc = FSFILT_OP_SETATTR;
1244 mdd_quota_wrapper(la_copy, qnids);
1245 mdd_quota_wrapper(la_tmp, qoids);
1246 /* get file quota for new owner */
1247 lquota_chkquota(mds_quota_interface_ref, obd,
1248 qnids[USRQUOTA], qnids[GRPQUOTA], 1,
1249 &inode_pending, NULL, 0, NULL, 0);
1250 block_count = (la_tmp->la_blocks + 7) >> 3;
1252 /* get block quota for new owner */
1253 lquota_chkquota(mds_quota_interface_ref, obd,
1256 block_count, &block_pending,
1257 NULL, LQUOTA_FLAGS_BLK,
1263 if (la_copy->la_valid & LA_FLAGS) {
1264 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1267 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1268 } else if (la_copy->la_valid) { /* setattr */
1269 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1271 /* journal chown/chgrp in llog, just like unlink */
1272 if (rc == 0 && lmm_size){
1273 cookie_size = mdd_lov_cookiesize(env, mdd);
1274 logcookies = mdd_max_cookie_get(env, mdd);
1275 if (logcookies == NULL)
1276 GOTO(cleanup, rc = -ENOMEM);
1278 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1279 logcookies, cookie_size) <= 0)
1284 if (rc == 0 && ma->ma_valid & MA_LOV) {
1287 mode = mdd_object_type(mdd_obj);
1288 if (S_ISREG(mode) || S_ISDIR(mode)) {
1289 rc = mdd_lsm_sanity_check(env, mdd_obj);
1293 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1294 ma->ma_lmm_size, handle, 1);
1299 if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1300 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1302 mdd_trans_stop(env, mdd, rc, handle);
1303 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1304 /*set obd attr, if needed*/
1305 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1308 #ifdef HAVE_QUOTA_SUPPORT
1311 lquota_pending_commit(mds_quota_interface_ref, obd,
1312 qnids[USRQUOTA], qnids[GRPQUOTA],
1315 lquota_pending_commit(mds_quota_interface_ref, obd,
1316 qnids[USRQUOTA], qnids[GRPQUOTA],
1318 /* Trigger dqrel/dqacq for original owner and new owner.
1319 * If failed, the next call for lquota_chkquota will
1321 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set/delete xattr \a name under the object write lock, inside an
 * already-started transaction \a handle. */
1328 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1329 const struct lu_buf *buf, const char *name, int fl,
1330 struct thandle *handle)
1335 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1336 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1337 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: refuse on immutable/
 * append-only objects, and require ownership or CAP_FOWNER. */
1342 static int mdd_xattr_sanity_check(const struct lu_env *env,
1343 struct mdd_object *obj)
1345 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1346 struct md_ucred *uc = md_ucred(env);
1350 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1353 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1357 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1358 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Top-level xattr-set entry point: sanity-check permissions, start a
 * transaction, delegate the actual set to mdd_xattr_set_txn(), and log a
 * CL_XATTR changelog record when a "user." xattr changed and changelogs
 * are enabled (CLM_ON).
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
1368 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1369 const struct lu_buf *buf, const char *name,
1372 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1373 struct mdd_device *mdd = mdo2mdd(obj);
1374 struct thandle *handle;
1378 rc = mdd_xattr_sanity_check(env, mdd_obj);
1382 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1383 handle = mdd_trans_start(env, mdd);
1385 RETURN(PTR_ERR(handle));
1387 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1389 /* Only record user xattr changes */
1390 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1391 (strncmp("user.", name, 5) == 0))
1392 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1394 mdd_trans_stop(env, mdd, rc, handle);
1400 * The caller should guarantee to update the object ctime
1401 * after xattr_set if needed.
1403 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1406 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1407 struct mdd_device *mdd = mdo2mdd(obj);
1408 struct thandle *handle;
1412 rc = mdd_xattr_sanity_check(env, mdd_obj);
1416 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1417 handle = mdd_trans_start(env, mdd);
1419 RETURN(PTR_ERR(handle));
1421 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1422 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1423 mdd_object_capa(env, mdd_obj));
1424 mdd_write_unlock(env, mdd_obj);
1426 /* Only record user xattr changes */
1427 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1428 (strncmp("user.", name, 5) != 0))
1429 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1432 mdd_trans_stop(env, mdd, rc, handle);
1437 /* partial unlink */
/*
 * Drop one link (nlink) from @obj: sanity-check, decrement the link count
 * (twice for directories — the extra "." reference), refresh ctime, and
 * run mdd_finish_unlink() which handles orphan/destroy processing.  With
 * quota support, a pending dqrel is triggered for the owner when the last
 * link of a no-longer-open object goes away.
 */
1438 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1441 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1442 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1443 struct mdd_device *mdd = mdo2mdd(obj);
1444 struct thandle *handle;
1445 #ifdef HAVE_QUOTA_SUPPORT
1446 struct obd_device *obd = mdd->mdd_obd_dev;
1447 struct mds_obd *mds = &obd->u.mds;
1448 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1455 * Check -ENOENT early here because we need to get object type
1456 * to calculate credits before transaction start
1458 if (!mdd_object_exists(mdd_obj))
1461 LASSERT(mdd_object_exists(mdd_obj) > 0);
1463 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1467 handle = mdd_trans_start(env, mdd);
1471 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1473 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* drop the name's reference; directories lose the "." self-link too */
1477 __mdd_ref_del(env, mdd_obj, handle, 0);
1479 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1481 __mdd_ref_del(env, mdd_obj, handle, 1);
/* propagate the caller-supplied ctime onto the object */
1484 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1485 la_copy->la_ctime = ma->ma_attr.la_ctime;
1487 la_copy->la_valid = LA_CTIME;
1488 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1492 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1493 #ifdef HAVE_QUOTA_SUPPORT
/* last link gone and nobody has it open: release quota for the owner */
1494 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1495 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1496 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1497 mdd_quota_wrapper(&ma->ma_attr, qids);
1504 mdd_write_unlock(env, mdd_obj);
1505 mdd_trans_stop(env, mdd, rc, handle);
1506 #ifdef HAVE_QUOTA_SUPPORT
1508 /* Trigger dqrel on the owner of child. If failed,
1509 * the next call for lquota_chkquota will process it */
1510 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1516 /* partial operation */
/*
 * Validate the file type requested for a partial object create by
 * switching on the S_IFMT bits of the requested mode.
 */
1517 static int mdd_oc_sanity_check(const struct lu_env *env,
1518 struct mdd_object *obj,
1524 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Create the MDD object body for @obj (partial create, used e.g. for CMD
 * slave objects): acquire quota, start a transaction, create the on-disk
 * object, set slave LMV EA or attributes/ACL as requested by @spec, and
 * initialize the object with its parent fid.  Attributes are read back
 * into @ma on success.
 */
1541 static int mdd_object_create(const struct lu_env *env,
1542 struct md_object *obj,
1543 const struct md_op_spec *spec,
1547 struct mdd_device *mdd = mdo2mdd(obj);
1548 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1549 const struct lu_fid *pfid = spec->u.sp_pfid;
1550 struct thandle *handle;
1551 #ifdef HAVE_QUOTA_SUPPORT
1552 struct obd_device *obd = mdd->mdd_obd_dev;
1553 struct mds_obd *mds = &obd->u.mds;
1554 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1555 int quota_opc = 0, block_count = 0;
1556 int inode_pending = 0, block_pending = 0;
1561 #ifdef HAVE_QUOTA_SUPPORT
/* reserve inode (and, per file type, block) quota before the create */
1562 if (mds->mds_quota) {
1563 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1564 mdd_quota_wrapper(&ma->ma_attr, qids);
1565 /* get file quota for child */
1566 lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
1567 qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
1569 switch (ma->ma_attr.la_mode & S_IFMT) {
1578 /* get block quota for child */
1580 lquota_chkquota(mds_quota_interface_ref, obd,
1581 qids[USRQUOTA], qids[GRPQUOTA],
1582 block_count, &block_pending, NULL,
1583 LQUOTA_FLAGS_BLK, NULL, 0);
1587 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1588 handle = mdd_trans_start(env, mdd);
1590 GOTO(out_pending, rc = PTR_ERR(handle));
1592 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1593 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1597 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1601 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1602 /* If creating the slave object, set slave EA here. */
1603 int lmv_size = spec->u.sp_ea.eadatalen;
1604 struct lmv_stripe_md *lmv;
1606 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1607 LASSERT(lmv != NULL && lmv_size > 0);
1609 rc = __mdd_xattr_set(env, mdd_obj,
1610 mdd_buf_get_const(env, lmv, lmv_size),
1611 XATTR_NAME_LMV, 0, handle);
1615 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1618 #ifdef CONFIG_FS_POSIX_ACL
/* remote ACL: initialize the default/access ACLs from the supplied EA;
 * __mdd_acl_init may also adjust the requested mode */
1619 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1620 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1622 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1623 buf->lb_len = spec->u.sp_ea.eadatalen;
1624 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1625 rc = __mdd_acl_init(env, mdd_obj, buf,
1626 &ma->ma_attr.la_mode,
1631 ma->ma_attr.la_valid |= LA_MODE;
1634 pfid = spec->u.sp_ea.fid;
1637 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1643 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1644 mdd_write_unlock(env, mdd_obj);
1646 mdd_trans_stop(env, mdd, rc, handle);
1648 #ifdef HAVE_QUOTA_SUPPORT
/* commit or roll back the quota reservations made above */
1651 lquota_pending_commit(mds_quota_interface_ref, obd,
1652 qids[USRQUOTA], qids[GRPQUOTA],
1655 lquota_pending_commit(mds_quota_interface_ref, obd,
1656 qids[USRQUOTA], qids[GRPQUOTA],
1658 /* Trigger dqacq on the owner of child. If failed,
1659 * the next call for lquota_chkquota will process it. */
1660 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1661 FSFILT_OP_CREATE_PARTIAL_CHILD);
/*
 * Add one link (nlink) to @obj (partial link operation): sanity-check,
 * bump the reference count under the write lock, then update ctime from
 * the caller-supplied attributes.
 */
1668 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1669 const struct md_attr *ma)
1671 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1672 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1673 struct mdd_device *mdd = mdo2mdd(obj);
1674 struct thandle *handle;
1678 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1679 handle = mdd_trans_start(env, mdd);
1683 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1684 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1686 __mdd_ref_add(env, mdd_obj, handle);
1687 mdd_write_unlock(env, mdd_obj);
1689 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1690 la_copy->la_ctime = ma->ma_attr.la_ctime;
1692 la_copy->la_valid = LA_CTIME;
1693 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): passes 0 instead of rc to mdd_trans_stop(), unlike the
 * sibling operations (e.g. mdd_ref_del) — verify this is intentional */
1696 mdd_trans_stop(env, mdd, 0, handle);
/*
 * Translate MDS open @flags into a MAY_* access mask for permission
 * checking against attributes @la.
 * do NOT or the MAY_*'s, you'll get the weakest
 */
1704 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1708 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1709 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1710 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1711 * owner can write to a file even if it is marked readonly to hide
1712 * its brokenness. (bug 5781) */
1713 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1714 struct md_ucred *uc = md_ucred(env);
/* owner (or missing/initial creds) bypasses the access-mode check */
1716 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1717 (la->la_uid == uc->mu_fsuid))
1721 if (flags & FMODE_READ)
1723 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1725 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open of @obj with open flags @flag: reject dead objects and
 * symlinks, forbid writable opens of directories, run the permission
 * check (unless the object was just created), enforce append-only
 * semantics, and restrict O_NOATIME to the owner/CAP_FOWNER.
 */
1731 static int mdd_open_sanity_check(const struct lu_env *env,
1731 struct mdd_object *obj, int flag)
1733 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1738 if (mdd_is_dead_obj(obj))
1741 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1745 if (S_ISLNK(tmp_la->la_mode))
1748 mode = accmode(env, tmp_la, flag);
1750 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1753 if (!(flag & MDS_OPEN_CREATED)) {
1754 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncate is meaningless for special files; quietly drop the flag */
1759 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1760 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1761 flag &= ~MDS_OPEN_TRUNC;
1763 /* For writing append-only file must open it with append mode. */
1764 if (mdd_is_append(obj)) {
1765 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1767 if (flag & MDS_OPEN_TRUNC)
1773 * Now, flag -- O_NOATIME does not be packed by client.
1775 if (flag & O_NOATIME) {
1776 struct md_ucred *uc = md_ucred(env);
1778 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1779 (uc->mu_valid == UCRED_NEW)) &&
1780 (uc->mu_fsuid != tmp_la->la_uid) &&
1781 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Open @obj: run the open sanity check and, on success, bump the open
 * count (mod_count) under the object write lock.
 */
1789 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1792 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1795 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1797 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1799 mdd_obj->mod_count++;
1801 mdd_write_unlock(env, mdd_obj);
1805 /* return md_attr back,
1806 * if it is last unlink then return lov ea + llog cookie*/
/*
 * Gather the information a caller needs to destroy @obj: for regular
 * files, fetch the LOV EA into @ma and, when present, write the unlink
 * llog record to obtain the cookies.
 */
1807 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1813 if (S_ISREG(mdd_object_type(obj))) {
1814 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1815 * Caller must be ready for that. */
1817 rc = __mdd_lmm_get(env, obj, ma);
1818 if ((ma->ma_valid & MA_LOV))
1819 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/*
 * Close @obj: drop the open count; when it reaches zero, remove any
 * orphan-index link, and if the link count is also zero, kill the object
 * (returning LOV EA + llog cookies in @ma) and release quota for the
 * owner.
 * No permission check is needed.
 */
1828 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1831 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1832 struct thandle *handle;
1836 #ifdef HAVE_QUOTA_SUPPORT
1837 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1838 struct mds_obd *mds = &obd->u.mds;
1839 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1844 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1847 handle = mdd_trans_start(env, mdo2mdd(obj));
1849 RETURN(PTR_ERR(handle));
1851 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1852 /* release open count */
1853 mdd_obj->mod_count --;
1855 if (mdd_obj->mod_count == 0) {
1856 /* remove link to object from orphan index */
1857 if (mdd_obj->mod_flags & ORPHAN_OBJ)
1858 __mdd_orphan_del(env, mdd_obj, handle);
1861 rc = mdd_iattr_get(env, mdd_obj, ma);
/* last close of an unlinked object: destroy it now */
1863 if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
1864 rc = mdd_object_kill(env, mdd_obj, ma);
1865 #ifdef HAVE_QUOTA_SUPPORT
1866 if (mds->mds_quota) {
1867 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1868 mdd_quota_wrapper(&ma->ma_attr, qids);
/* otherwise the object survives: don't hand LOV/cookies back */
1877 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1879 mdd_write_unlock(env, mdd_obj);
1880 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1881 #ifdef HAVE_QUOTA_SUPPORT
1883 /* Trigger dqrel on the owner of child. If failed,
1884 * the next call for lquota_chkquota will process it */
1885 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * readpage is only valid on a directory that supports the dt index
 * (iterator) interface.
 * Permission check is done when open,
 * no need check again.
 */
1895 static int mdd_readpage_sanity_check(const struct lu_env *env,
1896 struct mdd_object *obj)
1898 struct dt_object *next = mdd_object_child(obj);
1902 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one page-sized buffer @area (of @nob bytes) with lu_dirent records
 * pulled from directory iterator @it.  Records are 8-byte aligned; the
 * page starts with a struct lu_dirpage header.  *last tracks the most
 * recently emitted entry so a caller can extend its reclen when the next
 * record does not fit.  *start/*end receive the hash range covered.
 */
1910 static int mdd_dir_page_build(const struct lu_env *env, int first,
1911 void *area, int nob, const struct dt_it_ops *iops,
1912 struct dt_it *it, __u64 *start, __u64 *end,
1913 struct lu_dirent **last)
1915 struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
1916 struct mdd_thread_info *info = mdd_env_info(env);
1917 struct lu_fid_pack *pack = &info->mti_pack;
1919 struct lu_dirent *ent;
/* reserve room for the lu_dirpage header at the top of the page */
1922 memset(area, 0, sizeof (struct lu_dirpage));
1923 area += sizeof (struct lu_dirpage);
1924 nob -= sizeof (struct lu_dirpage);
1927 LASSERT(nob > sizeof *ent);
/* key = name, record = packed fid, store = hash cookie */
1937 name = (char *)iops->key(env, it);
1938 len = iops->key_size(env, it);
1940 pack = (struct lu_fid_pack *)iops->rec(env, it);
1941 result = fid_unpack(pack, fid);
/* round the entry size up to an 8-byte boundary */
1945 recsize = (sizeof(*ent) + len + 7) & ~7;
1946 hash = iops->store(env, it);
1949 CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n",
1950 name, ent, nob, PFID(fid), hash, len, len, len, name);
1952 if (nob >= recsize) {
/* entry fits: emit it in little-endian wire format */
1953 ent->lde_fid = *fid;
1954 fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
1955 ent->lde_hash = hash;
1956 ent->lde_namelen = cpu_to_le16(len);
1957 ent->lde_reclen = cpu_to_le16(recsize);
1958 memcpy(ent->lde_name, name, len);
1959 if (first && ent == area)
1962 ent = (void *)ent + recsize;
1964 result = iops->next(env, it);
1967 * record doesn't fit into page, enlarge previous one.
/* stretch the previous entry's reclen to cover the slack at page end */
1969 LASSERT(*last != NULL);
1970 (*last)->lde_reclen =
1971 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1975 } while (result == 0);
/*
 * Iterate over directory @obj starting at rdpg->rp_hash and pack entries
 * into the pages of @rdpg via mdd_dir_page_build().  On completion the
 * first page's lu_dirpage header records the hash range covered, and
 * LDF_EMPTY is set when no entries were produced.
 */
1980 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
1981 const struct lu_rdpg *rdpg)
1984 struct dt_object *next = mdd_object_child(obj);
1985 const struct dt_it_ops *iops;
1987 struct lu_dirent *last = NULL;
1994 LASSERT(rdpg->rp_pages != NULL);
1995 LASSERT(next->do_index_ops != NULL);
1997 if (rdpg->rp_count <= 0)
2001 * iterate through directory and fill pages from @rdpg
2003 iops = &next->do_index_ops->dio_it;
2004 it = iops->init(env, next, mdd_object_capa(env, obj));
2008 rc = iops->load(env, it, rdpg->rp_hash);
2012 * Iterator didn't find record with exactly the key requested.
2014 * It is currently either
2016 * - positioned above record with key less than
2017 * requested---skip it.
2019 * - or not positioned at all (is in IAM_IT_SKEWED
2020 * state)---position it on the next item.
2022 rc = iops->next(env, it);
2027 * At this point and across for-loop:
2029 * rc == 0 -> ok, proceed.
2030 * rc > 0 -> end of directory.
2033 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2034 i++, nob -= CFS_PAGE_SIZE) {
2035 LASSERT(i < rdpg->rp_npages);
2036 pg = rdpg->rp_pages[i];
2037 rc = mdd_dir_page_build(env, !i, cfs_kmap(pg),
2038 min_t(int, nob, CFS_PAGE_SIZE), iops,
2039 it, &hash_start, &hash_end, &last);
/* NOTE(review): if the very first mdd_dir_page_build() call fails before
 * emitting any entry, last may still be NULL here — verify it cannot */
2040 if (rc != 0 || i == rdpg->rp_npages - 1)
2041 last->lde_reclen = 0;
2048 hash_end = DIR_END_OFF;
2052 struct lu_dirpage *dp;
/* stamp the covered hash range into the first page's header */
2054 dp = cfs_kmap(rdpg->rp_pages[0]);
2055 dp->ldp_hash_start = rdpg->rp_hash;
2056 dp->ldp_hash_end = hash_end;
2059 * No pages were processed, mark this.
2061 dp->ldp_flags |= LDF_EMPTY;
2062 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2063 cfs_kunmap(rdpg->rp_pages[0]);
2066 iops->fini(env, it);
/*
 * md_object readpage entry point: under the read lock, validate the
 * object, return a single empty page (LDF_EMPTY, full hash range) for a
 * dead directory per POSIX, otherwise delegate to __mdd_readpage().
 */
2071 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2072 const struct lu_rdpg *rdpg)
2074 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2078 LASSERT(mdd_object_exists(mdd_obj));
2080 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2081 rc = mdd_readpage_sanity_check(env, mdd_obj);
2083 GOTO(out_unlock, rc);
2085 if (mdd_is_dead_obj(mdd_obj)) {
2087 struct lu_dirpage *dp;
2090 * According to POSIX, please do not return any entry to client:
2091 * even dot and dotdot should not be returned.
2093 CWARN("readdir from dead object: "DFID"\n",
2094 PFID(mdd_object_fid(mdd_obj)));
2096 if (rdpg->rp_count <= 0)
2097 GOTO(out_unlock, rc = -EFAULT);
2098 LASSERT(rdpg->rp_pages != NULL);
/* hand back one empty dirpage covering the whole hash range */
2100 pg = rdpg->rp_pages[0];
2101 dp = (struct lu_dirpage*)cfs_kmap(pg);
2102 memset(dp, 0 , sizeof(struct lu_dirpage));
2103 dp->ldp_hash_start = rdpg->rp_hash;
2104 dp->ldp_hash_end = DIR_END_OFF;
2105 dp->ldp_flags |= LDF_EMPTY;
2106 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2108 GOTO(out_unlock, rc = 0);
2111 rc = __mdd_readpage(env, mdd_obj, rdpg);
2115 mdd_read_unlock(env, mdd_obj);
/*
 * Sync @obj to stable storage by delegating to the underlying dt
 * object's do_object_sync method.
 */
2119 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2121 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2122 struct dt_object *next;
2124 LASSERT(mdd_object_exists(mdd_obj));
2125 next = mdd_object_child(mdd_obj);
2126 return next->do_ops->do_object_sync(env, next);
2129 const struct md_object_operations mdd_obj_ops = {
2130 .moo_permission = mdd_permission,
2131 .moo_attr_get = mdd_attr_get,
2132 .moo_attr_set = mdd_attr_set,
2133 .moo_xattr_get = mdd_xattr_get,
2134 .moo_xattr_set = mdd_xattr_set,
2135 .moo_xattr_list = mdd_xattr_list,
2136 .moo_xattr_del = mdd_xattr_del,
2137 .moo_object_create = mdd_object_create,
2138 .moo_ref_add = mdd_ref_add,
2139 .moo_ref_del = mdd_ref_del,
2140 .moo_open = mdd_open,
2141 .moo_close = mdd_close,
2142 .moo_readpage = mdd_readpage,
2143 .moo_readlink = mdd_readlink,
2144 .moo_capa_get = mdd_capa_get,
2145 .moo_object_sync = mdd_object_sync,
2146 .moo_path = mdd_path,