1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <linux/jbd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
62 #include "mdd_internal.h"
64 static const struct lu_object_operations mdd_lu_obj_ops;
66 static int mdd_xattr_get(const struct lu_env *env,
67 struct md_object *obj, struct lu_buf *buf,
/* Read the standard attributes of @obj into @la through the underlying
 * mdo layer, asserting that the object exists on disk first.
 * NOTE(review): this listing is an elided dump — braces and some lines are
 * missing and each line carries a baked-in original line number; code is
 * left byte-identical throughout, only comments are added. */
70 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
71 struct lu_attr *la, struct lustre_capa *capa)
73 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74 PFID(mdd_object_fid(obj)));
75 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits; clears the two affected bits before re-deriving them. */
78 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
80 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
82 if (flags & LUSTRE_APPEND_FL)
83 obj->mod_flags |= APPEND_OBJ;
85 if (flags & LUSTRE_IMMUTABLE_FL)
86 obj->mod_flags |= IMMUTE_OBJ;
/* Fetch the per-thread mdd scratch area from the lu_env context;
 * must always be present (asserted non-NULL). */
89 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
91 struct mdd_thread_info *info;
93 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
94 LASSERT(info != NULL);
/* Wrap caller-supplied memory @area/@len in the per-thread mti_buf. */
98 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
102 buf = &mdd_env_info(env)->mti_buf;
/* Release a buffer, choosing OBD_VFREE vs OBD_FREE to match how it was
 * allocated; a NULL buf or NULL lb_buf is a no-op. */
108 void mdd_buf_put(struct lu_buf *buf)
110 if (buf == NULL || buf->lb_buf == NULL)
113 OBD_VFREE(buf->lb_buf, buf->lb_len);
115 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only caller memory in the
 * per-thread mti_buf (casting away const for the lu_buf container). */
119 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
120 const void *area, ssize_t len)
124 buf = &mdd_env_info(env)->mti_buf;
125 buf->lb_buf = (void *)area;
130 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the per-thread big buffer, at least @len bytes long.  A cached
 * buffer that is too small is freed (vmalloc vs kmalloc path) and a new
 * one allocated: kmalloc up to BUF_VMALLOC_SIZE, vmalloc beyond that. */
