1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <linux/jbd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
62 #include "mdd_internal.h"
64 static const struct lu_object_operations mdd_lu_obj_ops;
66 static int mdd_xattr_get(const struct lu_env *env,
67 struct md_object *obj, struct lu_buf *buf,
/* Fetch the body-data handle of an existing mdd object via the underlying
 * layer (mdo_data_get). Asserts existence, printing the FID on failure.
 * NOTE(review): intervening source lines are elided in this dump (the "data"
 * parameter declaration and the function's return are not visible). */
70 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74 PFID(mdd_object_fid(obj)));
75 mdo_data_get(env, obj, data);
/* Read the lu_attr attributes of an existing object, under capability
 * check @capa. Thin wrapper over mdo_attr_get(); asserts the object
 * exists first, printing its FID in the assertion message. */
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80 struct lu_attr *la, struct lustre_capa *capa)
82 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
83 PFID(mdd_object_fid(obj)));
84 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * mod_flags representation (APPEND_OBJ / IMMUTE_OBJ). Clears both
 * bits first so stale state never survives a re-translation. */
87 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
89 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
91 if (flags & LUSTRE_APPEND_FL)
92 obj->mod_flags |= APPEND_OBJ;
94 if (flags & LUSTRE_IMMUTABLE_FL)
95 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to @env's lu_context.
 * The key is registered at module init (mdd_thread_key), so a NULL
 * result would indicate setup failure — hence the LASSERT.
 * NOTE(review): the "return info;" line is elided in this dump. */
98 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
100 struct mdd_thread_info *info;
102 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
103 LASSERT(info != NULL);
/* Wrap an existing memory area (@area, @len) in the per-thread mti_buf
 * descriptor — no allocation, not thread-shareable beyond this env.
 * NOTE(review): the lines assigning lb_buf/lb_len and the return are
 * elided in this dump. */
107 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 buf = &mdd_env_info(env)->mti_buf;
/* Release a buffer previously filled by the mdd_buf helpers.
 * Safe on NULL buf or NULL lb_buf (early return). Frees with
 * OBD_VFREE or OBD_FREE depending on how it was allocated —
 * presumably selected by an lb_vmalloc test on the elided line
 * between the two free calls (TODO confirm against full source). */
117 void mdd_buf_put(struct lu_buf *buf)
119 if (buf == NULL || buf->lb_buf == NULL)
122 OBD_VFREE(buf->lb_buf, buf->lb_len);
124 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap a read-only area in the
 * per-thread mti_buf. The const is cast away on lb_buf because
 * lu_buf has no const field, but callers receive a const lu_buf*.
 * NOTE(review): lb_len assignment and return are elided in this dump. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129 const void *area, ssize_t len)
133 buf = &mdd_env_info(env)->mti_buf;
134 buf->lb_buf = (void *)area;
/* Threshold above which the big scratch buffer switches from
 * OBD_ALLOC (kmalloc-backed) to OBD_VMALLOC. */