131 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
133 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
135 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
137 OBD_VFREE(buf->lb_buf, buf->lb_len);
139 OBD_FREE(buf->lb_buf, buf->lb_len);
142 if (buf->lb_buf == NULL) {
144 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
145 OBD_ALLOC(buf->lb_buf, buf->lb_len);
148 if (buf->lb_buf == NULL) {
149 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
152 if (buf->lb_buf == NULL)
/* Grow the per-thread big buffer to @len bytes while preserving its
 * current contents (alloc new, memcpy old data, free old, swap in). */
158 /* preserve old data */
159 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
161 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164 LASSERT(len >= oldbuf->lb_len);
165 if (len > BUF_VMALLOC_SIZE) {
166 OBD_VMALLOC(buf.lb_buf, len);
169 OBD_ALLOC(buf.lb_buf, len);
172 if (buf.lb_buf == NULL)
176 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
178 if (oldbuf->lb_vmalloc)
179 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
181 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
183 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed per-thread cookie buffer large enough for the device's
 * maximum LOV llog cookie size, (re)allocating it on demand. */
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189 struct mdd_device *mdd)
191 struct mdd_thread_info *mti = mdd_env_info(env);
194 max_cookie_size = mdd_lov_cookiesize(env, mdd);
195 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196 if (mti->mti_max_cookie)
197 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
198 mti->mti_max_cookie = NULL;
199 mti->mti_max_cookie_size = 0;
201 if (unlikely(mti->mti_max_cookie == NULL)) {
202 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
203 if (likely(mti->mti_max_cookie != NULL))
204 mti->mti_max_cookie_size = max_cookie_size;
206 if (likely(mti->mti_max_cookie != NULL))
207 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208 return mti->mti_max_cookie;
211 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
212 struct mdd_device *mdd)
214 struct mdd_thread_info *mti = mdd_env_info(env);
217 max_lmm_size = mdd_lov_mdsize(env, mdd);
218 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
219 if (mti->mti_max_lmm)
220 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
221 mti->mti_max_lmm = NULL;
222 mti->mti_max_lmm_size = 0;
224 if (unlikely(mti->mti_max_lmm == NULL)) {
225 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
226 if (unlikely(mti->mti_max_lmm != NULL))
227 mti->mti_max_lmm_size = max_lmm_size;
229 return mti->mti_max_lmm;
/* lu_device method: allocate and minimally initialize an mdd_object,
 * wiring up its md/dir/lu operation vectors. */
232 struct lu_object *mdd_object_alloc(const struct lu_env *env,
233 const struct lu_object_header *hdr,
236 struct mdd_object *mdd_obj;
238 OBD_ALLOC_PTR(mdd_obj);
239 if (mdd_obj != NULL) {
242 o = mdd2lu_obj(mdd_obj);
243 lu_object_init(o, NULL, d);
244 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
245 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
246 mdd_obj->mod_count = 0;
247 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init — allocate the child-layer object
 * from the underlying device and stack it below this one.
 * The lu_object_conf parameter is unused here (named "_"). */
254 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
255 const struct lu_object_conf *_)
257 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
258 struct mdd_object *mdd_obj = lu2mdd_obj(o);
259 struct lu_object *below;
260 struct lu_device *under;
263 mdd_obj->mod_cltime = 0;
264 under = &d->mdd_child->dd_lu_dev;
265 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
266 mdd_pdlock_init(mdd_obj);
270 lu_object_add(o, below);
/* loo_object_start — if the object exists on disk, cache its flag bits. */
275 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 if (lu_object_exists(o))
278 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free — tear down and release an mdd_object. */
283 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print — one-line debug representation of the object. */
291 static int mdd_object_print(const struct lu_env *env, void *cookie,
292 lu_printer_t p, const struct lu_object *o)
294 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
/* lu_object operation vector installed on every mdd object. */
297 static const struct lu_object_operations mdd_lu_obj_ops = {
298 .loo_object_init = mdd_object_init,
299 .loo_object_start = mdd_object_start,
300 .loo_object_free = mdd_object_free,
301 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID @f on device @d. */
304 struct mdd_object *mdd_object_find(const struct lu_env *env,
305 struct mdd_device *d,
306 const struct lu_fid *f)
308 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Walk @path component by component from the filesystem root, resolving
 * each name via mdd_lookup() to arrive at the FID of the final entry. */
311 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
312 const char *path, struct lu_fid *fid)
315 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
316 struct mdd_object *obj;
317 struct lu_name *lname = &mdd_env_info(env)->mti_name;
322 /* temp buffer for path element */
323 buf = mdd_buf_alloc(env, PATH_MAX);
324 if (buf->lb_buf == NULL)
327 lname->ln_name = name = buf->lb_buf;
328 lname->ln_namelen = 0;
329 *f = mdd->mdd_root_fid;
336 while (*path != '/' && *path != '\0') {
344 /* find obj corresponding to fid */
345 obj = mdd_object_find(env, mdd, f);
/* NOTE(review): PTR_ERR() conventionally already yields a negative errno,
 * so -PTR_ERR(obj) would flip the sign — confirm against the project's
 * ERR_PTR/PTR_ERR convention. */
349 GOTO(out, rc = -PTR_ERR(obj));
347 GOTO(out, rc = -EREMOTE);
350 /* get child fid from parent and name */
351 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
352 mdd_object_put(env, obj);
357 lname->ln_namelen = 0;
366 /** The maximum depth that fid2path() will search.
367 * This is limited only because we want to store the fids for
368 * historical path lookup purposes.
370 #define MAX_PATH_DEPTH 100
/** mdd_path() lookup structure. */
373 struct path_lookup_info {
374 __u64 pli_recno; /**< history point */
375 struct lu_fid pli_fid;                       /**< FID re-resolved from the built path */
376 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377 struct mdd_object *pli_mdd_obj;              /**< object whose path is wanted */
378 char *pli_path; /**< full path */
380 int pli_linkno; /**< which hardlink to follow */
381 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current pathname of pli->pli_mdd_obj by walking parent link
 * EAs up to the root, packing component names backwards into pli_path,
 * then re-resolving the result with mdd_path2fid() to detect races
 * (returns -EAGAIN if the object moved during the walk). */
384 static int mdd_path_current(const struct lu_env *env,
385 struct path_lookup_info *pli)
387 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388 struct mdd_object *mdd_obj;
389 struct lu_buf *buf = NULL;
390 struct link_ea_header *leh;
391 struct link_ea_entry *lee;
392 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
399 ptr = pli->pli_path + pli->pli_pathlen - 1;
402 pli->pli_fidcount = 0;
403 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
405 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406 mdd_obj = mdd_object_find(env, mdd,
407 &pli->pli_fids[pli->pli_fidcount]);
409 GOTO(out, rc = -EREMOTE);
/* NOTE(review): as in mdd_path2fid(), -PTR_ERR() looks sign-inverted —
 * confirm against the project's PTR_ERR convention. */
411 GOTO(out, rc = -PTR_ERR(mdd_obj));
412 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
414 mdd_object_put(env, mdd_obj);
418 /* Do I need to error out here? */
423 /* Get parent fid and object name */
424 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425 buf = mdd_links_get(env, mdd_obj);
427 GOTO(out, rc = PTR_ERR(buf));
428 mdd_read_unlock(env, mdd_obj);
429 mdd_object_put(env, mdd_obj);
434 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
435 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
437 /* If set, use link #linkno for path lookup, otherwise use
438 link #0. Only do this for the final path element. */
439 if ((pli->pli_fidcount == 0) &&
440 (pli->pli_linkno < leh->leh_reccount)) {
442 for (count = 0; count < pli->pli_linkno; count++) {
443 lee = (struct link_ea_entry *)
444 ((char *)lee + reclen);
445 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
447 if (pli->pli_linkno < leh->leh_reccount - 1)
448 /* indicate to user there are more links */
452 /* Pack the name in the end of the buffer */
453 ptr -= tmpname->ln_namelen;
454 if (ptr - 1 <= pli->pli_path)
455 GOTO(out, rc = -EOVERFLOW);
/* NOTE(review): strncpy with the exact name length writes no NUL here;
 * presumably the separator/terminator is written by elided lines —
 * confirm the buffer is always terminated before mdd_path2fid(ptr). */
456 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
459 /* Store the parent fid for historic lookup */
460 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
461 GOTO(out, rc = -EOVERFLOW);
462 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
465 /* Verify that our path hasn't changed since we started the lookup */
466 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
468 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
469 GOTO (out, rc = -EAGAIN);
471 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
472 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
473 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
474 PFID(&pli->pli_fid));
475 GOTO(out, rc = -EAGAIN);
478 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
482 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
483 /* if we vmalloced a large buffer drop it */
/* Returns the full path to this fid, as of changelog record recno. */
490 static int mdd_path(const struct lu_env *env, struct md_object *obj,
491 char *path, int pathlen, __u64 recno, int *linkno)
493 struct path_lookup_info *pli;
501 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
511 pli->pli_mdd_obj = md2mdd_obj(obj);
512 pli->pli_recno = recno;
513 pli->pli_path = path;
514 pli->pli_pathlen = pathlen;
515 pli->pli_linkno = *linkno;
517 /* Retry multiple times in case file is being moved */
518 while (tries-- && rc == -EAGAIN)
519 rc = mdd_path_current(env, pli);
521 #if 0 /* We need old path names only for replication */
522 /* For historical path lookup, the current links may not have existed
523 * at "recno" time. We must switch over to earlier links/parents
524 * by using the changelog records. If the earlier parent doesn't
525 * exist, we must search back through the changelog to reconstruct
526 * its parents, then check if it exists, etc.
527 * We may ignore this problem for the initial implementation and
528 * state that an "original" hardlink must still exist for us to find
529 * historic path name. */
530 if (pli->pli_recno != -1)
531 rc = mdd_path_historic(env, pli);
534 /* return next link index to caller */
535 *linkno = pli->pli_linkno;
/* Refresh the cached mod_flags of @obj from its on-disk attributes;
 * additionally marks single-link directories with MNLINK_OBJ. */
542 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
544 struct lu_attr *la = &mdd_env_info(env)->mti_la;
548 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
550 mdd_flags_xlate(obj, la->la_flags);
551 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
552 obj->mod_flags |= MNLINK_OBJ;
/* get only inode attributes */
/* Fill ma->ma_attr from disk unless MA_INODE is already valid. */
558 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
564 if (ma->ma_valid & MA_INODE)
567 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
568 mdd_object_capa(env, mdd_obj))
570 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV_MAGIC_V1 striping EA from the device's LOV
 * descriptor defaults; writes its size through @size and returns it. */
574 static int mdd_get_default_md(struct mdd_object *mdd_obj,
575 struct lov_mds_md *lmm, int *size)
577 struct lov_desc *ldesc;
578 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
581 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
582 LASSERT(ldesc != NULL);
587 lmm->lmm_magic = LOV_MAGIC_V1;
588 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
589 lmm->lmm_pattern = ldesc->ld_pattern;
590 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
591 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
592 *size = sizeof(struct lov_mds_md);
594 RETURN(sizeof(struct lov_mds_md));
/* get lov EA only */
/* Caller must hold the object read lock; falls back to the device
 * default striping when MA_LOV_DEF is requested. */
598 static int __mdd_lmm_get(const struct lu_env *env,
599 struct mdd_object *mdd_obj, struct md_attr *ma)
604 if (ma->ma_valid & MA_LOV)
607 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
610 if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
611 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
616 ma->ma_valid |= MA_LOV;
/* Locked wrapper around __mdd_lmm_get(). */
622 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
628 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
629 rc = __mdd_lmm_get(env, mdd_obj, ma);
630 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-dir) EA; caller holds the object read lock. */
635 static int __mdd_lmv_get(const struct lu_env *env,
636 struct mdd_object *mdd_obj, struct md_attr *ma)
641 if (ma->ma_valid & MA_LMV)
644 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
647 ma->ma_valid |= MA_LMV;
/* Gather the attribute groups requested in ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), LMV EA (dirs), default ACL (dirs). */
653 static int mdd_attr_get_internal(const struct lu_env *env,
654 struct mdd_object *mdd_obj,
660 if (ma->ma_need & MA_INODE)
661 rc = mdd_iattr_get(env, mdd_obj, ma);
663 if (rc == 0 && ma->ma_need & MA_LOV) {
664 if (S_ISREG(mdd_object_type(mdd_obj)) ||
665 S_ISDIR(mdd_object_type(mdd_obj)))
666 rc = __mdd_lmm_get(env, mdd_obj, ma);
668 if (rc == 0 && ma->ma_need & MA_LMV) {
669 if (S_ISDIR(mdd_object_type(mdd_obj)))
670 rc = __mdd_lmv_get(env, mdd_obj, ma);
672 #ifdef CONFIG_FS_POSIX_ACL
673 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
674 if (S_ISDIR(mdd_object_type(mdd_obj)))
675 rc = mdd_def_acl_get(env, mdd_obj, ma);
678 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* Wrapper around mdd_attr_get_internal() that takes the read lock only
 * when EA/ACL groups are requested (plain inode attrs need no lock). */
683 int mdd_attr_get_internal_locked(const struct lu_env *env,
684 struct mdd_object *mdd_obj, struct md_attr *ma)
687 int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
690 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
691 rc = mdd_attr_get_internal(env, mdd_obj, ma);
693 mdd_read_unlock(env, mdd_obj);
698 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point; no permission check. */
700 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
703 struct mdd_object *mdd_obj = md2mdd_obj(obj);
707 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
712 * No permission check is needed.
/* moo_xattr_get entry point — read one named xattr under the read lock;
 * no permission check (per the comment fragment above). */
714 static int mdd_xattr_get(const struct lu_env *env,
715 struct md_object *obj, struct lu_buf *buf,
718 struct mdd_object *mdd_obj = md2mdd_obj(obj);
723 LASSERT(mdd_object_exists(mdd_obj));
725 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726 rc = mdo_xattr_get(env, mdd_obj, buf, name,
727 mdd_object_capa(env, mdd_obj));
728 mdd_read_unlock(env, mdd_obj);
734 * Permission check is done when open,
735 * no need check again.
/* moo_readlink — read the symlink body from the child dt object;
 * permission was checked at open time (per the comment fragment above). */
737 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
740 struct mdd_object *mdd_obj = md2mdd_obj(obj);
741 struct dt_object *next;
746 LASSERT(mdd_object_exists(mdd_obj));
748 next = mdd_object_child(mdd_obj);
749 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
750 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
751 mdd_object_capa(env, mdd_obj));
752 mdd_read_unlock(env, mdd_obj);
757 * No permission check is needed.
/* moo_xattr_list — enumerate xattr names under the read lock. */
759 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
762 struct mdd_object *mdd_obj = md2mdd_obj(obj);
767 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
768 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
769 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for @c inside transaction @handle, deriving
 * the dt object format from the creation spec/mode and letting the
 * underlying device fill in the allocation hint from parent @p. */
774 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
775 struct mdd_object *c, struct md_attr *ma,
776 struct thandle *handle,
777 const struct md_op_spec *spec)
779 struct lu_attr *attr = &ma->ma_attr;
780 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
781 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
782 const struct dt_index_features *feat = spec->sp_feat;
786 if (!mdd_object_exists(c)) {
787 struct dt_object *next = mdd_object_child(c);
790 if (feat != &dt_directory_features && feat != NULL)
791 dof->dof_type = DFT_INDEX;
793 dof->dof_type = dt_mode_to_dft(attr->la_mode);
795 dof->u.dof_idx.di_feat = feat;
797 /* @hint will be initialized by underlying device. */
798 next->do_ops->do_ah_init(env, hint,
799 p ? mdd_object_child(p) : NULL,
800 attr->la_mode & S_IFMT);
802 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
803 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
811 * Make sure the ctime is increased only.
/* Ensure ctime only moves forward: drop time updates that would rewind
 * ctime, and drop a pure-ctime update that would not change it. */
813 static inline int mdd_attr_check(const struct lu_env *env,
814 struct mdd_object *obj,
815 struct lu_attr *attr)
817 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
821 if (attr->la_valid & LA_CTIME) {
822 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
826 if (attr->la_ctime < tmp_la->la_ctime)
827 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
828 else if (attr->la_valid == LA_CTIME &&
829 attr->la_ctime == tmp_la->la_ctime)
830 attr->la_valid &= ~LA_CTIME;
/* Apply @attr to the object; on a mode change with @needacl set, also
 * propagate the new mode into the POSIX ACL. */
835 int mdd_attr_set_internal(const struct lu_env *env,
836 struct mdd_object *obj,
837 struct lu_attr *attr,
838 struct thandle *handle,
844 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
845 #ifdef CONFIG_FS_POSIX_ACL
846 if (!rc && (attr->la_valid & LA_MODE) && needacl)
847 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by the actual set. */
852 int mdd_attr_check_set_internal(const struct lu_env *env,
853 struct mdd_object *obj,
854 struct lu_attr *attr,
855 struct thandle *handle,
861 rc = mdd_attr_check(env, obj, attr);
866 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper around mdd_attr_set_internal(); ACL rewrite is
 * only needed when the mode actually changes. */
870 static int mdd_attr_set_internal_locked(const struct lu_env *env,
871 struct mdd_object *obj,
872 struct lu_attr *attr,
873 struct thandle *handle,
879 needacl = needacl && (attr->la_valid & LA_MODE);
881 mdd_write_lock(env, obj, MOR_TGT_CHILD);
882 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
884 mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(). */
888 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
889 struct mdd_object *obj,
890 struct lu_attr *attr,
891 struct thandle *handle,
897 needacl = needacl && (attr->la_valid & LA_MODE);
899 mdd_write_lock(env, obj, MOR_TGT_CHILD);
900 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
902 mdd_write_unlock(env, obj);
/* Set or delete one xattr: a non-empty buffer sets it, a NULL buffer of
 * zero length deletes it.  Caller holds the write lock. */
906 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
907 const struct lu_buf *buf, const char *name,
908 int fl, struct thandle *handle)
910 struct lustre_capa *capa = mdd_object_capa(env, obj);
914 if (buf->lb_buf && buf->lb_len > 0)
/* NOTE(review): the @fl parameter is accepted but a literal 0 is passed
 * to mdo_xattr_set() here — confirm this is intentional and that create/
 * replace flags are not meant to be forwarded. */
915 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
916 else if (buf->lb_buf == NULL && buf->lb_len == 0)
917 rc = mdo_xattr_del(env, obj, name, handle, capa);
923 * This gives the same functionality as the code between
924 * sys_chmod and inode_setattr
925 * chown_common and inode_setattr
926 * utimes and inode_setattr
927 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize an incoming setattr request against the current
 * on-disk attributes and the caller's credentials: enforces immutable/
 * append semantics, chmod/chown/chgrp permission rules (including setuid/
 * setgid stripping), time monotonicity, and Size-on-MDS merging. */
929 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
930 struct lu_attr *la, const struct md_attr *ma)
932 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
933 struct md_ucred *uc = md_ucred(env);
940 /* Do not permit change file type */
941 if (la->la_valid & LA_TYPE)
944 /* They should not be processed by setattr */
945 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
948 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
952 if (la->la_valid == LA_CTIME) {
953 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
954 /* This is only for set ctime when rename's source is
956 rc = mdd_may_delete(env, NULL, obj,
957 (struct md_attr *)ma, 1, 0);
958 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
959 la->la_valid &= ~LA_CTIME;
963 if (la->la_valid == LA_ATIME) {
964 /* This is atime only set for read atime update on close. */
965 if (la->la_atime <= tmp_la->la_atime +
966 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
967 la->la_valid &= ~LA_ATIME;
971 /* Check if flags change. */
972 if (la->la_valid & LA_FLAGS) {
973 unsigned int oldflags = 0;
974 unsigned int newflags = la->la_flags &
975 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
977 if ((uc->mu_fsuid != tmp_la->la_uid) &&
978 !mdd_capable(uc, CFS_CAP_FOWNER))
981 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
982 * only be changed by the relevant capability. */
983 if (mdd_is_immutable(obj))
984 oldflags |= LUSTRE_IMMUTABLE_FL;
985 if (mdd_is_append(obj))
986 oldflags |= LUSTRE_APPEND_FL;
987 if ((oldflags ^ newflags) &&
988 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
991 if (!S_ISDIR(tmp_la->la_mode))
992 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
995 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
996 (la->la_valid & ~LA_FLAGS) &&
997 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1000 /* Check for setting the obj time. */
1001 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1002 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1003 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1004 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1005 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1013 /* Make sure a caller can chmod. */
1014 if (la->la_valid & LA_MODE) {
1015 /* Bypass la_valid == LA_MODE,
1016 * this is for changing file with SUID or SGID. */
1017 if ((la->la_valid & ~LA_MODE) &&
1018 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1019 (uc->mu_fsuid != tmp_la->la_uid) &&
1020 !mdd_capable(uc, CFS_CAP_FOWNER))
1023 if (la->la_mode == (umode_t) -1)
1024 la->la_mode = tmp_la->la_mode;
1026 la->la_mode = (la->la_mode & S_IALLUGO) |
1027 (tmp_la->la_mode & ~S_IALLUGO);
1029 /* Also check the setgid bit! */
1030 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1031 la->la_gid : tmp_la->la_gid) &&
1032 !mdd_capable(uc, CFS_CAP_FSETID))
1033 la->la_mode &= ~S_ISGID;
1035 la->la_mode = tmp_la->la_mode;
1038 /* Make sure a caller can chown. */
1039 if (la->la_valid & LA_UID) {
1040 if (la->la_uid == (uid_t) -1)
1041 la->la_uid = tmp_la->la_uid;
1042 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1043 (la->la_uid != tmp_la->la_uid)) &&
1044 !mdd_capable(uc, CFS_CAP_CHOWN))
1047 /* If the user or group of a non-directory has been
1048 * changed by a non-root user, remove the setuid bit.
1049 * 19981026 David C Niemi <niemi@tux.org>
1051 * Changed this to apply to all users, including root,
1052 * to avoid some races. This is the behavior we had in
1053 * 2.0. The check for non-root was definitely wrong
1054 * for 2.2 anyway, as it should have been using
1055 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1056 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1057 !S_ISDIR(tmp_la->la_mode)) {
1058 la->la_mode &= ~S_ISUID;
1059 la->la_valid |= LA_MODE;
1063 /* Make sure caller can chgrp. */
1064 if (la->la_valid & LA_GID) {
1065 if (la->la_gid == (gid_t) -1)
1066 la->la_gid = tmp_la->la_gid;
1067 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1068 ((la->la_gid != tmp_la->la_gid) &&
1069 !lustre_in_group_p(uc, la->la_gid))) &&
1070 !mdd_capable(uc, CFS_CAP_CHOWN))
1073 /* Likewise, if the user or group of a non-directory
1074 * has been changed by a non-root user, remove the
1075 * setgid bit UNLESS there is no group execute bit
1076 * (this would be a file marked for mandatory
1077 * locking). 19981026 David C Niemi <niemi@tux.org>
1079 * Removed the fsuid check (see the comment above) --
1081 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1082 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1083 la->la_mode &= ~S_ISGID;
1084 la->la_valid |= LA_MODE;
1088 /* For both Size-on-MDS case and truncate case,
1089 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1090 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1091 * For SOM case, it is true, the MAY_WRITE perm has been checked
1092 * when open, no need check again. For truncate case, it is false,
1093 * the MAY_WRITE perm should be checked here. */
1094 if (ma->ma_attr_flags & MDS_SOM) {
1095 /* For the "Size-on-MDS" setattr update, merge coming
1096 * attributes with the set in the inode. BUG 10641 */
1097 if ((la->la_valid & LA_ATIME) &&
1098 (la->la_atime <= tmp_la->la_atime))
1099 la->la_valid &= ~LA_ATIME;
1101 /* OST attributes do not have a priority over MDS attributes,
1102 * so drop times if ctime is equal. */
1103 if ((la->la_valid & LA_CTIME) &&
1104 (la->la_ctime <= tmp_la->la_ctime))
1105 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1107 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1108 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1109 (uc->mu_fsuid == tmp_la->la_uid)) &&
1110 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1111 rc = mdd_permission_internal_locked(env, obj,
1118 if (la->la_valid & LA_CTIME) {
1119 /* The pure setattr, it has the priority over what is
1120 * already set, do not drop it if ctime is equal. */
1121 if (la->la_ctime < tmp_la->la_ctime)
1122 la->la_valid &= ~(LA_ATIME | LA_MTIME |
/** Store a data change changelog record
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
1136 static int mdd_changelog_data_store(const struct lu_env *env,
1137 struct mdd_device *mdd,
1138 enum changelog_rec_type type,
1139 struct mdd_object *mdd_obj,
1140 struct thandle *handle)
1142 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1143 struct llog_changelog_rec *rec;
1148 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1151 LASSERT(handle != NULL);
1152 LASSERT(mdd_obj != NULL);
1154 if ((type == CL_SETATTR) &&
1155 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1156 /* Don't need multiple updates in this log */
1157 /* Don't check under lock - no big deal if we get an extra
1162 reclen = llog_data_len(sizeof(*rec));
1163 buf = mdd_buf_alloc(env, reclen);
1164 if (buf->lb_buf == NULL)
1166 rec = (struct llog_changelog_rec *)buf->lb_buf;
1168 rec->cr_flags = CLF_VERSION;
1169 rec->cr_type = (__u32)type;
1170 rec->cr_tfid = *tfid;
1171 rec->cr_namelen = 0;
1172 mdd_obj->mod_cltime = cfs_time_current_64();
1174 rc = mdd_changelog_llog_write(mdd, rec, handle);
1176 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1177 rc, type, PFID(tfid));
/* set attr and LOV EA at once, return updated attr */
/* moo_attr_set entry point: runs mdd_fix_attr() on a private copy of the
 * attributes, handles quota accounting for chown/chgrp, applies the
 * attrs (and optionally a new LOV EA), logs chown cookies for the OSTs,
 * records a CL_SETATTR changelog entry, then pushes owner changes to
 * the OSTs asynchronously after the transaction stops. */
1185 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1186 const struct md_attr *ma)
1188 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1189 struct mdd_device *mdd = mdo2mdd(obj);
1190 struct thandle *handle;
1191 struct lov_mds_md *lmm = NULL;
1192 struct llog_cookie *logcookies = NULL;
1193 int rc, lmm_size = 0, cookie_size = 0;
1194 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1195 #ifdef HAVE_QUOTA_SUPPORT
1196 struct obd_device *obd = mdd->mdd_obd_dev;
1197 struct mds_obd *mds = &obd->u.mds;
1198 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1199 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1200 int quota_opc = 0, block_count = 0;
1201 int inode_pending = 0, block_pending = 0;
1205 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1206 MDD_TXN_ATTR_SET_OP);
1207 handle = mdd_trans_start(env, mdd);
1209 RETURN(PTR_ERR(handle));
1210 /*TODO: add lock here*/
1211 /* start a log journal handle if needed */
1212 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1213 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1214 lmm_size = mdd_lov_mdsize(env, mdd);
1215 lmm = mdd_max_lmm_get(env, mdd);
1217 GOTO(cleanup, rc = -ENOMEM);
1219 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1226 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1227 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1228 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1230 *la_copy = ma->ma_attr;
1231 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1235 #ifdef HAVE_QUOTA_SUPPORT
1236 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1237 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1239 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1241 quota_opc = FSFILT_OP_SETATTR;
1242 mdd_quota_wrapper(la_copy, qnids);
1243 mdd_quota_wrapper(la_tmp, qoids);
1244 /* get file quota for new owner */
1245 lquota_chkquota(mds_quota_interface_ref, obd,
1246 qnids[USRQUOTA], qnids[GRPQUOTA], 1,
1247 &inode_pending, NULL, 0);
1248 block_count = (la_tmp->la_blocks + 7) >> 3;
1250 /* get block quota for new owner */
1251 lquota_chkquota(mds_quota_interface_ref, obd,
1254 block_count, &block_pending,
1255 NULL, LQUOTA_FLAGS_BLK);
1260 if (la_copy->la_valid & LA_FLAGS) {
1261 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1264 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1265 } else if (la_copy->la_valid) { /* setattr */
1266 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1268 /* journal chown/chgrp in llog, just like unlink */
1269 if (rc == 0 && lmm_size){
1270 cookie_size = mdd_lov_cookiesize(env, mdd);
1271 logcookies = mdd_max_cookie_get(env, mdd);
1272 if (logcookies == NULL)
1273 GOTO(cleanup, rc = -ENOMEM);
1275 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1276 logcookies, cookie_size) <= 0)
1281 if (rc == 0 && ma->ma_valid & MA_LOV) {
1284 mode = mdd_object_type(mdd_obj);
1285 if (S_ISREG(mode) || S_ISDIR(mode)) {
1286 rc = mdd_lsm_sanity_check(env, mdd_obj);
1290 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1291 ma->ma_lmm_size, handle, 1);
1296 if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1297 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1299 mdd_trans_stop(env, mdd, rc, handle);
1300 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1301 /*set obd attr, if needed*/
1302 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1305 #ifdef HAVE_QUOTA_SUPPORT
1308 lquota_pending_commit(mds_quota_interface_ref, obd,
1309 qnids[USRQUOTA], qnids[GRPQUOTA],
1312 lquota_pending_commit(mds_quota_interface_ref, obd,
1313 qnids[USRQUOTA], qnids[GRPQUOTA],
1315 /* Trigger dqrel/dqacq for original owner and new owner.
1316 * If failed, the next call for lquota_chkquota will
1318 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Write-locked wrapper around __mdd_xattr_set() within transaction
 * @handle. */
1325 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1326 const struct lu_buf *buf, const char *name, int fl,
1327 struct thandle *handle)
1332 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1333 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1334 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: reject immutable/append-only
 * objects, and require ownership or CFS_CAP_FOWNER. */
1339 static int mdd_xattr_sanity_check(const struct lu_env *env,
1340 struct mdd_object *obj)
1342 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1343 struct md_ucred *uc = md_ucred(env);
1347 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1350 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1354 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1355 !mdd_capable(uc, CFS_CAP_FOWNER))
1362  * The caller should guarantee to update the object ctime
1363  * after xattr_set if needed.
/*
 * md_object_operations::moo_xattr_set implementation: sanity-check,
 * start a transaction, set the xattr under lock via
 * mdd_xattr_set_txn(), and record a CL_XATTR changelog entry for
 * "user." xattrs when the changelog is enabled.
 */
1365 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1366 const struct lu_buf *buf, const char *name,
1369 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1370 struct mdd_device *mdd = mdo2mdd(obj);
1371 struct thandle *handle;
1375 rc = mdd_xattr_sanity_check(env, mdd_obj);
1379 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1380 handle = mdd_trans_start(env, mdd);
1382 RETURN(PTR_ERR(handle));
1384 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1386 /* Only record user xattr changes */
1387 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1388 (strncmp("user.", name, 5) == 0))
1389 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
/* Stop the transaction regardless of rc so the handle is released. */
1391 mdd_trans_stop(env, mdd, rc, handle);
1397  * The caller should guarantee to update the object ctime
1398  * after xattr_set if needed.
/*
 * md_object_operations::moo_xattr_del implementation: sanity-check,
 * start a transaction, delete the xattr under the object write lock,
 * and record a CL_XATTR changelog entry for "user." xattrs when the
 * changelog is enabled.
 *
 * FIX: the changelog condition previously tested
 * strncmp("user.", name, 5) != 0, which logged every xattr EXCEPT
 * user ones — contradicting the comment below and the matching test
 * in mdd_xattr_set(). Changed to == 0 so set and delete agree.
 */
1400 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1403 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1404 struct mdd_device *mdd = mdo2mdd(obj);
1405 struct thandle *handle;
1409 rc = mdd_xattr_sanity_check(env, mdd_obj);
1413 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1414 handle = mdd_trans_start(env, mdd);
1416 RETURN(PTR_ERR(handle));
1418 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1419 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1420 mdd_object_capa(env, mdd_obj));
1421 mdd_write_unlock(env, mdd_obj);
1423 /* Only record user xattr changes */
1424 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1425 (strncmp("user.", name, 5) == 0))
1426 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1429 mdd_trans_stop(env, mdd, rc, handle);
1434 /* partial unlink */
/*
 * Drop one link on @obj (the object half of an unlink): decrement
 * nlink, update ctime, finish orphan handling via
 * mdd_finish_unlink(), and release quota for the child if this was
 * the last link and the object is not open.
 */
1435 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1438 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1439 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1440 struct mdd_device *mdd = mdo2mdd(obj);
1441 struct thandle *handle;
1442 #ifdef HAVE_QUOTA_SUPPORT
1443 struct obd_device *obd = mdd->mdd_obd_dev;
1444 struct mds_obd *mds = &obd->u.mds;
1445 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1452  * Check -ENOENT early here because we need to get object type
1453  * to calculate credits before transaction start
1455 if (!mdd_object_exists(mdd_obj))
1458 LASSERT(mdd_object_exists(mdd_obj) > 0);
1460 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1464 handle = mdd_trans_start(env, mdd);
1468 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1470 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1474 __mdd_ref_del(env, mdd_obj, handle, 0);
/* A directory loses an extra link (its "." self-reference). */
1476 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1478 __mdd_ref_del(env, mdd_obj, handle, 1);
/* Propagate the ctime supplied by the caller in @ma. */
1481 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1482 la_copy->la_ctime = ma->ma_attr.la_ctime;
1484 la_copy->la_valid = LA_CTIME;
1485 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1489 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1490 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and no open handles: release the child's quota. */
1491 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1492 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1493 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1494 mdd_quota_wrapper(&ma->ma_attr, qids);
1501 mdd_write_unlock(env, mdd_obj);
1502 mdd_trans_stop(env, mdd, rc, handle);
1503 #ifdef HAVE_QUOTA_SUPPORT
1505 /* Trigger dqrel on the owner of child. If failed,
1506  * the next call for lquota_chkquota will process it */
1507 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1513 /* partial operation */
/*
 * Validate the file type requested for a partial object create;
 * dispatches on the S_IFMT bits of the requested mode.
 */
1514 static int mdd_oc_sanity_check(const struct lu_env *env,
1515 struct mdd_object *obj,
1521 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * md_object_operations::moo_object_create implementation: create the
 * object half of a file (no name insertion). Acquires inode/block
 * quota for the child, creates the object in a transaction, applies
 * slave LMV EA or remote ACL data from @spec when requested, then
 * initializes and re-reads the attributes. Quota pending counts are
 * committed and adjusted after the transaction stops.
 */
1538 static int mdd_object_create(const struct lu_env *env,
1539 struct md_object *obj,
1540 const struct md_op_spec *spec,
1544 struct mdd_device *mdd = mdo2mdd(obj);
1545 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1546 const struct lu_fid *pfid = spec->u.sp_pfid;
1547 struct thandle *handle;
1548 #ifdef HAVE_QUOTA_SUPPORT
1549 struct obd_device *obd = mdd->mdd_obd_dev;
1550 struct mds_obd *mds = &obd->u.mds;
1551 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1552 int quota_opc = 0, block_count = 0;
1553 int inode_pending = 0, block_pending = 0;
1558 #ifdef HAVE_QUOTA_SUPPORT
1559 if (mds->mds_quota) {
1560 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1561 mdd_quota_wrapper(&ma->ma_attr, qids);
1562 /* get file quota for child */
1563 lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
1564 qids[GRPQUOTA], 1, &inode_pending, NULL, 0);
1565 switch (ma->ma_attr.la_mode & S_IFMT) {
1574 /* get block quota for child */
1576 lquota_chkquota(mds_quota_interface_ref, obd,
1577 qids[USRQUOTA], qids[GRPQUOTA],
1578 block_count, &block_pending, NULL,
1583 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1584 handle = mdd_trans_start(env, mdd);
1586 GOTO(out_pending, rc = PTR_ERR(handle));
1588 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1589 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1593 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1597 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1598 /* If creating the slave object, set slave EA here. */
1599 int lmv_size = spec->u.sp_ea.eadatalen;
1600 struct lmv_stripe_md *lmv;
1602 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1603 LASSERT(lmv != NULL && lmv_size > 0);
1605 rc = __mdd_xattr_set(env, mdd_obj,
1606 mdd_buf_get_const(env, lmv, lmv_size),
1607 MDS_LMV_MD_NAME, 0, handle);
1611 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1614 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL creation: initialize ACL from the EA data in @spec
 * and fold the resulting mode bits back into ma_attr. */
1615 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1616 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1618 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1619 buf->lb_len = spec->u.sp_ea.eadatalen;
1620 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1621 rc = __mdd_acl_init(env, mdd_obj, buf,
1622 &ma->ma_attr.la_mode,
1627 ma->ma_attr.la_valid |= LA_MODE;
/* Parent fid for initialization comes from the EA in this path. */
1630 pfid = spec->u.sp_ea.fid;
1633 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1639 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1640 mdd_write_unlock(env, mdd_obj);
1642 mdd_trans_stop(env, mdd, rc, handle);
1644 #ifdef HAVE_QUOTA_SUPPORT
1647 lquota_pending_commit(mds_quota_interface_ref, obd,
1648 qids[USRQUOTA], qids[GRPQUOTA],
1651 lquota_pending_commit(mds_quota_interface_ref, obd,
1652 qids[USRQUOTA], qids[GRPQUOTA],
1654 /* Trigger dqacq on the owner of child. If failed,
1655  * the next call for lquota_chkquota will process it. */
1656 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1657 FSFILT_OP_CREATE_PARTIAL_CHILD);
/*
 * md_object_operations::moo_ref_add implementation (object half of a
 * hard link): add one link to @obj in its own transaction and update
 * the object ctime from the caller-supplied value in @ma.
 */
1664 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1665 const struct md_attr *ma)
1667 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1668 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1669 struct mdd_device *mdd = mdo2mdd(obj);
1670 struct thandle *handle;
1674 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1675 handle = mdd_trans_start(env, mdd);
1679 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1680 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1682 __mdd_ref_add(env, mdd_obj, handle);
1683 mdd_write_unlock(env, mdd_obj);
1685 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1686 la_copy->la_ctime = ma->ma_attr.la_ctime;
1688 la_copy->la_valid = LA_CTIME;
1689 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): passes 0 rather than rc to mdd_trans_stop, unlike
 * the sibling operations above — confirm this is intentional. */
1692 mdd_trans_stop(env, mdd, 0, handle);
/*
 * Translate MDS open flags into a MAY_* access mode for permission
 * checking. Do NOT OR the MAY_* values together yourself — you would
 * get the weakest check.
 */
1700 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1704 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1705  * "acc_mode = 0" allowance for newly-created files isn't honoured.
1706  * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1707  * owner can write to a file even if it is marked readonly to hide
1708  * its brokenness. (bug 5781) */
1709 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1710 struct md_ucred *uc = md_ucred(env);
/* Owner (or missing/uninitialized credentials) bypasses the check. */
1712 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1713 (la->la_uid == uc->mu_fsuid))
1717 if (flags & FMODE_READ)
1719 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1721 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open request against the object's attributes:
 * reject dead objects and symlinks, forbid writable opens of
 * directories, run the permission check unless the file was just
 * created, ignore truncation for special files, enforce append-only
 * semantics, and restrict O_NOATIME to the owner / CAP_FOWNER.
 */
1726 static int mdd_open_sanity_check(const struct lu_env *env,
1727 struct mdd_object *obj, int flag)
1729 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1734 if (mdd_is_dead_obj(obj))
1737 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1741 if (S_ISLNK(tmp_la->la_mode))
1744 mode = accmode(env, tmp_la, flag);
1746 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Skip the permission check for files this request just created. */
1749 if (!(flag & MDS_OPEN_CREATED)) {
1750 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncation is meaningless for FIFOs, sockets and device nodes. */
1755 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1756 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1757 flag &= ~MDS_OPEN_TRUNC;
1759 /* For writing append-only file must open it with append mode. */
1760 if (mdd_is_append(obj)) {
1761 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1763 if (flag & MDS_OPEN_TRUNC)
/*
 * Currently the O_NOATIME flag is not packed by the client.
 */
1771 if (flag & O_NOATIME) {
1772 struct md_ucred *uc = md_ucred(env);
1774 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1775 (uc->mu_valid == UCRED_NEW)) &&
1776 (uc->mu_fsuid != tmp_la->la_uid) &&
1777 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * md_object_operations::moo_open implementation: run the open sanity
 * checks and, on success, bump the open count under the write lock.
 */
1785 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1788 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1791 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1793 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1795 mdd_obj->mod_count++;
1797 mdd_write_unlock(env, mdd_obj);
1801 /* return md_attr back,
1802  * if it is last unlink then return lov ea + llog cookie*/
/*
 * Destroy-time helper: for regular files, fetch the LOV EA into @ma
 * and, when present, write the unlink llog record so the OST objects
 * can be cleaned up. Caller must be prepared to receive LOV/COOKIE
 * data unconditionally.
 */
1803 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1809 if (S_ISREG(mdd_object_type(obj))) {
1810 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1811  * Caller must be ready for that. */
1813 rc = __mdd_lmm_get(env, obj, ma);
1814 if ((ma->ma_valid & MA_LOV))
1815 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1822  * No permission check is needed.
/*
 * md_object_operations::moo_close implementation: drop one open
 * reference; on the last close remove the orphan-index link, and if
 * nlink has also reached zero destroy the object via
 * mdd_object_kill() and release its quota.
 */
1824 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1827 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1828 struct thandle *handle;
1832 #ifdef HAVE_QUOTA_SUPPORT
1833 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1834 struct mds_obd *mds = &obd->u.mds;
1835 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1840 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1843 handle = mdd_trans_start(env, mdo2mdd(obj));
1845 RETURN(PTR_ERR(handle));
1847 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1848 /* release open count */
1849 mdd_obj->mod_count --;
1851 if (mdd_obj->mod_count == 0) {
1852 /* remove link to object from orphan index */
1853 if (mdd_obj->mod_flags & ORPHAN_OBJ)
1854 __mdd_orphan_del(env, mdd_obj, handle);
1857 rc = mdd_iattr_get(env, mdd_obj, ma);
/* Last close of an already-unlinked object: destroy it now. */
1859 if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
1860 rc = mdd_object_kill(env, mdd_obj, ma);
1861 #ifdef HAVE_QUOTA_SUPPORT
1862 if (mds->mds_quota) {
1863 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1864 mdd_quota_wrapper(&ma->ma_attr, qids);
1873 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1875 mdd_write_unlock(env, mdd_obj);
1876 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1877 #ifdef HAVE_QUOTA_SUPPORT
1879 /* Trigger dqrel on the owner of child. If failed,
1880  * the next call for lquota_chkquota will process it */
1881 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1888  * Permission check is done when open,
1889  * no need check again.
/*
 * Readpage precondition: the object must be a directory whose
 * underlying dt object supports the index (directory) interface.
 */
1891 static int mdd_readpage_sanity_check(const struct lu_env *env,
1892 struct mdd_object *obj)
1894 struct dt_object *next = mdd_object_child(obj);
1898 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one page (@area, @nob bytes) with lu_dirent records pulled
 * from directory iterator @it. Records are 8-byte aligned; when the
 * next record does not fit, the previous record's reclen is enlarged
 * to cover the slack so the page stays fully described. *last tracks
 * the most recently emitted entry across pages.
 */
1906 static int mdd_dir_page_build(const struct lu_env *env, int first,
1907 void *area, int nob, const struct dt_it_ops *iops,
1908 struct dt_it *it, __u64 *start, __u64 *end,
1909 struct lu_dirent **last)
1911 struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
1912 struct mdd_thread_info *info = mdd_env_info(env);
1913 struct lu_fid_pack *pack = &info->mti_pack;
1915 struct lu_dirent *ent;
/* The page begins with a zeroed lu_dirpage header. */
1918 memset(area, 0, sizeof (struct lu_dirpage));
1919 area += sizeof (struct lu_dirpage);
1920 nob -= sizeof (struct lu_dirpage);
1923 LASSERT(nob > sizeof *ent);
1933 name = (char *)iops->key(env, it);
1934 len = iops->key_size(env, it);
1936 pack = (struct lu_fid_pack *)iops->rec(env, it);
1937 result = fid_unpack(pack, fid);
/* Round the record size up to an 8-byte boundary. */
1941 recsize = (sizeof(*ent) + len + 7) & ~7;
1942 hash = iops->store(env, it);
1945 CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n",
1946 name, ent, nob, PFID(fid), hash, len, len, len, name);
1948 if (nob >= recsize) {
1949 ent->lde_fid = *fid;
1950 fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
1951 ent->lde_hash = hash;
1952 ent->lde_namelen = cpu_to_le16(len);
1953 ent->lde_reclen = cpu_to_le16(recsize);
1954 memcpy(ent->lde_name, name, len);
1955 if (first && ent == area)
1958 ent = (void *)ent + recsize;
1960 result = iops->next(env, it);
1963  * record doesn't fit into page, enlarge previous one.
1965 LASSERT(*last != NULL);
1966 (*last)->lde_reclen =
1967 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1971 } while (result == 0);
/*
 * Iterate over directory @obj starting at rdpg->rp_hash and fill the
 * pages supplied in @rdpg with lu_dirent records via
 * mdd_dir_page_build(). The first page's lu_dirpage header receives
 * the covered hash range; LDF_EMPTY is set when nothing was emitted.
 */
1976 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
1977 const struct lu_rdpg *rdpg)
1980 struct dt_object *next = mdd_object_child(obj);
1981 const struct dt_it_ops *iops;
1983 struct lu_dirent *last = NULL;
1990 LASSERT(rdpg->rp_pages != NULL);
1991 LASSERT(next->do_index_ops != NULL);
1993 if (rdpg->rp_count <= 0)
1997  * iterate through directory and fill pages from @rdpg
1999 iops = &next->do_index_ops->dio_it;
2000 it = iops->init(env, next, mdd_object_capa(env, obj));
2004 rc = iops->load(env, it, rdpg->rp_hash);
2008  * Iterator didn't find record with exactly the key requested.
2010  * It is currently either
2012  *     - positioned above record with key less than
2013  *     requested---skip it.
2015  *     - or not positioned at all (is in IAM_IT_SKEWED
2016  *     state)---position it on the next item.
2018 rc = iops->next(env, it);
2023  * At this point and across for-loop:
2025  *     rc == 0 -> ok, proceed.
2026  *     rc >  0 -> end of directory.
2029 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2030 i++, nob -= CFS_PAGE_SIZE) {
2031 LASSERT(i < rdpg->rp_npages);
2032 pg = rdpg->rp_pages[i];
2033 rc = mdd_dir_page_build(env, !i, cfs_kmap(pg),
2034 min_t(int, nob, CFS_PAGE_SIZE), iops,
2035 it, &hash_start, &hash_end, &last);
/* Terminate the chain on error or when the last page is filled. */
2036 if (rc != 0 || i == rdpg->rp_npages - 1)
2037 last->lde_reclen = 0;
2044 hash_end = DIR_END_OFF;
2048 struct lu_dirpage *dp;
2050 dp = cfs_kmap(rdpg->rp_pages[0]);
2051 dp->ldp_hash_start = rdpg->rp_hash;
2052 dp->ldp_hash_end = hash_end;
2055  * No pages were processed, mark this.
2057 dp->ldp_flags |= LDF_EMPTY;
2058 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2059 cfs_kunmap(rdpg->rp_pages[0]);
2062 iops->fini(env, it);
/*
 * md_object_operations::moo_readpage implementation: under the read
 * lock, sanity-check the object, return a single empty page for a
 * dead directory (POSIX: not even "." / ".." are listed), otherwise
 * delegate to __mdd_readpage().
 */
2067 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2068 const struct lu_rdpg *rdpg)
2070 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2074 LASSERT(mdd_object_exists(mdd_obj));
2076 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2077 rc = mdd_readpage_sanity_check(env, mdd_obj);
2079 GOTO(out_unlock, rc);
2081 if (mdd_is_dead_obj(mdd_obj)) {
2083 struct lu_dirpage *dp;
2086  * According to POSIX, please do not return any entry to client:
2087  * even dot and dotdot should not be returned.
2089 CWARN("readdir from dead object: "DFID"\n",
2090 PFID(mdd_object_fid(mdd_obj)));
2092 if (rdpg->rp_count <= 0)
2093 GOTO(out_unlock, rc = -EFAULT);
2094 LASSERT(rdpg->rp_pages != NULL);
2096 pg = rdpg->rp_pages[0];
2097 dp = (struct lu_dirpage*)cfs_kmap(pg);
2098 memset(dp, 0 , sizeof(struct lu_dirpage));
2099 dp->ldp_hash_start = rdpg->rp_hash;
2100 dp->ldp_hash_end   = DIR_END_OFF;
2101 dp->ldp_flags |= LDF_EMPTY;
2102 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2104 GOTO(out_unlock, rc = 0);
2107 rc = __mdd_readpage(env, mdd_obj, rdpg);
2111 mdd_read_unlock(env, mdd_obj);
/*
 * md_object_operations::moo_object_sync implementation: flush the
 * object by delegating to the underlying dt object's do_object_sync.
 */
2115 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2117 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2118 struct dt_object *next;
2120 LASSERT(mdd_object_exists(mdd_obj));
2121 next = mdd_object_child(mdd_obj);
2122 return next->do_ops->do_object_sync(env, next);
2125 const struct md_object_operations mdd_obj_ops = {
2126 .moo_permission = mdd_permission,
2127 .moo_attr_get = mdd_attr_get,
2128 .moo_attr_set = mdd_attr_set,
2129 .moo_xattr_get = mdd_xattr_get,
2130 .moo_xattr_set = mdd_xattr_set,
2131 .moo_xattr_list = mdd_xattr_list,
2132 .moo_xattr_del = mdd_xattr_del,
2133 .moo_object_create = mdd_object_create,
2134 .moo_ref_add = mdd_ref_add,
2135 .moo_ref_del = mdd_ref_del,
2136 .moo_open = mdd_open,
2137 .moo_close = mdd_close,
2138 .moo_readpage = mdd_readpage,
2139 .moo_readlink = mdd_readlink,
2140 .moo_capa_get = mdd_capa_get,
2141 .moo_object_sync = mdd_object_sync,
2142 .moo_path = mdd_path,