139 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the per-thread "big buffer" (mti_big_buf), growing it to at
 * least @len bytes. If the current buffer is too small it is freed
 * (vfree vs free chosen by allocation kind) and reallocated: small
 * requests use OBD_ALLOC, larger ones OBD_VMALLOC. Contents are NOT
 * preserved across a reallocation — use mdd_buf_grow() for that.
 * NOTE(review): several lines (lb_len assignment, error return) are
 * elided in this dump. */
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
142 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
144 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
146 OBD_VFREE(buf->lb_buf, buf->lb_len);
148 OBD_FREE(buf->lb_buf, buf->lb_len);
151 if (buf->lb_buf == NULL) {
153 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
154 OBD_ALLOC(buf->lb_buf, buf->lb_len);
/* Fallback: if the small kmalloc failed, retry with vmalloc. */
157 if (buf->lb_buf == NULL) {
158 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
161 if (buf->lb_buf == NULL)
167 /** Increase the size of the \a mti_big_buf.
168 * preserves old data in buffer
169 * old buffer remains unchanged on error
170 * \retval 0 or -ENOMEM
 *
 * Implementation: allocate a new buffer (vmalloc above
 * BUF_VMALLOC_SIZE, kmalloc otherwise), copy the old contents,
 * free the old backing store, then overwrite the descriptor with
 * the new one via memcpy. On allocation failure the old buffer is
 * untouched (the -ENOMEM return line is elided in this dump).
 */
172 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
174 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Shrinking is not supported — caller must only grow. */
177 LASSERT(len >= oldbuf->lb_len);
178 if (len > BUF_VMALLOC_SIZE) {
179 OBD_VMALLOC(buf.lb_buf, len);
182 OBD_ALLOC(buf.lb_buf, len);
185 if (buf.lb_buf == NULL)
189 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
/* Old buffer freed according to how it was originally allocated. */
191 if (oldbuf->lb_vmalloc)
192 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
194 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
196 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed per-thread llog cookie buffer sized for the current
 * maximum LOV cookie size of @mdd. Grows lazily: if the cached buffer
 * is smaller than required it is freed and reallocated. Returns NULL
 * only if allocation fails. Memory is owned by the thread info and is
 * reused across calls — callers must not free it. */
201 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
202 struct mdd_device *mdd)
204 struct mdd_thread_info *mti = mdd_env_info(env);
207 max_cookie_size = mdd_lov_cookiesize(env, mdd);
208 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
209 if (mti->mti_max_cookie)
210 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
211 mti->mti_max_cookie = NULL;
212 mti->mti_max_cookie_size = 0;
214 if (unlikely(mti->mti_max_cookie == NULL)) {
215 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
216 if (likely(mti->mti_max_cookie != NULL))
217 mti->mti_max_cookie_size = max_cookie_size;
/* Zero the cookie on every call so stale data never leaks out. */
219 if (likely(mti->mti_max_cookie != NULL))
220 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
221 return mti->mti_max_cookie;
/* Return a per-thread lov_mds_md buffer sized for the current maximum
 * MDS LOV EA size. Same lazy-grow pattern as mdd_max_cookie_get(),
 * but the buffer is NOT zeroed before return. May return NULL on
 * allocation failure; caller must check. */
224 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
225 struct mdd_device *mdd)
227 struct mdd_thread_info *mti = mdd_env_info(env);
230 max_lmm_size = mdd_lov_mdsize(env, mdd);
231 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
232 if (mti->mti_max_lmm)
233 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
234 mti->mti_max_lmm = NULL;
235 mti->mti_max_lmm_size = 0;
237 if (unlikely(mti->mti_max_lmm == NULL)) {
238 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
/* NOTE(review): "unlikely(... != NULL)" is the opposite hint from the
 * cookie variant above (which uses likely); harmless but probably a
 * typo for likely() — verify against upstream before changing. */
239 if (unlikely(mti->mti_max_lmm != NULL))
240 mti->mti_max_lmm_size = max_lmm_size;
242 return mti->mti_max_lmm;
/* lu_device hook: allocate and initialize a new mdd_object for the
 * site. Wires up the md_object operation tables (mdd_obj_ops /
 * mdd_dir_ops) and this layer's lu_object ops, and starts with a zero
 * open count. NOTE(review): the device parameter ("d"), the lu_object
 * declaration ("o"), the NULL-failure return path and the final return
 * are on lines elided in this dump. */
245 struct lu_object *mdd_object_alloc(const struct lu_env *env,
246 const struct lu_object_header *hdr,
249 struct mdd_object *mdd_obj;
251 OBD_ALLOC_PTR(mdd_obj);
252 if (mdd_obj != NULL) {
255 o = mdd2lu_obj(mdd_obj);
256 lu_object_init(o, NULL, d);
257 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
258 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
259 mdd_obj->mod_count = 0;
260 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: stack this mdd object on top of the child (OSD)
 * device's object. Allocates the lower-layer object via the child
 * device's ldo_object_alloc and links it under @o with lu_object_add.
 * Also resets the changelog timestamp and initializes the pdir lock.
 * NOTE(review): the allocation-failure check between L148 and L149 and
 * the function's return are on elided lines. */
267 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
268 const struct lu_object_conf *_)
270 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
271 struct mdd_object *mdd_obj = lu2mdd_obj(o);
272 struct lu_object *below;
273 struct lu_device *under;
276 mdd_obj->mod_cltime = 0;
277 under = &d->mdd_child->dd_lu_dev;
278 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
279 mdd_pdlock_init(mdd_obj);
283 lu_object_add(o, below);
/* loo_object_start: once the object stack is assembled, load the
 * cached flag state (immutable/append/nlink) for objects that already
 * exist on disk. Non-existent objects need no flag load. */
288 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
290 if (lu_object_exists(o))
291 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: tear down and release the mdd slice of the object.
 * NOTE(review): the lu_object_fini/OBD_FREE_PTR calls are on lines
 * elided in this dump. */
296 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
298 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: one-line debug dump of the mdd slice (open count,
 * valid mask, changelog time, flags) through printer @p. */
304 static int mdd_object_print(const struct lu_env *env, void *cookie,
305 lu_printer_t p, const struct lu_object *o)
307 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
308 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
309 "valid=%x, cltime=%llu, flags=%lx",
310 mdd, mdd->mod_count, mdd->mod_valid,
311 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operations vector for the mdd layer; installed on every
 * object in mdd_object_alloc(). */
314 static const struct lu_object_operations mdd_lu_obj_ops = {
315 .loo_object_init = mdd_object_init,
316 .loo_object_start = mdd_object_start,
317 .loo_object_free = mdd_object_free,
318 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID @f on device @d.
 * Thin wrapper over md_object_find_slice(); may return an ERR_PTR —
 * callers check with IS_ERR and drop the ref with mdd_object_put(). */
321 struct mdd_object *mdd_object_find(const struct lu_env *env,
322 struct mdd_device *d,
323 const struct lu_fid *f)
325 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated @path (relative to the filesystem root FID)
 * to its FID by walking component-by-component: for each element, find
 * the current parent object by FID and look the name up in it. Used by
 * mdd_path_current() to verify a reconstructed path. Returns 0 or a
 * negative errno (-EREMOTE if a component lives on another MDT).
 * NOTE(review): the copy of each path element into the temp buffer,
 * loop advance and termination, and the final *fid store are on lines
 * elided in this dump — the visible lines show only the skeleton. */
328 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
329 const char *path, struct lu_fid *fid)
332 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
333 struct mdd_object *obj;
334 struct lu_name *lname = &mdd_env_info(env)->mti_name;
339 /* temp buffer for path element */
340 buf = mdd_buf_alloc(env, PATH_MAX)
341 if (buf->lb_buf == NULL)
344 lname->ln_name = name = buf->lb_buf;
345 lname->ln_namelen = 0;
346 *f = mdd->mdd_root_fid;
/* Scan one path component (up to next '/' or end of string). */
353 while (*path != '/' && *path != '\0') {
361 /* find obj corresponding to fid */
362 obj = mdd_object_find(env, mdd, f);
364 GOTO(out, rc = -EREMOTE);
366 GOTO(out, rc = -PTR_ERR(obj));
367 /* get child fid from parent and name */
368 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
369 mdd_object_put(env, obj);
374 lname->ln_namelen = 0;
383 /** The maximum depth that fid2path() will search.
384 * This is limited only because we want to store the fids for
385 * historical path lookup purposes.
 */
387 #define MAX_PATH_DEPTH 100
389 /** mdd_path() lookup structure. */
390 struct path_lookup_info {
391 __u64 pli_recno; /**< history point */
392 struct lu_fid pli_fid; /**< FID re-resolved from the built path, for verification */
393 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
394 struct mdd_object *pli_mdd_obj; /**< object whose path is requested */
395 char *pli_path; /**< full path */
397 int pli_linkno; /**< which hardlink to follow */
398 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current (non-historic) pathname of pli->pli_mdd_obj by
 * walking UP from the object to the filesystem root: at each step the
 * parent FID and entry name are read from the object's link EA
 * (mdd_links_get), the name is packed at the END of pli_path, and the
 * parent FID is stacked in pli_fids[] for later historic lookup.
 * Afterwards the path is re-resolved forward (mdd_path2fid) to detect
 * a concurrent rename; mismatch returns -EAGAIN so the caller retries.
 * Returns 0, -EOVERFLOW (path/depth too long), -EREMOTE, or -EAGAIN.
 * NOTE(review): many lines (loop closers, leh assignment from buf,
 * EXIT paths, buffer release) are elided in this dump. */
401 static int mdd_path_current(const struct lu_env *env,
402 struct path_lookup_info *pli)
404 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
405 struct mdd_object *mdd_obj;
406 struct lu_buf *buf = NULL;
407 struct link_ea_header *leh;
408 struct link_ea_entry *lee;
409 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
410 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the end of the output buffer; names are written
 * right-to-left as we ascend toward the root. */
416 ptr = pli->pli_path + pli->pli_pathlen - 1;
419 pli->pli_fidcount = 0;
420 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
422 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
423 mdd_obj = mdd_object_find(env, mdd,
424 &pli->pli_fids[pli->pli_fidcount]);
426 GOTO(out, rc = -EREMOTE);
428 GOTO(out, rc = -PTR_ERR(mdd_obj));
429 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
431 mdd_object_put(env, mdd_obj);
435 /* Do I need to error out here? */
440 /* Get parent fid and object name */
441 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
442 buf = mdd_links_get(env, mdd_obj);
443 mdd_read_unlock(env, mdd_obj);
444 mdd_object_put(env, mdd_obj);
446 GOTO(out, rc = PTR_ERR(buf));
449 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
450 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
452 /* If set, use link #linkno for path lookup, otherwise use
453 link #0. Only do this for the final path element. */
454 if ((pli->pli_fidcount == 0) &&
455 (pli->pli_linkno < leh->leh_reccount)) {
457 for (count = 0; count < pli->pli_linkno; count++) {
458 lee = (struct link_ea_entry *)
459 ((char *)lee + reclen);
460 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
462 if (pli->pli_linkno < leh->leh_reccount - 1)
463 /* indicate to user there are more links */
467 /* Pack the name in the end of the buffer */
468 ptr -= tmpname->ln_namelen;
469 if (ptr - 1 <= pli->pli_path)
470 GOTO(out, rc = -EOVERFLOW)
/* ln_name is not NUL-terminated; bounded copy of exactly namelen. */
471 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
474 /* Store the parent fid for historic lookup */
475 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
476 GOTO(out, rc = -EOVERFLOW);
477 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
480 /* Verify that our path hasn't changed since we started the lookup */
481 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
483 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
484 GOTO (out, rc = -EAGAIN);
486 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
487 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
488 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
489 PFID(&pli->pli_fid));
490 GOTO(out, rc = -EAGAIN);
/* Shift the right-aligned path to the start of the caller's buffer. */
493 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
497 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
498 /* if we vmalloced a large buffer drop it */
504 /* Returns the full path to this fid, as of changelog record recno.
 *
 * Retries mdd_path_current() a bounded number of times because the
 * file may be renamed concurrently (-EAGAIN). On success *linkno is
 * updated so the caller can iterate over additional hardlinks. The
 * historic (recno-based) lookup is stubbed out under "#if 0".
 * NOTE(review): pli allocation/free, the root-FID early-return body,
 * the tries initialization, and RETURN are on lines elided here.
 */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506 char *path, int pathlen, __u64 recno, int *linkno)
508 struct path_lookup_info *pli;
/* The root has no name; handled specially (body elided in this dump). */
516 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
526 pli->pli_mdd_obj = md2mdd_obj(obj);
527 pli->pli_recno = recno;
528 pli->pli_path = path;
529 pli->pli_pathlen = pathlen;
530 pli->pli_linkno = *linkno;
532 /* Retry multiple times in case file is being moved */
533 while (tries-- && rc == -EAGAIN)
534 rc = mdd_path_current(env, pli);
536 #if 0 /* We need old path names only for replication */
537 /* For historical path lookup, the current links may not have existed
538 * at "recno" time. We must switch over to earlier links/parents
539 * by using the changelog records. If the earlier parent doesn't
540 * exist, we must search back through the changelog to reconstruct
541 * its parents, then check if it exists, etc.
542 * We may ignore this problem for the initial implementation and
543 * state that an "original" hardlink must still exist for us to find
544 * historic path name. */
545 if (pli->pli_recno != -1)
546 rc = mdd_path_historic(env, pli);
549 /* return next link index to caller */
550 *linkno = pli->pli_linkno;
/* Load the object's attributes and cache the derived flag bits in
 * mod_flags: immutable/append via mdd_flags_xlate(), plus MNLINK_OBJ
 * for directories whose link count has dropped to 1 (no subdirs). */
557 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
559 struct lu_attr *la = &mdd_env_info(env)->mti_la;
563 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
565 mdd_flags_xlate(obj, la->la_flags);
566 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
567 obj->mod_flags |= MNLINK_OBJ;
572 /* get only inode attributes
 * Fills ma->ma_attr from the object (capability-checked) and marks
 * MA_INODE valid. Skips the fetch entirely when MA_INODE is already
 * valid in @ma. */
573 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
579 if (ma->ma_valid & MA_INODE)
582 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
583 mdd_object_capa(env, mdd_obj));
585 ma->ma_valid |= MA_INODE;
/* Fill @lmm with the filesystem-default LOV striping (magic V1,
 * default group/pattern/stripe size/count from the MDS lov_desc) and
 * report the EA size via *size. Returns sizeof(struct lov_mds_md).
 * NOTE(review): a guard around the assignment block (probably a size
 * check) sits on lines elided in this dump. */
589 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
592 struct lov_desc *ldesc;
593 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
596 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
597 LASSERT(ldesc != NULL);
602 lmm->lmm_magic = LOV_MAGIC_V1;
603 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
604 lmm->lmm_pattern = ldesc->ld_pattern;
605 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
606 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
607 *size = sizeof(struct lov_mds_md);
609 RETURN(sizeof(struct lov_mds_md));
612 /* get lov EA only
 * Lock-free helper: read the object's LOV EA into ma->ma_lmm. When
 * the object has no striping EA but the caller asked for defaults
 * (MA_LOV_DEF), synthesize the filesystem default instead. Marks
 * MA_LOV valid on success; no-op when already valid. Caller must
 * hold the object read lock (see mdd_lmm_get_locked). */
613 static int __mdd_lmm_get(const struct lu_env *env,
614 struct mdd_object *mdd_obj, struct md_attr *ma)
619 if (ma->ma_valid & MA_LOV)
622 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
625 if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
626 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
631 ma->ma_valid |= MA_LOV;
/* Locked wrapper for __mdd_lmm_get(): takes the object read lock
 * around the LOV EA read. */
637 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
643 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
644 rc = __mdd_lmm_get(env, mdd_obj, ma);
645 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv and mark MA_LMV
 * valid; no-op when already valid. Lock-free — caller holds the lock. */
650 static int __mdd_lmv_get(const struct lu_env *env,
651 struct mdd_object *mdd_obj, struct md_attr *ma)
656 if (ma->ma_valid & MA_LMV)
659 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
662 ma->ma_valid |= MA_LMV;
/* Gather everything requested in ma->ma_need: inode attrs (MA_INODE),
 * LOV EA for regular files and directories (MA_LOV), LMV EA for
 * directories (MA_LMV), and the default ACL for directories
 * (MA_ACL_DEF, POSIX-ACL builds only). Stops at the first error. */
668 static int mdd_attr_get_internal(const struct lu_env *env,
669 struct mdd_object *mdd_obj,
675 if (ma->ma_need & MA_INODE)
676 rc = mdd_iattr_get(env, mdd_obj, ma);
678 if (rc == 0 && ma->ma_need & MA_LOV) {
679 if (S_ISREG(mdd_object_type(mdd_obj)) ||
680 S_ISDIR(mdd_object_type(mdd_obj)))
681 rc = __mdd_lmm_get(env, mdd_obj, ma);
683 if (rc == 0 && ma->ma_need & MA_LMV) {
684 if (S_ISDIR(mdd_object_type(mdd_obj)))
685 rc = __mdd_lmv_get(env, mdd_obj, ma);
687 #ifdef CONFIG_FS_POSIX_ACL
688 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
689 if (S_ISDIR(mdd_object_type(mdd_obj)))
690 rc = mdd_def_acl_get(env, mdd_obj, ma);
693 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* Same as mdd_attr_get_internal() but takes the object read lock when
 * EA data (LOV/LMV/default ACL) is requested — plain inode attrs do
 * not need it. */
698 int mdd_attr_get_internal_locked(const struct lu_env *env,
699 struct mdd_object *mdd_obj, struct md_attr *ma)
702 int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
705 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
706 rc = mdd_attr_get_internal(env, mdd_obj, ma);
708 mdd_read_unlock(env, mdd_obj);
713 * No permission check is needed.
 *
 * md_object_operations entry point: fetch the attributes requested in
 * @ma. All the work happens in mdd_attr_get_internal_locked().
 */
715 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
718 struct mdd_object *mdd_obj = md2mdd_obj(obj);
722 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
727 * No permission check is needed.
 *
 * md_object_operations entry point: read extended attribute @name into
 * @buf under the object read lock, with capability check.
 */
729 static int mdd_xattr_get(const struct lu_env *env,
730 struct md_object *obj, struct lu_buf *buf,
733 struct mdd_object *mdd_obj = md2mdd_obj(obj);
738 LASSERT(mdd_object_exists(mdd_obj));
740 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
741 rc = mdo_xattr_get(env, mdd_obj, buf, name,
742 mdd_object_capa(env, mdd_obj));
743 mdd_read_unlock(env, mdd_obj);
749 * Permission check is done when open,
750 * no need check again.
 *
 * Read a symlink target: bypasses the md layer and reads the link
 * body directly from the underlying dt object (dbo_read at pos 0,
 * initialized on an elided line) under the object read lock.
 */
752 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
755 struct mdd_object *mdd_obj = md2mdd_obj(obj);
756 struct dt_object *next;
761 LASSERT(mdd_object_exists(mdd_obj));
763 next = mdd_object_child(mdd_obj);
764 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
765 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
766 mdd_object_capa(env, mdd_obj));
767 mdd_read_unlock(env, mdd_obj);
772 * No permission check is needed.
 *
 * List the object's extended attribute names into @buf under the
 * object read lock, with capability check.
 */
774 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
777 struct mdd_object *mdd_obj = md2mdd_obj(obj);
782 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
783 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
784 mdd_read_unlock(env, mdd_obj);
/* Create the on-disk object for @c inside transaction @handle:
 * choose the dt_object_format from the creation spec (index feature
 * for non-directory indexes, otherwise derived from la_mode), ask the
 * underlying device for an allocation hint (parent-aware), and invoke
 * mdo_create_obj. No-op path when @c already exists is partially
 * elided in this dump. @p may be NULL (no parent hint). */
789 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
790 struct mdd_object *c, struct md_attr *ma,
791 struct thandle *handle,
792 const struct md_op_spec *spec)
794 struct lu_attr *attr = &ma->ma_attr;
795 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
796 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
797 const struct dt_index_features *feat = spec->sp_feat;
801 if (!mdd_object_exists(c)) {
802 struct dt_object *next = mdd_object_child(c);
805 if (feat != &dt_directory_features && feat != NULL)
806 dof->dof_type = DFT_INDEX;
808 dof->dof_type = dt_mode_to_dft(attr->la_mode);
810 dof->u.dof_idx.di_feat = feat;
812 /* @hint will be initialized by underlying device. */
813 next->do_ops->do_ah_init(env, hint,
814 p ? mdd_object_child(p) : NULL,
815 attr->la_mode & S_IFMT);
817 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
818 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
826 * Make sure the ctime is increased only.
 *
 * If the incoming ctime is older than the stored one, drop both
 * MTIME and CTIME from la_valid (stale update). If it is exactly
 * equal and ctime is the only attribute being set, drop CTIME too
 * (nothing would change). Reads current attrs with BYPASS_CAPA.
 */
828 static inline int mdd_attr_check(const struct lu_env *env,
829 struct mdd_object *obj,
830 struct lu_attr *attr)
832 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
836 if (attr->la_valid & LA_CTIME) {
837 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
841 if (attr->la_ctime < tmp_la->la_ctime)
842 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
843 else if (attr->la_valid == LA_CTIME &&
844 attr->la_ctime == tmp_la->la_ctime)
845 attr->la_valid &= ~LA_CTIME;
/* Apply @attr to the object inside transaction @handle (capability-
 * checked). On a successful mode change with @needacl set, propagate
 * the new mode bits into the access ACL (POSIX-ACL builds only). */
850 int mdd_attr_set_internal(const struct lu_env *env,
851 struct mdd_object *obj,
852 struct lu_attr *attr,
853 struct thandle *handle,
859 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
860 #ifdef CONFIG_FS_POSIX_ACL
861 if (!rc && (attr->la_valid & LA_MODE) && needacl)
862 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime-monotonicity filter) followed by
 * mdd_attr_set_internal(). The early-return when the check strips all
 * valid bits sits on lines elided in this dump. */
867 int mdd_attr_check_set_internal(const struct lu_env *env,
868 struct mdd_object *obj,
869 struct lu_attr *attr,
870 struct thandle *handle,
876 rc = mdd_attr_check(env, obj, attr);
881 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(). ACL propagation
 * is only relevant when the mode actually changes, so @needacl is
 * narrowed to that case before taking the lock. */
885 static int mdd_attr_set_internal_locked(const struct lu_env *env,
886 struct mdd_object *obj,
887 struct lu_attr *attr,
888 struct thandle *handle,
894 needacl = needacl && (attr->la_valid & LA_MODE);
896 mdd_write_lock(env, obj, MOR_TGT_CHILD);
897 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
899 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); same
 * @needacl narrowing as mdd_attr_set_internal_locked(). */
903 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
904 struct mdd_object *obj,
905 struct lu_attr *attr,
906 struct thandle *handle,
912 needacl = needacl && (attr->la_valid & LA_MODE);
914 mdd_write_lock(env, obj, MOR_TGT_CHILD);
915 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
917 mdd_write_unlock(env, obj);
/* Lock-free xattr write helper: a non-empty @buf sets the attribute,
 * a (NULL, 0) buf deletes it. Note the @fl create/replace flag is
 * accepted but 0 is passed to mdo_xattr_set on the visible line —
 * NOTE(review): verify against full source whether fl is used on an
 * elided path. Caller holds the write lock (see mdd_xattr_set_txn). */
921 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
922 const struct lu_buf *buf, const char *name,
923 int fl, struct thandle *handle)
925 struct lustre_capa *capa = mdd_object_capa(env, obj);
929 if (buf->lb_buf && buf->lb_len > 0)
930 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
931 else if (buf->lb_buf == NULL && buf->lb_len == 0)
932 rc = mdo_xattr_del(env, obj, name, handle, capa);
938 * This gives the same functionality as the code between
939 * sys_chmod and inode_setattr
940 * chown_common and inode_setattr
941 * utimes and inode_setattr
942 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
 *
 * Validate and normalize a setattr request against the current
 * attributes and the caller's credentials (md_ucred): enforces
 * chmod/chown/chgrp/utimes permission rules, strips setuid/setgid on
 * ownership change, filters redundant time updates, and checks the
 * immutable/append flags. Mutates @la in place (dropping la_valid
 * bits that should not be applied). Several early-return/RETURN lines
 * are elided in this dump.
 */
944 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
945 struct lu_attr *la, const struct md_attr *ma)
947 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
948 struct md_ucred *uc = md_ucred(env);
955 /* Do not permit change file type */
956 if (la->la_valid & LA_TYPE)
959 /* They should not be processed by setattr */
960 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
963 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: rename-source case. Stale or equal ctime is
 * silently dropped rather than failed. */
967 if (la->la_valid == LA_CTIME) {
968 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
969 /* This is only for set ctime when rename's source is
971 rc = mdd_may_delete(env, NULL, obj,
972 (struct md_attr *)ma, 1, 0);
973 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
974 la->la_valid &= ~LA_CTIME;
978 if (la->la_valid == LA_ATIME) {
979 /* This is atime only set for read atime update on close. */
980 if (la->la_atime <= tmp_la->la_atime +
981 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
982 la->la_valid &= ~LA_ATIME;
986 /* Check if flags change. */
987 if (la->la_valid & LA_FLAGS) {
988 unsigned int oldflags = 0;
989 unsigned int newflags = la->la_flags &
990 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
/* Only the owner or CAP_FOWNER may touch flags at all. */
992 if ((uc->mu_fsuid != tmp_la->la_uid) &&
993 !mdd_capable(uc, CFS_CAP_FOWNER))
996 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
997 * only be changed by the relevant capability. */
998 if (mdd_is_immutable(obj))
999 oldflags |= LUSTRE_IMMUTABLE_FL;
1000 if (mdd_is_append(obj))
1001 oldflags |= LUSTRE_APPEND_FL;
1002 if ((oldflags ^ newflags) &&
1003 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningless on non-directories; silently clear it. */
1006 if (!S_ISDIR(tmp_la->la_mode))
1007 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects reject any non-flag attribute change
 * unless the server explicitly bypasses permission checks. */
1010 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1011 (la->la_valid & ~LA_FLAGS) &&
1012 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1015 /* Check for setting the obj time. */
1016 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1017 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1018 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1019 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1020 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1028 /* Make sure a caller can chmod. */
1029 if (la->la_valid & LA_MODE) {
1030 /* Bypass la_vaild == LA_MODE,
1031 * this is for changing file with SUID or SGID. */
1032 if ((la->la_valid & ~LA_MODE) &&
1033 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1034 (uc->mu_fsuid != tmp_la->la_uid) &&
1035 !mdd_capable(uc, CFS_CAP_FOWNER))
1038 if (la->la_mode == (umode_t) -1)
1039 la->la_mode = tmp_la->la_mode;
1041 la->la_mode = (la->la_mode & S_IALLUGO) |
1042 (tmp_la->la_mode & ~S_IALLUGO);
1044 /* Also check the setgid bit! */
1045 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1046 la->la_gid : tmp_la->la_gid) &&
1047 !mdd_capable(uc, CFS_CAP_FSETID))
1048 la->la_mode &= ~S_ISGID;
1050 la->la_mode = tmp_la->la_mode;
1053 /* Make sure a caller can chown. */
1054 if (la->la_valid & LA_UID) {
1055 if (la->la_uid == (uid_t) -1)
1056 la->la_uid = tmp_la->la_uid;
1057 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1058 (la->la_uid != tmp_la->la_uid)) &&
1059 !mdd_capable(uc, CFS_CAP_CHOWN))
1062 /* If the user or group of a non-directory has been
1063 * changed by a non-root user, remove the setuid bit.
1064 * 19981026 David C Niemi <niemi@tux.org>
1066 * Changed this to apply to all users, including root,
1067 * to avoid some races. This is the behavior we had in
1068 * 2.0. The check for non-root was definitely wrong
1069 * for 2.2 anyway, as it should have been using
1070 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1071 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1072 !S_ISDIR(tmp_la->la_mode)) {
1073 la->la_mode &= ~S_ISUID;
1074 la->la_valid |= LA_MODE;
1078 /* Make sure caller can chgrp. */
1079 if (la->la_valid & LA_GID) {
1080 if (la->la_gid == (gid_t) -1)
1081 la->la_gid = tmp_la->la_gid;
1082 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1083 ((la->la_gid != tmp_la->la_gid) &&
1084 !lustre_in_group_p(uc, la->la_gid))) &&
1085 !mdd_capable(uc, CFS_CAP_CHOWN))
1088 /* Likewise, if the user or group of a non-directory
1089 * has been changed by a non-root user, remove the
1090 * setgid bit UNLESS there is no group execute bit
1091 * (this would be a file marked for mandatory
1092 * locking). 19981026 David C Niemi <niemi@tux.org>
1094 * Removed the fsuid check (see the comment above) --
1096 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1097 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1098 la->la_mode &= ~S_ISGID;
1099 la->la_valid |= LA_MODE;
1103 /* For both Size-on-MDS case and truncate case,
1104 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1105 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1106 * For SOM case, it is true, the MAY_WRITE perm has been checked
1107 * when open, no need check again. For truncate case, it is false,
1108 * the MAY_WRITE perm should be checked here. */
1109 if (ma->ma_attr_flags & MDS_SOM) {
1110 /* For the "Size-on-MDS" setattr update, merge coming
1111 * attributes with the set in the inode. BUG 10641 */
1112 if ((la->la_valid & LA_ATIME) &&
1113 (la->la_atime <= tmp_la->la_atime))
1114 la->la_valid &= ~LA_ATIME;
1116 /* OST attributes do not have a priority over MDS attributes,
1117 * so drop times if ctime is equal. */
1118 if ((la->la_valid & LA_CTIME) &&
1119 (la->la_ctime <= tmp_la->la_ctime))
1120 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1122 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1123 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1124 (uc->mu_fsuid == tmp_la->la_uid)) &&
1125 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1126 rc = mdd_permission_internal_locked(env, obj,
1133 if (la->la_valid & LA_CTIME) {
1134 /* The pure setattr, it has the priority over what is
1135 * already set, do not drop it if ctime is equal. */
1136 if (la->la_ctime < tmp_la->la_ctime)
1137 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1145 /** Store a data change changelog record
1146 * If this fails, we must fail the whole transaction; we don't
1147 * want the change to commit without the log entry.
1148 * \param mdd_obj - mdd_object of change
1149 * \param handle - transacion handle
 *
 * Skips silently when changelogs are off (CLM_ON clear), and for
 * CL_SETATTR when the object was already logged since the current
 * changelog start time (dedup). Builds an llog_changelog_rec in the
 * per-thread big buffer and writes it via mdd_changelog_llog_write().
 */
1151 static int mdd_changelog_data_store(const struct lu_env *env,
1152 struct mdd_device *mdd,
1153 enum changelog_rec_type type,
1154 struct mdd_object *mdd_obj,
1155 struct thandle *handle)
1157 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1158 struct llog_changelog_rec *rec;
1163 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1166 LASSERT(handle != NULL);
1167 LASSERT(mdd_obj != NULL);
1169 if ((type == CL_SETATTR) &&
1170 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1171 /* Don't need multiple updates in this log */
1172 /* Don't check under lock - no big deal if we get an extra
1177 reclen = llog_data_len(sizeof(*rec));
1178 buf = mdd_buf_alloc(env, reclen);
1179 if (buf->lb_buf == NULL)
1181 rec = (struct llog_changelog_rec *)buf->lb_buf;
1183 rec->cr_flags = CLF_VERSION;
1184 rec->cr_type = (__u32)type;
1185 rec->cr_tfid = *tfid;
1186 rec->cr_namelen = 0;
/* Remember when we last logged this object, for the dedup above. */
1187 mdd_obj->mod_cltime = cfs_time_current_64();
1189 rc = mdd_changelog_llog_write(mdd, rec, handle);
1191 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1192 rc, type, PFID(tfid));
1199 /* set attr and LOV EA at once, return updated attr
 *
 * md_object_operations setattr entry point. Sequence:
 *  1. open a transaction sized for MDD_TXN_ATTR_SET_OP;
 *  2. for chown/chgrp on regular files, snapshot the current LOV EA
 *     so the change can be journalled (and later pushed to OSTs);
 *  3. run mdd_fix_attr() on a scratch copy of the attrs;
 *  4. (quota builds) pre-check inode+block quota for the new owner;
 *  5. apply flags or attrs under the write lock; for chown, write the
 *     setattr llog record with fresh cookies;
 *  6. optionally replace the LOV EA (MA_LOV) after sanity check;
 *  7. emit a CL_SETATTR changelog record for time changes;
 *  8. stop the transaction, then asynchronously push new ownership to
 *     the OSTs, and (quota) commit pending and trigger dqrel/dqacq.
 * Many error-path and cleanup lines are elided in this dump.
 */
1200 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1201 const struct md_attr *ma)
1203 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1204 struct mdd_device *mdd = mdo2mdd(obj);
1205 struct thandle *handle;
1206 struct lov_mds_md *lmm = NULL;
1207 struct llog_cookie *logcookies = NULL;
1208 int rc, lmm_size = 0, cookie_size = 0;
1209 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1210 #ifdef HAVE_QUOTA_SUPPORT
1211 struct obd_device *obd = mdd->mdd_obd_dev;
1212 struct mds_obd *mds = &obd->u.mds;
1213 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1214 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1215 int quota_opc = 0, block_count = 0;
1216 int inode_pending = 0, block_pending = 0;
1220 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1221 MDD_TXN_ATTR_SET_OP);
1222 handle = mdd_trans_start(env, mdd);
1224 RETURN(PTR_ERR(handle));
1225 /*TODO: add lock here*/
1226 /* start a log jounal handle if needed */
1227 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1228 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1229 lmm_size = mdd_lov_mdsize(env, mdd);
1230 lmm = mdd_max_lmm_get(env, mdd);
1232 GOTO(cleanup, rc = -ENOMEM);
1234 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1241 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1242 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1243 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Work on a copy so the caller's md_attr stays untouched. */
1245 *la_copy = ma->ma_attr;
1246 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1250 #ifdef HAVE_QUOTA_SUPPORT
1251 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1252 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1254 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1256 quota_opc = FSFILT_OP_SETATTR;
1257 mdd_quota_wrapper(la_copy, qnids);
1258 mdd_quota_wrapper(la_tmp, qoids);
1259 /* get file quota for new owner */
1260 lquota_chkquota(mds_quota_interface_ref, obd,
1261 qnids[USRQUOTA], qnids[GRPQUOTA], 1,
1262 &inode_pending, NULL, 0, NULL, 0);
1263 block_count = (la_tmp->la_blocks + 7) >> 3;
1266 mdd_data_get(env, mdd_obj, &data);
1267 /* get block quota for new owner */
1268 lquota_chkquota(mds_quota_interface_ref, obd,
1271 block_count, &block_pending,
1272 NULL, LQUOTA_FLAGS_BLK,
1279 if (la_copy->la_valid & LA_FLAGS) {
1280 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
/* Keep the cached mod_flags in sync with what was just written. */
1283 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1284 } else if (la_copy->la_valid) { /* setattr */
1285 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1287 /* journal chown/chgrp in llog, just like unlink */
1288 if (rc == 0 && lmm_size){
1289 cookie_size = mdd_lov_cookiesize(env, mdd);
1290 logcookies = mdd_max_cookie_get(env, mdd);
1291 if (logcookies == NULL)
1292 GOTO(cleanup, rc = -ENOMEM);
1294 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1295 logcookies, cookie_size) <= 0)
1300 if (rc == 0 && ma->ma_valid & MA_LOV) {
1303 mode = mdd_object_type(mdd_obj);
1304 if (S_ISREG(mode) || S_ISDIR(mode)) {
1305 rc = mdd_lsm_sanity_check(env, mdd_obj);
1309 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1310 ma->ma_lmm_size, handle, 1);
1315 if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1316 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1318 mdd_trans_stop(env, mdd, rc, handle);
1319 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1320 /*set obd attr, if needed*/
1321 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1324 #ifdef HAVE_QUOTA_SUPPORT
1327 lquota_pending_commit(mds_quota_interface_ref, obd,
1328 qnids[USRQUOTA], qnids[GRPQUOTA],
1331 lquota_pending_commit(mds_quota_interface_ref, obd,
1332 qnids[USRQUOTA], qnids[GRPQUOTA],
1334 /* Trigger dqrel/dqacq for original owner and new owner.
1335 * If failed, the next call for lquota_chkquota will
1337 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/**
 * Set extended attribute \a name on \a obj under the object's write lock.
 *
 * Thin locking wrapper around __mdd_xattr_set().  The transaction handle
 * \a handle must already be started by the caller, which is also
 * responsible for stopping it and for any ctime update.
 *
 * \param fl  xattr flags (create/replace semantics) passed through unchanged
 * \retval 0 on success, negative errno on failure
 */
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
        /* serialize with other metadata updates on this object */
        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);
/**
 * Permission check common to xattr set/delete: only the file owner or a
 * process with CFS_CAP_FOWNER may modify xattrs, and never on an
 * immutable or append-only object.
 */
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc     = md_ucred(env);

        /* immutable/append-only objects refuse any xattr modification */
        if (mdd_is_immutable(obj) || mdd_is_append(obj))
        /* fetch current attributes to compare ownership below */
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* non-owner needs the FOWNER capability */
        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
/**
 * md_object_operations::moo_xattr_set() entry point.
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 *
 * Runs the ownership sanity check, then performs the set inside its own
 * transaction and records a CL_XATTR changelog entry for "user.*" xattrs.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        /* owner / capability check before starting a transaction */
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
                RETURN(PTR_ERR(handle));
        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record user xattr changes */
        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
            (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
1416 * The caller should guarantee to update the object ctime
1417 * after xattr_set if needed.
1419 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1422 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1423 struct mdd_device *mdd = mdo2mdd(obj);
1424 struct thandle *handle;
1428 rc = mdd_xattr_sanity_check(env, mdd_obj);
1432 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1433 handle = mdd_trans_start(env, mdd);
1435 RETURN(PTR_ERR(handle));
1437 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1438 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1439 mdd_object_capa(env, mdd_obj));
1440 mdd_write_unlock(env, mdd_obj);
1442 /* Only record user xattr changes */
1443 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1444 (strncmp("user.", name, 5) != 0))
1445 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1448 mdd_trans_stop(env, mdd, rc, handle);
/* partial unlink */
/**
 * Drop a link on \a obj without touching any directory entry (the name
 * removal is done by the caller/another layer).  For directories the
 * second __mdd_ref_del() drops the extra nlink held for ".".  Updates
 * ctime, finishes the unlink (orphan handling via mdd_finish_unlink())
 * and releases quota for the child when the last link and last open go
 * away.
 */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
        LASSERT(mdd_object_exists(mdd_obj) > 0);
        /* size the transaction for unlink + llog record */
        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        /* drop one nlink; for dirs drop the "." link too */
        __mdd_ref_del(env, mdd_obj, handle, 0);
        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                __mdd_ref_del(env, mdd_obj, handle, 1);
        /* ctime must be supplied by the client-visible md_attr */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        /* orphan insertion / object kill when nlink reaches zero */
        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* release quota only when the last link AND last open are gone */
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial operation */
/**
 * Validate the file type requested for a partial (MDS-internal) object
 * create; dispatches on the S_IFMT bits of the requested mode.
 */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
        switch (ma->ma_attr.la_mode & S_IFMT) {
/**
 * md_object_operations::moo_object_create() — create the on-disk object
 * for \a obj without inserting it into any directory (partial create,
 * used by CMD/remote-parent paths).
 *
 * Acquires inode and (type-dependent) block quota for the child first,
 * then creates the object in a transaction, optionally stores the LMV
 * slave EA or a remote ACL from \a spec, and initializes the object
 * (".", "..", fid EA) against parent fid \a pfid.
 */
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending = 0, block_pending = 0;
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
                                qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
                /* block quota depends on the object type being created */
                switch (ma->ma_attr.la_mode & S_IFMT) {
                        /* get block quota for child */
                        lquota_chkquota(mds_quota_interface_ref, obd,
                                        qids[USRQUOTA], qids[GRPQUOTA],
                                        block_count, &block_pending, NULL,
                                        LQUOTA_FLAGS_BLK, NULL, 0);
        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
        handle = mdd_trans_start(env, mdd);
                GOTO(out_pending, rc = PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);
                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
#ifdef CONFIG_FS_POSIX_ACL
        if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                buf->lb_len = spec->u.sp_ea.eadatalen;
                if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                        /* ACL may rewrite la_mode, so re-flag it valid */
                        rc = __mdd_acl_init(env, mdd_obj, buf,
                                            &ma->ma_attr.la_mode,
                        ma->ma_attr.la_valid |= LA_MODE;
                /* remote ACL case carries the parent fid in the EA */
                pfid = spec->u.sp_ea.fid;
        rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
        /* return fresh attributes to the caller */
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
                /* commit the pending inode and block acquisitions */
                lquota_pending_commit(mds_quota_interface_ref, obd,
                                      qids[USRQUOTA], qids[GRPQUOTA],
                lquota_pending_commit(mds_quota_interface_ref, obd,
                                      qids[USRQUOTA], qids[GRPQUOTA],
        /* Trigger dqacq on the owner of child. If failed,
         * the next call for lquota_chkquota will process it. */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                      FSFILT_OP_CREATE_PARTIAL_CHILD);
/**
 * md_object_operations::moo_ref_add() — add one link to \a obj without
 * inserting a directory entry (partial link); also pushes the caller's
 * ctime onto the object.
 */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
        /* NOTE(review): result code passed to mdd_trans_stop() is hard-coded
         * 0 rather than rc, unlike the sibling operations — looks deliberate
         * (the link itself already succeeded), but worth confirming. */
        mdd_trans_stop(env, mdd, 0, handle);
/*
 * Map MDS open flags to MAY_* access-mode bits for permission checking.
 * do NOT or the MAY_*'s, you'll get the weakest
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                /* with no/initial credentials, or when the caller is the
                 * owner, skip the access-mode bits entirely */
                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
        if (flags & FMODE_READ)
        /* truncate and append both need write permission */
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
        if (flags & MDS_FMODE_EXEC)
/**
 * Validate an open request against the object's attributes: rejects
 * dead objects and symlinks, write-opens of directories, violations of
 * append-only mode, and O_NOATIME by non-owners without FOWNER.
 */
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        if (mdd_is_dead_obj(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        /* symlinks are never opened directly */
        if (S_ISLNK(tmp_la->la_mode))
        mode = accmode(env, tmp_la, flag);
        /* directories cannot be opened for write */
        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
        /* permission already verified at create time for new files */
        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
        /* truncation is meaningless on special files — drop the flag */
        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* For writing append-only file must open it with append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                if (flag & MDS_OPEN_TRUNC)
        /*
         * Now, flag -- O_NOATIME does not be packed by client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                /* only the owner (or FOWNER-capable) may suppress atime */
                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
/**
 * md_object_operations::moo_open() — run the open sanity check and bump
 * the open count under the object write lock.
 */
static int mdd_open(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        /* count is dropped again by mdd_close() */
        mdd_obj->mod_count++;
        mdd_write_unlock(env, mdd_obj);
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie*/
/**
 * Destroy the object on its final unlink; for regular files the LOV EA
 * and unlink-llog cookies are handed back to the caller through \a ma.
 */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean evth up.
                 * Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/*
 * md_object_operations::moo_close().
 * No permission check is needed.
 *
 * Drops the open count; on the last close of an orphan (nlink == 0) the
 * object is removed from the orphan index, killed, and its quota is
 * released.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /* the close may destroy the object: size txn for unlink + llog */
        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        handle = mdd_trans_start(env, mdo2mdd(obj));
                RETURN(PTR_ERR(handle));
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        /* release open count */
        mdd_obj->mod_count --;

        if (mdd_obj->mod_count == 0) {
                /* remove link to object from orphan index */
                if (mdd_obj->mod_flags & ORPHAN_OBJ)
                        __mdd_orphan_del(env, mdd_obj, handle);
        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* last close of a fully unlinked file: destroy it */
        if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
                rc = mdd_object_kill(env, mdd_obj, ma);
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
        /* object survives: no LOV/cookie data to hand back */
        ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Permission check is done when open,
 * no need check again.
 *
 * Only verifies that the object is a directory backed by an index-capable
 * dt object.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
        struct dt_object *next = mdd_object_child(obj);
        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/**
 * Append per-entry attributes requested by \a attr (LUDA_FID and/or
 * LUDA_TYPE) to the dirent \a ent being built during readdir, reading
 * the packed fid from the current iterator record.  lde_attrs is
 * converted to little-endian at the end.
 */
static int mdd_append_attrs(const struct lu_env *env,
                            struct mdd_device *mdd,
                            const struct dt_it_ops *iops,
                            struct lu_dirent*ent)
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_fid          *fid  = &info->mti_fid2;
        /* NOTE(review): lde_namelen is stored little-endian, so this should
         * semantically be le16_to_cpu(); same byte-swap for 16 bits, so no
         * behavior difference, but worth confirming/cleaning up. */
        int                     len  = cpu_to_le16(ent->lde_namelen);
        const unsigned          align = sizeof(struct luda_type) - 1;
        struct lu_fid_pack     *pack;
        struct mdd_object      *obj;
        struct luda_type       *lt;
        if (attr & LUDA_FID) {
                pack = (struct lu_fid_pack *)iops->rec(env, it);
                rc = fid_unpack(pack, fid);
                fid_cpu_to_le(&ent->lde_fid, fid);
                ent->lde_attrs = LUDA_FID;
        /* check if file type is required */
        if (attr & LUDA_TYPE) {
                /* type can only be derived when the fid was also asked for */
                if (!(attr & LUDA_FID)) {
                        CERROR("wrong attr : [%x]\n",attr);
                obj = mdd_object_find(env, mdd, fid);
                if (obj == NULL) /* remote object */
                if (mdd_object_exists(obj) == +1) {
                        /* round the name up so luda_type is aligned */
                        len = (len + align) & ~align;
                        lt = (void *) ent->lde_name + len;
                        lt->lt_type = cpu_to_le16(mdd_object_type(obj));
                        ent->lde_attrs |= LUDA_TYPE;
                mdd_object_put(env, obj);
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/**
 * Fill one page (\a area, \a nob bytes) with lu_dirent records pulled
 * from directory iterator \a it, starting after a lu_dirpage header.
 * Tracks the first and last hash seen via \a start / \a end and keeps a
 * pointer to the last emitted entry in \a last so a record that does not
 * fit can be absorbed into the previous entry's reclen.
 */
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              int first, void *area, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u64 *start, __u64 *end,
                              struct lu_dirent **last, __u32 attr)
        struct lu_dirent *ent;

        /* reserve the page header; entries follow it */
        memset(area, 0, sizeof (struct lu_dirpage));
        area += sizeof (struct lu_dirpage);
        nob  -= sizeof (struct lu_dirpage);
                len = iops->key_size(env, it);

                /* IAM iterator can return record with zero len. */
                name = (char *)iops->key(env, it);
                hash = iops->store(env, it);
                if (unlikely(first)) {
                recsize = lu_dirent_calc_size(len, attr);
                CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
                       name, ent, nob, hash, len, len, len, name);
                if (nob >= recsize) {
                        /* record fits: emit hash, name and extra attrs */
                        ent->lde_hash = cpu_to_le64(hash);
                        ent->lde_namelen = cpu_to_le16(len);
                        ent->lde_reclen = cpu_to_le16(recsize);
                        memcpy(ent->lde_name, name, len);
                        result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        (*last)->lde_reclen =
                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                /* advance to the slot for the next entry */
                ent = (void *)ent + recsize;
                result = iops->next(env, it);
        } while (result == 0);
/**
 * Iterate the directory index of \a obj from rdpg->rp_hash and fill the
 * pages supplied in \a rdpg with lu_dirent records; the first page's
 * lu_dirpage header records the covered hash range (or LDF_EMPTY when
 * no entry was produced).
 */
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct lu_dirent *last = NULL;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, mdd_object_capa(env, obj));
        /* position the iterator at the requested start hash */
        rc = iops->load(env, it, rdpg->rp_hash);
                /*
                 * Iterator didn't find record with exactly the key requested.
                 *
                 * It is currently either
                 *
                 *     - positioned above record with key less than
                 *     requested---skip it.
                 *
                 *     - or not positioned at all (is in IAM_IT_SKEWED
                 *     state)---position it on the next item.
                 */
                rc = iops->next(env, it);
        /*
         * At this point and across for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
                                        it, &hash_start, &hash_end, &last,
                /* last page (or error): terminate the entry chain */
                if (rc != 0 || i == rdpg->rp_npages - 1) {
                        last->lde_reclen = 0;
                hash_end = DIR_END_OFF;
                struct lu_dirpage *dp;

                /* stamp the covered hash range into the first page header */
                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(hash_end);
                /*
                 * No pages were processed, mark this.
                 */
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(rdpg->rp_pages[0]);
        iops->fini(env, it);
/**
 * md_object_operations::moo_readpage() — read directory entries of
 * \a obj into the pages described by \a rdpg, under the object read
 * lock.  A dead (unlinked-but-open) directory yields a single empty
 * page, per POSIX: no entries at all, not even "." / "..".
 */
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));
                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                /* synthesize a single empty dirpage covering all hashes */
                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage*)cfs_kmap(pg);
                memset(dp, 0 , sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                GOTO(out_unlock, rc = 0);
        rc = __mdd_readpage(env, mdd_obj, rdpg);
        mdd_read_unlock(env, mdd_obj);
/**
 * md_object_operations::moo_object_sync() — flush the object by
 * delegating straight to the underlying dt object's do_object_sync().
 */
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
2222 const struct md_object_operations mdd_obj_ops = {
2223 .moo_permission = mdd_permission,
2224 .moo_attr_get = mdd_attr_get,
2225 .moo_attr_set = mdd_attr_set,
2226 .moo_xattr_get = mdd_xattr_get,
2227 .moo_xattr_set = mdd_xattr_set,
2228 .moo_xattr_list = mdd_xattr_list,
2229 .moo_xattr_del = mdd_xattr_del,
2230 .moo_object_create = mdd_object_create,
2231 .moo_ref_add = mdd_ref_add,
2232 .moo_ref_del = mdd_ref_del,
2233 .moo_open = mdd_open,
2234 .moo_close = mdd_close,
2235 .moo_readpage = mdd_readpage,
2236 .moo_readlink = mdd_readlink,
2237 .moo_capa_get = mdd_capa_get,
2238 .moo_object_sync = mdd_object_sync,
2239 .moo_path = mdd_path,