1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
52 #include <linux/jbd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
65 #include <linux/ldiskfs_fs.h>
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
70 #include "mdd_internal.h"
/* Forward declarations: the lu_object operations vector defined near the
 * bottom of this file, and the xattr getter used before its definition.
 * NOTE(review): the mdd_xattr_get prototype is truncated in this view. */
72 static const struct lu_object_operations mdd_lu_obj_ops;
74 static int mdd_xattr_get(const struct lu_env *env,
75 struct md_object *obj, struct lu_buf *buf,
/* Fetch layout/data information for an existing object by delegating to the
 * underlying layer (mdo_data_get). Asserts existence; the FID is printed on
 * assertion failure to aid debugging. */
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
81 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82 PFID(mdd_object_fid(obj)));
83 mdo_data_get(env, obj, data);
/* Read the standard attributes (lu_attr) of an existing object.
 * \param la   [out] attribute buffer filled by the lower layer
 * \param capa capability used for permission bypass/validation
 * \retval result of mdo_attr_get (0 or negative errno). */
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88 struct lu_attr *la, struct lustre_capa *capa)
90 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91 PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory mod_flags
 * representation (APPEND_OBJ/IMMUTE_OBJ). Both bits are cleared first so
 * repeated calls stay in sync with the supplied flags. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area stored in the env's lu_context.
 * The key is registered at module init, so the lookup must never fail. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
/* Wrap a caller-supplied memory area in the per-thread mti_buf descriptor.
 * NOTE(review): assignment of area/len into the buf is elided in this view;
 * presumably lb_buf = area, lb_len = len — confirm against full source. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory owned by a lu_buf; no-op for NULL/empty buffers.
 * Chooses OBD_VFREE vs OBD_FREE — the selecting condition is elided here,
 * presumably buf->lb_vmalloc (the field is used in mdd_buf_grow). */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
130 OBD_VFREE(buf->lb_buf, buf->lb_len);
132 OBD_FREE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get: wrap a read-only area in the per-thread
 * mti_buf. The const is cast away because lu_buf has a non-const lb_buf;
 * callers must treat the result as read-only. */
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137 const void *area, ssize_t len)
141 buf = &mdd_env_info(env)->mti_buf;
142 buf->lb_buf = (void *)area;
/* Threshold below which the big buffer is kmalloc'ed; larger uses vmalloc. */
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
/* Return the per-thread mti_big_buf, (re)allocating it to hold at least
 * \a len bytes. An existing too-small buffer is freed first (old contents
 * are NOT preserved — use mdd_buf_grow for that). Small requests use
 * OBD_ALLOC; if that fails (or len > BUF_VMALLOC_SIZE) OBD_VMALLOC is used.
 * On total failure buf->lb_buf is NULL — callers must check. */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154 OBD_VFREE(buf->lb_buf, buf->lb_len);
156 OBD_FREE(buf->lb_buf, buf->lb_len);
159 if (buf->lb_buf == NULL) {
161 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162 OBD_ALLOC(buf->lb_buf, buf->lb_len);
/* fall back to vmalloc if the small kmalloc path failed */
165 if (buf->lb_buf == NULL) {
166 OBD_VMALLOC(buf->lb_buf, buf->lb_len);
169 if (buf->lb_buf == NULL)
175 /** Increase the size of the \a mti_big_buf.
176 * preserves old data in buffer
177 * old buffer remains unchanged on error
178 * \retval 0 or -ENOMEM
/* realloc-style grow: allocate a new buffer (vmalloc above BUF_VMALLOC_SIZE,
 * kmalloc otherwise), copy the old contents, free the old allocation using
 * the matching free routine (lb_vmalloc records which), then install the new
 * descriptor over the old one. */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
185 LASSERT(len >= oldbuf->lb_len);
186 if (len > BUF_VMALLOC_SIZE) {
187 OBD_VMALLOC(buf.lb_buf, len);
190 OBD_ALLOC(buf.lb_buf, len);
193 if (buf.lb_buf == NULL)
197 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199 if (oldbuf->lb_vmalloc)
200 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
/* publish the new buffer by overwriting the thread-local descriptor */
204 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, sized for the current maximum
 * cookie size of the device. If the cached buffer is too small it is freed
 * and reallocated; the buffer is zeroed before return. Returns NULL on
 * allocation failure. */
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210 struct mdd_device *mdd)
212 struct mdd_thread_info *mti = mdd_env_info(env);
215 max_cookie_size = mdd_lov_cookiesize(env, mdd);
216 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217 if (mti->mti_max_cookie)
218 OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219 mti->mti_max_cookie = NULL;
220 mti->mti_max_cookie_size = 0;
222 if (unlikely(mti->mti_max_cookie == NULL)) {
223 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224 if (likely(mti->mti_max_cookie != NULL))
225 mti->mti_max_cookie_size = max_cookie_size;
227 if (likely(mti->mti_max_cookie != NULL))
228 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229 return mti->mti_max_cookie;
/* Return the per-thread lov_mds_md buffer sized for the device's current
 * maximum LOV EA size, reallocating if the cached one is too small.
 * Unlike mdd_max_cookie_get the buffer is NOT zeroed. Returns NULL on
 * allocation failure. */
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233 struct mdd_device *mdd)
235 struct mdd_thread_info *mti = mdd_env_info(env);
238 max_lmm_size = mdd_lov_mdsize(env, mdd);
239 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240 if (mti->mti_max_lmm)
241 OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242 mti->mti_max_lmm = NULL;
243 mti->mti_max_lmm_size = 0;
245 if (unlikely(mti->mti_max_lmm == NULL)) {
246 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
/* NOTE(review): "unlikely" on the allocation-success path looks inverted —
 * the cookie variant above uses likely() here; harmless but verify. */
247 if (unlikely(mti->mti_max_lmm != NULL))
248 mti->mti_max_lmm_size = max_lmm_size;
250 return mti->mti_max_lmm;
/* lu_device operation: allocate and minimally initialize an mdd_object,
 * wiring up the md_object ops vectors and the lu_object ops. Returns the
 * embedded lu_object (NULL return path on OBD_ALLOC_PTR failure is elided
 * from this view). */
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254 const struct lu_object_header *hdr,
257 struct mdd_object *mdd_obj;
259 OBD_ALLOC_PTR(mdd_obj);
260 if (mdd_obj != NULL) {
263 o = mdd2lu_obj(mdd_obj);
264 lu_object_init(o, NULL, d);
265 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267 mdd_obj->mod_count = 0;
268 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object operation: initialize an mdd object by allocating the
 * corresponding object of the underlying (child) device and stacking it
 * below this one. Also resets the changelog time and inits the pdir lock. */
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276 const struct lu_object_conf *unused)
278 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279 struct mdd_object *mdd_obj = lu2mdd_obj(o);
280 struct lu_object *below;
281 struct lu_device *under;
284 mdd_obj->mod_cltime = 0;
285 under = &d->mdd_child->dd_lu_dev;
286 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287 mdd_pdlock_init(mdd_obj);
/* stack the lower-layer object under this one */
291 lu_object_add(o, below);
/* lu_object operation: once the object is fully stacked, load its flags
 * from disk if it exists on the backend. */
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 if (lu_object_exists(o))
299 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object operation: free an mdd_object. Teardown calls (lu_object_fini /
 * OBD_FREE_PTR) are elided from this view. */
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object operation: print a one-line human-readable description of the
 * object (open count, valid mask, changelog time, flags) via printer \a p. */
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313 lu_printer_t p, const struct lu_object *o)
315 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317 "valid=%x, cltime=%llu, flags=%lx)",
318 mdd, mdd->mod_count, mdd->mod_valid,
319 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operations vector for mdd objects (forward-declared at top). */
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323 .loo_object_init = mdd_object_init,
324 .loo_object_start = mdd_object_start,
325 .loo_object_free = mdd_object_free,
326 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device \a d.
 * Result follows md_object_find_slice semantics — may be an ERR_PTR;
 * callers check with IS_ERR and drop the reference with mdd_object_put. */
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330 struct mdd_device *d,
331 const struct lu_fid *f)
333 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated pathname (relative to the filesystem root) to a
 * FID by iteratively looking up each component in its parent directory.
 * Uses the per-thread big buffer as scratch for the current component name.
 * -EREMOTE is returned for objects that live on another MDS. */
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337 const char *path, struct lu_fid *fid)
340 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341 struct mdd_object *obj;
342 struct lu_name *lname = &mdd_env_info(env)->mti_name;
347 /* temp buffer for path element */
348 buf = mdd_buf_alloc(env, PATH_MAX);
349 if (buf->lb_buf == NULL)
352 lname->ln_name = name = buf->lb_buf;
353 lname->ln_namelen = 0;
354 *f = mdd->mdd_root_fid;
/* scan one component (copy loop body elided in this view) */
361 while (*path != '/' && *path != '\0') {
369 /* find obj corresponding to fid */
370 obj = mdd_object_find(env, mdd, f)
372 GOTO(out, rc = -EREMOTE);
/* NOTE(review): PTR_ERR() already yields a negative errno, so negating it
 * here produces a positive rc — verify the sign convention intended. */
374 GOTO(out, rc = -PTR_ERR(obj));
375 /* get child fid from parent and name */
376 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377 mdd_object_put(env, obj);
382 lname->ln_namelen = 0;
391 /** The maximum depth that fid2path() will search.
392 * This is limited only because we want to store the fids for
393 * historical path lookup purposes.
395 #define MAX_PATH_DEPTH 100
397 /** mdd_path() lookup structure. */
/* State carried across mdd_path_current()/mdd_path_historic(): the chain of
 * parent FIDs from the target up toward the root, plus the output path
 * buffer and the link/record selectors supplied by the caller. */
398 struct path_lookup_info {
399 __u64 pli_recno; /**< history point */
400 __u64 pli_currec; /**< current record */
401 struct lu_fid pli_fid;
402 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403 struct mdd_object *pli_mdd_obj;
404 char *pli_path; /**< full path */
406 int pli_linkno; /**< which hardlink to follow */
407 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path for pli->pli_mdd_obj by walking parent links:
 * starting from the object's FID, repeatedly read the link EA (parent FID +
 * name) and prepend the name to the path buffer (built right-to-left from
 * the end of pli_path), until the root FID is reached. Each parent FID is
 * recorded in pli_fids[] for later historical lookup. Finally the result is
 * verified by resolving the path forward again (mdd_path2fid); a mismatch
 * means a concurrent rename and yields -EAGAIN so the caller retries. */
410 static int mdd_path_current(const struct lu_env *env,
411 struct path_lookup_info *pli)
413 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414 struct mdd_object *mdd_obj;
415 struct lu_buf *buf = NULL;
416 struct link_ea_header *leh;
417 struct link_ea_entry *lee;
418 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the last byte of the path buffer; names are packed
 * backwards from here */
425 ptr = pli->pli_path + pli->pli_pathlen - 1;
428 pli->pli_fidcount = 0;
429 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432 mdd_obj = mdd_object_find(env, mdd,
433 &pli->pli_fids[pli->pli_fidcount]);
435 GOTO(out, rc = -EREMOTE);
/* NOTE(review): PTR_ERR() is already negative; "-PTR_ERR" makes rc
 * positive — confirm against upstream mdd_object.c. */
437 GOTO(out, rc = -PTR_ERR(mdd_obj));
438 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440 mdd_object_put(env, mdd_obj);
444 /* Do I need to error out here? */
449 /* Get parent fid and object name */
450 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451 buf = mdd_links_get(env, mdd_obj);
452 mdd_read_unlock(env, mdd_obj);
453 mdd_object_put(env, mdd_obj);
455 GOTO(out, rc = PTR_ERR(buf));
458 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461 /* If set, use link #linkno for path lookup, otherwise use
462 link #0. Only do this for the final path element. */
463 if ((pli->pli_fidcount == 0) &&
464 (pli->pli_linkno < leh->leh_reccount)) {
466 for (count = 0; count < pli->pli_linkno; count++) {
467 lee = (struct link_ea_entry *)
468 ((char *)lee + reclen);
469 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471 if (pli->pli_linkno < leh->leh_reccount - 1)
472 /* indicate to user there are more links */
476 /* Pack the name in the end of the buffer */
477 ptr -= tmpname->ln_namelen;
478 if (ptr - 1 <= pli->pli_path)
479 GOTO(out, rc = -EOVERFLOW);
/* NOTE(review): strncpy copies exactly ln_namelen bytes with no NUL;
 * termination is presumably handled at buffer setup (elided here). */
480 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
483 /* Store the parent fid for historic lookup */
484 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485 GOTO(out, rc = -EOVERFLOW);
486 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
489 /* Verify that our path hasn't changed since we started the lookup.
490 Record the current index, and verify the path resolves to the
491 same fid. If it does, then the path is correct as of this index. */
492 spin_lock(&mdd->mdd_cl.mc_lock);
493 pli->pli_currec = mdd->mdd_cl.mc_index;
494 spin_unlock(&mdd->mdd_cl.mc_lock);
495 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498 GOTO (out, rc = -EAGAIN);
500 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503 PFID(&pli->pli_fid));
504 GOTO(out, rc = -EAGAIN);
/* shift the packed path to the front of the caller's buffer */
507 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
511 if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path reconstruction. Body elided in this
 * view — per the comment in mdd_path() it is effectively a stub in this
 * implementation. */
518 static int mdd_path_historic(const struct lu_env *env,
519 struct path_lookup_info *pli)
524 /* Returns the full path to this fid, as of changelog record recno. */
/* Entry point for fid2path: fills \a path (size \a pathlen) with the full
 * pathname of \a obj. \a recno selects a history point (-1 = current only);
 * \a linkno selects which hardlink to follow and returns the next link
 * index. Retries mdd_path_current() on -EAGAIN (concurrent renames). */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526 char *path, int pathlen, __u64 *recno, int *linkno)
528 struct path_lookup_info *pli;
/* the root has an empty path — handled specially (body elided) */
536 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
546 pli->pli_mdd_obj = md2mdd_obj(obj);
547 pli->pli_recno = *recno;
548 pli->pli_path = path;
549 pli->pli_pathlen = pathlen;
550 pli->pli_linkno = *linkno;
552 /* Retry multiple times in case file is being moved */
553 while (tries-- && rc == -EAGAIN)
554 rc = mdd_path_current(env, pli);
556 /* For historical path lookup, the current links may not have existed
557 * at "recno" time. We must switch over to earlier links/parents
558 * by using the changelog records. If the earlier parent doesn't
559 * exist, we must search back through the changelog to reconstruct
560 * its parents, then check if it exists, etc.
561 * We may ignore this problem for the initial implementation and
562 * state that an "original" hardlink must still exist for us to find
563 * historic path name. */
564 if (pli->pli_recno != -1) {
565 rc = mdd_path_historic(env, pli);
567 *recno = pli->pli_currec;
568 /* Return next link index to caller */
569 *linkno = pli->pli_linkno;
/* Load the object's on-disk attributes and translate its flags into
 * mod_flags; also marks single-nlink directories with MNLINK_OBJ. */
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
579 struct lu_attr *la = &mdd_env_info(env)->mti_la;
583 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
585 mdd_flags_xlate(obj, la->la_flags);
586 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587 obj->mod_flags |= MNLINK_OBJ;
592 /* get only inode attributes */
/* Fill ma->ma_attr from the backend unless MA_INODE is already valid;
 * sets MA_INODE in ma_valid on success. */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
599 if (ma->ma_valid & MA_INODE)
602 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603 mdd_object_capa(env, mdd_obj));
605 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default LOV striping (from the MDS LOV
 * descriptor) and report its size both through \a size and the return
 * value (sizeof(struct lov_mds_md), i.e. a V1 EA). */
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
612 struct lov_desc *ldesc;
613 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
616 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617 LASSERT(ldesc != NULL);
622 lmm->lmm_magic = LOV_MAGIC_V1;
623 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624 lmm->lmm_pattern = ldesc->ld_pattern;
625 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627 *size = sizeof(struct lov_mds_md);
629 RETURN(sizeof(struct lov_mds_md));
632 /* get lov EA only */
/* Read the LOV EA into ma->ma_lmm unless MA_LOV is already valid; when the
 * object has no striping and MA_LOV_DEF is requested, substitute the
 * filesystem default. Caller must hold the object read lock (see
 * mdd_lmm_get_locked). */
633 static int __mdd_lmm_get(const struct lu_env *env,
634 struct mdd_object *mdd_obj, struct md_attr *ma)
639 if (ma->ma_valid & MA_LOV)
642 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
645 if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
651 ma->ma_valid |= MA_LOV;
/* Locked wrapper around __mdd_lmm_get: takes the object read lock for the
 * duration of the EA read. */
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664 rc = __mdd_lmm_get(env, mdd_obj, ma);
665 mdd_read_unlock(env, mdd_obj);
/* Read the LMV EA (striped-directory layout) into ma->ma_lmv unless MA_LMV
 * is already valid; sets MA_LMV on success. Lock handling as for
 * __mdd_lmm_get. */
670 static int __mdd_lmv_get(const struct lu_env *env,
671 struct mdd_object *mdd_obj, struct md_attr *ma)
676 if (ma->ma_valid & MA_LMV)
679 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
682 ma->ma_valid |= MA_LMV;
/* Gather all attribute groups requested in ma->ma_need: inode attrs, LOV EA
 * (regular files and directories), LMV EA (directories), and default ACL
 * (directories, POSIX-ACL builds only). Stops at the first error. */
688 static int mdd_attr_get_internal(const struct lu_env *env,
689 struct mdd_object *mdd_obj,
695 if (ma->ma_need & MA_INODE)
696 rc = mdd_iattr_get(env, mdd_obj, ma);
698 if (rc == 0 && ma->ma_need & MA_LOV) {
699 if (S_ISREG(mdd_object_type(mdd_obj)) ||
700 S_ISDIR(mdd_object_type(mdd_obj)))
701 rc = __mdd_lmm_get(env, mdd_obj, ma);
703 if (rc == 0 && ma->ma_need & MA_LMV) {
704 if (S_ISDIR(mdd_object_type(mdd_obj)))
705 rc = __mdd_lmv_get(env, mdd_obj, ma);
707 #ifdef CONFIG_FS_POSIX_ACL
708 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
709 if (S_ISDIR(mdd_object_type(mdd_obj)))
710 rc = mdd_def_acl_get(env, mdd_obj, ma);
713 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
/* As mdd_attr_get_internal, but takes the object read lock when EA-type
 * attributes (LOV/LMV/default ACL) are requested; plain inode attrs need
 * no lock. */
718 int mdd_attr_get_internal_locked(const struct lu_env *env,
719 struct mdd_object *mdd_obj, struct md_attr *ma)
722 int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
725 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726 rc = mdd_attr_get_internal(env, mdd_obj, ma);
728 mdd_read_unlock(env, mdd_obj);
733 * No permission check is needed.
/* md_object operation: getattr entry point, delegating to the locked
 * internal gatherer. */
735 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
738 struct mdd_object *mdd_obj = md2mdd_obj(obj);
742 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
747 * No permission check is needed.
/* md_object operation: read the extended attribute \a name into \a buf,
 * under the object read lock. Object must exist. */
749 static int mdd_xattr_get(const struct lu_env *env,
750 struct md_object *obj, struct lu_buf *buf,
753 struct mdd_object *mdd_obj = md2mdd_obj(obj);
758 LASSERT(mdd_object_exists(mdd_obj));
760 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
761 rc = mdo_xattr_get(env, mdd_obj, buf, name,
762 mdd_object_capa(env, mdd_obj));
763 mdd_read_unlock(env, mdd_obj);
769 * Permission check is done when open,
770 * no need check again.
/* md_object operation: read a symlink target by doing a body read of the
 * underlying dt_object from offset 0 (pos), under the read lock. */
772 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
775 struct mdd_object *mdd_obj = md2mdd_obj(obj);
776 struct dt_object *next;
781 LASSERT(mdd_object_exists(mdd_obj));
783 next = mdd_object_child(mdd_obj);
784 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
785 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
786 mdd_object_capa(env, mdd_obj));
787 mdd_read_unlock(env, mdd_obj);
792 * No permission check is needed.
/* md_object operation: list the object's extended attribute names into
 * \a buf, under the read lock. */
794 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
797 struct mdd_object *mdd_obj = md2mdd_obj(obj);
802 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
803 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
804 mdd_read_unlock(env, mdd_obj);
/* Create the backend object for \a c inside transaction \a handle.
 * Chooses the dt_object_format from the creation spec: an index format when
 * a non-directory index feature is requested, otherwise derived from the
 * file mode. The allocation hint is initialized by the underlying device
 * using the parent \a p (may be NULL). Asserts the object exists after a
 * successful create. */
809 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
810 struct mdd_object *c, struct md_attr *ma,
811 struct thandle *handle,
812 const struct md_op_spec *spec)
814 struct lu_attr *attr = &ma->ma_attr;
815 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
816 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
817 const struct dt_index_features *feat = spec->sp_feat;
821 if (!mdd_object_exists(c)) {
822 struct dt_object *next = mdd_object_child(c);
825 if (feat != &dt_directory_features && feat != NULL)
826 dof->dof_type = DFT_INDEX;
828 dof->dof_type = dt_mode_to_dft(attr->la_mode);
830 dof->u.dof_idx.di_feat = feat;
832 /* @hint will be initialized by underlying device. */
833 next->do_ops->do_ah_init(env, hint,
834 p ? mdd_object_child(p) : NULL,
835 attr->la_mode & S_IFMT);
837 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
838 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
846 * Make sure the ctime is increased only.
/* Drop time updates that would move ctime backwards: if the incoming ctime
 * is older than the stored one, suppress both MTIME and CTIME; if the only
 * change is a ctime identical to the stored one, suppress CTIME. */
848 static inline int mdd_attr_check(const struct lu_env *env,
849 struct mdd_object *obj,
850 struct lu_attr *attr)
852 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
856 if (attr->la_valid & LA_CTIME) {
857 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
861 if (attr->la_ctime < tmp_la->la_ctime)
862 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
863 else if (attr->la_valid == LA_CTIME &&
864 attr->la_ctime == tmp_la->la_ctime)
865 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to the backend within transaction \a handle; on a mode
 * change with \a needacl set, also rewrite the ACL to match the new mode
 * (POSIX-ACL builds only). */
870 int mdd_attr_set_internal(const struct lu_env *env,
871 struct mdd_object *obj,
872 struct lu_attr *attr,
873 struct thandle *handle,
879 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
880 #ifdef CONFIG_FS_POSIX_ACL
881 if (!rc && (attr->la_valid & LA_MODE) && needacl)
882 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Validate the ctime monotonicity (mdd_attr_check) and then apply the
 * remaining attributes via mdd_attr_set_internal. */
887 int mdd_attr_check_set_internal(const struct lu_env *env,
888 struct mdd_object *obj,
889 struct lu_attr *attr,
890 struct thandle *handle,
896 rc = mdd_attr_check(env, obj, attr);
901 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Locked wrapper: take the object write lock only when an ACL rewrite may
 * be needed (mode change + needacl), then apply attributes. */
905 static int mdd_attr_set_internal_locked(const struct lu_env *env,
906 struct mdd_object *obj,
907 struct lu_attr *attr,
908 struct thandle *handle,
914 needacl = needacl && (attr->la_valid & LA_MODE);
916 mdd_write_lock(env, obj, MOR_TGT_CHILD);
917 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
919 mdd_write_unlock(env, obj);
/* Locked variant of mdd_attr_check_set_internal; same locking rule as
 * mdd_attr_set_internal_locked (write lock only for possible ACL change). */
923 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
924 struct mdd_object *obj,
925 struct lu_attr *attr,
926 struct thandle *handle,
932 needacl = needacl && (attr->la_valid & LA_MODE);
934 mdd_write_lock(env, obj, MOR_TGT_CHILD);
935 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
937 mdd_write_unlock(env, obj);
/* Low-level xattr update: a non-empty buffer sets the attribute, a
 * NULL/zero-length buffer deletes it. Caller holds the write lock (see
 * mdd_xattr_set_txn).
 * NOTE(review): the \a fl parameter is not passed to mdo_xattr_set — a
 * literal 0 is used instead, discarding LU_XATTR_* create/replace flags;
 * verify against upstream whether this is intentional. */
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942 const struct lu_buf *buf, const char *name,
943 int fl, struct thandle *handle)
945 struct lustre_capa *capa = mdd_object_capa(env, obj);
949 if (buf->lb_buf && buf->lb_len > 0)
950 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951 else if (buf->lb_buf == NULL && buf->lb_len == 0)
952 rc = mdo_xattr_del(env, obj, name, handle, capa);
958 * This gives the same functionality as the code between
959 * sys_chmod and inode_setattr
960 * chown_common and inode_setattr
961 * utimes and inode_setattr
962 * This API is ported from mds_fix_attr but remove some unnecessary stuff.
/* Normalize and permission-check a setattr request against the object's
 * current attributes and the caller's credentials (md_ucred). Drops valid
 * bits that are no-ops (ctime/atime not advancing), enforces ownership and
 * capability rules for time/mode/uid/gid/flags changes, and clears
 * setuid/setgid bits on chown/chgrp per POSIX. Operates purely on \a la;
 * the actual store happens later in the caller. */
964 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
965 struct lu_attr *la, const struct md_attr *ma)
967 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
968 struct md_ucred *uc = md_ucred(env);
975 /* Do not permit change file type */
976 if (la->la_valid & LA_TYPE)
979 /* They should not be processed by setattr */
980 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
983 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
987 if (la->la_valid == LA_CTIME) {
988 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
989 /* This is only for set ctime when rename's source is
991 rc = mdd_may_delete(env, NULL, obj,
992 (struct md_attr *)ma, 1, 0);
993 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
994 la->la_valid &= ~LA_CTIME;
998 if (la->la_valid == LA_ATIME) {
999 /* This is atime only set for read atime update on close. */
1000 if (la->la_atime <= tmp_la->la_atime +
1001 mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1002 la->la_valid &= ~LA_ATIME;
1006 /* Check if flags change. */
1007 if (la->la_valid & LA_FLAGS) {
1008 unsigned int oldflags = 0;
1009 unsigned int newflags = la->la_flags &
1010 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1012 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1013 !mdd_capable(uc, CFS_CAP_FOWNER))
1016 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1017 * only be changed by the relevant capability. */
1018 if (mdd_is_immutable(obj))
1019 oldflags |= LUSTRE_IMMUTABLE_FL;
1020 if (mdd_is_append(obj))
1021 oldflags |= LUSTRE_APPEND_FL;
1022 if ((oldflags ^ newflags) &&
1023 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1026 if (!S_ISDIR(tmp_la->la_mode))
1027 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* immutable/append-only objects reject everything but flag changes */
1030 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1031 (la->la_valid & ~LA_FLAGS) &&
1032 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1035 /* Check for setting the obj time. */
1036 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1037 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1038 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1039 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1040 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1048 /* Make sure a caller can chmod. */
1049 if (la->la_valid & LA_MODE) {
1050 /* Bypass la_valid == LA_MODE,
1051 * this is for changing file with SUID or SGID. */
1052 if ((la->la_valid & ~LA_MODE) &&
1053 !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1054 (uc->mu_fsuid != tmp_la->la_uid) &&
1055 !mdd_capable(uc, CFS_CAP_FOWNER))
1058 if (la->la_mode == (umode_t) -1)
1059 la->la_mode = tmp_la->la_mode;
1061 la->la_mode = (la->la_mode & S_IALLUGO) |
1062 (tmp_la->la_mode & ~S_IALLUGO);
1064 /* Also check the setgid bit! */
1065 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1066 la->la_gid : tmp_la->la_gid) &&
1067 !mdd_capable(uc, CFS_CAP_FSETID))
1068 la->la_mode &= ~S_ISGID;
1070 la->la_mode = tmp_la->la_mode;
1073 /* Make sure a caller can chown. */
1074 if (la->la_valid & LA_UID) {
1075 if (la->la_uid == (uid_t) -1)
1076 la->la_uid = tmp_la->la_uid;
1077 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1078 (la->la_uid != tmp_la->la_uid)) &&
1079 !mdd_capable(uc, CFS_CAP_CHOWN))
1082 /* If the user or group of a non-directory has been
1083 * changed by a non-root user, remove the setuid bit.
1084 * 19981026 David C Niemi <niemi@tux.org>
1086 * Changed this to apply to all users, including root,
1087 * to avoid some races. This is the behavior we had in
1088 * 2.0. The check for non-root was definitely wrong
1089 * for 2.2 anyway, as it should have been using
1090 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1091 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1092 !S_ISDIR(tmp_la->la_mode)) {
1093 la->la_mode &= ~S_ISUID;
1094 la->la_valid |= LA_MODE;
1098 /* Make sure caller can chgrp. */
1099 if (la->la_valid & LA_GID) {
1100 if (la->la_gid == (gid_t) -1)
1101 la->la_gid = tmp_la->la_gid;
1102 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1103 ((la->la_gid != tmp_la->la_gid) &&
1104 !lustre_in_group_p(uc, la->la_gid))) &&
1105 !mdd_capable(uc, CFS_CAP_CHOWN))
1108 /* Likewise, if the user or group of a non-directory
1109 * has been changed by a non-root user, remove the
1110 * setgid bit UNLESS there is no group execute bit
1111 * (this would be a file marked for mandatory
1112 * locking). 19981026 David C Niemi <niemi@tux.org>
1114 * Removed the fsuid check (see the comment above) --
1116 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1117 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1118 la->la_mode &= ~S_ISGID;
1119 la->la_valid |= LA_MODE;
1123 /* For both Size-on-MDS case and truncate case,
1124 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1125 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1126 * For SOM case, it is true, the MAY_WRITE perm has been checked
1127 * when open, no need check again. For truncate case, it is false,
1128 * the MAY_WRITE perm should be checked here. */
1129 if (ma->ma_attr_flags & MDS_SOM) {
1130 /* For the "Size-on-MDS" setattr update, merge coming
1131 * attributes with the set in the inode. BUG 10641 */
1132 if ((la->la_valid & LA_ATIME) &&
1133 (la->la_atime <= tmp_la->la_atime))
1134 la->la_valid &= ~LA_ATIME;
1136 /* OST attributes do not have a priority over MDS attributes,
1137 * so drop times if ctime is equal. */
1138 if ((la->la_valid & LA_CTIME) &&
1139 (la->la_ctime <= tmp_la->la_ctime))
1140 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1142 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1143 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1144 (uc->mu_fsuid == tmp_la->la_uid)) &&
1145 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1146 rc = mdd_permission_internal_locked(env, obj,
1153 if (la->la_valid & LA_CTIME) {
1154 /* The pure setattr, it has the priority over what is
1155 * already set, do not drop it if ctime is equal. */
1156 if (la->la_ctime < tmp_la->la_ctime)
1157 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1165 /** Store a data change changelog record
1166 * If this fails, we must fail the whole transaction; we don't
1167 * want the change to commit without the log entry.
1168 * \param mdd_obj - mdd_object of change
1169 * \param handle - transaction handle
/* No-op when changelogs are off. Repeated CL_SETATTR records for the same
 * object within one changelog "epoch" are suppressed via mod_cltime. The
 * record carries only the target FID (namelen 0 — data changes have no
 * name component). */
1171 static int mdd_changelog_data_store(const struct lu_env *env,
1172 struct mdd_device *mdd,
1173 enum changelog_rec_type type,
1174 struct mdd_object *mdd_obj,
1175 struct thandle *handle)
1177 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1178 struct llog_changelog_rec *rec;
1183 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1186 LASSERT(handle != NULL);
1187 LASSERT(mdd_obj != NULL);
1189 if ((type == CL_SETATTR) &&
1190 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1191 /* Don't need multiple updates in this log */
1192 /* Don't check under lock - no big deal if we get an extra
1197 reclen = llog_data_len(sizeof(*rec));
1198 buf = mdd_buf_alloc(env, reclen);
1199 if (buf->lb_buf == NULL)
1201 rec = (struct llog_changelog_rec *)buf->lb_buf;
1203 rec->cr.cr_flags = CLF_VERSION;
1204 rec->cr.cr_type = (__u32)type;
1205 rec->cr.cr_tfid = *tfid;
1206 rec->cr.cr_namelen = 0;
1207 mdd_obj->mod_cltime = cfs_time_current_64();
1209 rc = mdd_changelog_llog_write(mdd, rec, handle);
1211 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1212 rc, type, PFID(tfid));
1219 /* set attr and LOV EA at once, return updated attr */
/* md_object setattr entry point. Within one transaction: fixes/permission-
 * checks the incoming attrs (mdd_fix_attr), optionally prepares quota
 * accounting for uid/gid changes, applies flag or regular attr changes,
 * journals chown/chgrp in the llog (like unlink), optionally stores a new
 * LOV EA, and emits a CL_SETATTR changelog record for time changes. After
 * the transaction, OST objects are updated asynchronously for ownership
 * changes and pending quota is committed/adjusted. */
1220 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1221 const struct md_attr *ma)
1223 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1224 struct mdd_device *mdd = mdo2mdd(obj);
1225 struct thandle *handle;
1226 struct lov_mds_md *lmm = NULL;
1227 struct llog_cookie *logcookies = NULL;
1228 int rc, lmm_size = 0, cookie_size = 0;
1229 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1230 #ifdef HAVE_QUOTA_SUPPORT
1231 struct obd_device *obd = mdd->mdd_obd_dev;
1232 struct mds_obd *mds = &obd->u.mds;
1233 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1234 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1235 int quota_opc = 0, block_count = 0;
1236 int inode_pending[MAXQUOTAS] = { 0, 0 };
1237 int block_pending[MAXQUOTAS] = { 0, 0 };
1241 mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1242 MDD_TXN_ATTR_SET_OP);
1243 handle = mdd_trans_start(env, mdd);
1245 RETURN(PTR_ERR(handle));
1246 /*TODO: add lock here*/
1247 /* start a log journal handle if needed */
1248 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1249 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1250 lmm_size = mdd_lov_mdsize(env, mdd);
1251 lmm = mdd_max_lmm_get(env, mdd);
1253 GOTO(cleanup, rc = -ENOMEM);
1255 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1262 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1263 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1264 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* work on a copy so the caller's md_attr stays untouched */
1266 *la_copy = ma->ma_attr;
1267 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1271 #ifdef HAVE_QUOTA_SUPPORT
1272 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1273 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1275 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1277 quota_opc = FSFILT_OP_SETATTR;
1278 mdd_quota_wrapper(la_copy, qnids);
1279 mdd_quota_wrapper(la_tmp, qoids);
1280 /* get file quota for new owner */
1281 lquota_chkquota(mds_quota_interface_ref, obd, qnids,
1282 inode_pending, 1, NULL, 0, NULL, 0);
1283 block_count = (la_tmp->la_blocks + 7) >> 3;
1286 mdd_data_get(env, mdd_obj, &data);
1287 /* get block quota for new owner */
1288 lquota_chkquota(mds_quota_interface_ref, obd,
1289 qnids, block_pending,
1291 LQUOTA_FLAGS_BLK, data, 1);
1297 if (la_copy->la_valid & LA_FLAGS) {
1298 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
/* keep the cached in-memory flags in sync with the new on-disk ones */
1301 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1302 } else if (la_copy->la_valid) { /* setattr */
1303 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1305 /* journal chown/chgrp in llog, just like unlink */
1306 if (rc == 0 && lmm_size){
1307 cookie_size = mdd_lov_cookiesize(env, mdd);
1308 logcookies = mdd_max_cookie_get(env, mdd);
1309 if (logcookies == NULL)
1310 GOTO(cleanup, rc = -ENOMEM);
1312 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1313 logcookies, cookie_size) <= 0)
1318 if (rc == 0 && ma->ma_valid & MA_LOV) {
1321 mode = mdd_object_type(mdd_obj);
1322 if (S_ISREG(mode) || S_ISDIR(mode)) {
1323 rc = mdd_lsm_sanity_check(env, mdd_obj);
1327 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1328 ma->ma_lmm_size, handle, 1);
1333 if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1334 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1336 mdd_trans_stop(env, mdd, rc, handle);
1337 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1338 /*set obd attr, if needed*/
1339 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1342 #ifdef HAVE_QUOTA_SUPPORT
1344 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1346 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1348 /* Trigger dqrel/dqacq for original owner and new owner.
1349 * If failed, the next call for lquota_chkquota will
1351 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/*
 * Set (create or replace, per @fl) extended attribute @name on @obj
 * inside an already-started transaction @handle.  Takes the object
 * write lock for the duration and delegates the actual store to
 * __mdd_xattr_set().
 */
1358 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1359 const struct lu_buf *buf, const char *name, int fl,
1360 struct thandle *handle)
     /* serialize against concurrent attribute updates on this object */
1365 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1366 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1367 mdd_write_unlock(env, obj);
/*
 * Permission check before an xattr modification: reject
 * immutable/append-only objects, and require the caller to be the
 * object's owner or to hold the CFS_CAP_FOWNER capability.
 */
1372 static int mdd_xattr_sanity_check(const struct lu_env *env,
1373 struct mdd_object *obj)
1375 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1376 struct md_ucred *uc = md_ucred(env);
     /* immutable/append-only objects may not have xattrs changed */
1380 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1383 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
     /* non-owner needs CFS_CAP_FOWNER to modify xattrs */
1387 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1388 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Set extended attribute @name on @obj in its own transaction.
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
1398 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1399 const struct lu_buf *buf, const char *name,
1402 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1403 struct mdd_device *mdd = mdo2mdd(obj);
1404 struct thandle *handle;
     /* owner/capability and immutable/append checks first */
1408 rc = mdd_xattr_sanity_check(env, mdd_obj);
1412 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1413 /* security-related changes may require sync */
1414 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1415 mdd->mdd_sync_permission == 1)
1416 txn_param_sync(&mdd_env_info(env)->mti_param);
1418 handle = mdd_trans_start(env, mdd);
1420 RETURN(PTR_ERR(handle));
1422 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1424 /* Only record user xattr changes */
1425 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1426 (strncmp("user.", name, 5) == 0))
1427 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1429 mdd_trans_stop(env, mdd, rc, handle);
/*
 * Delete extended attribute @name from @obj in its own transaction.
 *
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
1438 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1441 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1442 struct mdd_device *mdd = mdo2mdd(obj);
1443 struct thandle *handle;
     /* owner/capability and immutable/append checks first */
1447 rc = mdd_xattr_sanity_check(env, mdd_obj);
1451 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1452 handle = mdd_trans_start(env, mdd);
1454 RETURN(PTR_ERR(handle));
1456 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1457 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1458 mdd_object_capa(env, mdd_obj));
1459 mdd_write_unlock(env, mdd_obj);
1461 /* Only record user xattr changes */
     /* FIX: was "!= 0", which logged every xattr EXCEPT "user.*" --
      * contradicting the comment above and the matching test in
      * mdd_xattr_set().  Use "== 0" so only user xattr changes are
      * recorded in the changelog, consistent with mdd_xattr_set(). */
1462 if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1463 (strncmp("user.", name, 5) == 0))
1464 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1467 mdd_trans_stop(env, mdd, rc, handle);
1472 /* partial unlink */
/*
 * Drop one link (nlink) from @obj: decrement the reference count,
 * update ctime, finish the unlink (orphan handling), and release the
 * quota held by the child when its last link and last open are gone.
 */
1473 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1476 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1477 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1478 struct mdd_device *mdd = mdo2mdd(obj);
1479 struct thandle *handle;
1480 #ifdef HAVE_QUOTA_SUPPORT
1481 struct obd_device *obd = mdd->mdd_obd_dev;
1482 struct mds_obd *mds = &obd->u.mds;
1483 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1490 * Check -ENOENT early here because we need to get object type
1491 * to calculate credits before transaction start
1493 if (!mdd_object_exists(mdd_obj))
1496 LASSERT(mdd_object_exists(mdd_obj) > 0);
1498 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1502 handle = mdd_trans_start(env, mdd);
1506 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1508 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
     /* drop one nlink; directories lose the extra "." reference too */
1512 __mdd_ref_del(env, mdd_obj, handle, 0);
1514 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1516 __mdd_ref_del(env, mdd_obj, handle, 1);
     /* propagate the caller-supplied ctime to the object */
1519 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1520 la_copy->la_ctime = ma->ma_attr.la_ctime;
1522 la_copy->la_valid = LA_CTIME;
1523 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1527 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1528 #ifdef HAVE_QUOTA_SUPPORT
     /* release quota only when the object is really going away:
      * no links left and no open handles remain */
1529 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1530 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1531 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1532 mdd_quota_wrapper(&ma->ma_attr, qids);
1539 mdd_write_unlock(env, mdd_obj);
1540 mdd_trans_stop(env, mdd, rc, handle);
1541 #ifdef HAVE_QUOTA_SUPPORT
1543 /* Trigger dqrel on the owner of child. If failed,
1544 * the next call for lquota_chkquota will process it */
1545 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1551 /* partial operation */
/*
 * Validate the file type requested for a partial object create;
 * dispatches on the S_IFMT bits of the requested mode.
 */
1552 static int mdd_oc_sanity_check(const struct lu_env *env,
1553 struct mdd_object *obj,
1559 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Partial create: allocate the MDD object body (no name insertion),
 * acquire inode/block quota for the child, apply creation attributes,
 * and handle CMD slave-object / remote-ACL cases from @spec.
 */
1576 static int mdd_object_create(const struct lu_env *env,
1577 struct md_object *obj,
1578 const struct md_op_spec *spec,
1582 struct mdd_device *mdd = mdo2mdd(obj);
1583 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1584 const struct lu_fid *pfid = spec->u.sp_pfid;
1585 struct thandle *handle;
1586 #ifdef HAVE_QUOTA_SUPPORT
1587 struct obd_device *obd = mdd->mdd_obd_dev;
1588 struct mds_obd *mds = &obd->u.mds;
1589 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1590 int quota_opc = 0, block_count = 0;
1591 int inode_pending[MAXQUOTAS] = { 0, 0 };
1592 int block_pending[MAXQUOTAS] = { 0, 0 };
1597 #ifdef HAVE_QUOTA_SUPPORT
     /* reserve quota for the new child before starting the txn */
1598 if (mds->mds_quota) {
1599 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1600 mdd_quota_wrapper(&ma->ma_attr, qids);
1601 /* get file quota for child */
1602 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1603 inode_pending, 1, NULL, 0, NULL, 0);
1604 switch (ma->ma_attr.la_mode & S_IFMT) {
1613 /* get block quota for child */
1615 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1616 block_pending, block_count, NULL,
1617 LQUOTA_FLAGS_BLK, NULL, 0);
1621 mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1622 handle = mdd_trans_start(env, mdd);
1624 GOTO(out_pending, rc = PTR_ERR(handle));
1626 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1627 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1631 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1635 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1636 /* If creating the slave object, set slave EA here. */
1637 int lmv_size = spec->u.sp_ea.eadatalen;
1638 struct lmv_stripe_md *lmv;
1640 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1641 LASSERT(lmv != NULL && lmv_size > 0);
1643 rc = __mdd_xattr_set(env, mdd_obj,
1644 mdd_buf_get_const(env, lmv, lmv_size),
1645 XATTR_NAME_LMV, 0, handle);
1649 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1652 #ifdef CONFIG_FS_POSIX_ACL
     /* remote ACL: initialize the child's ACL from the EA payload,
      * which may also adjust the creation mode */
1653 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1654 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1656 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1657 buf->lb_len = spec->u.sp_ea.eadatalen;
1658 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1659 rc = __mdd_acl_init(env, mdd_obj, buf,
1660 &ma->ma_attr.la_mode,
1665 ma->ma_attr.la_valid |= LA_MODE;
1668 pfid = spec->u.sp_ea.fid;
1671 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
     /* return the freshly created attributes to the caller */
1677 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1678 mdd_write_unlock(env, mdd_obj);
1680 mdd_trans_stop(env, mdd, rc, handle);
1682 #ifdef HAVE_QUOTA_SUPPORT
     /* commit the inode and block reservations made above */
1684 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1686 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1688 /* Trigger dqacq on the owner of child. If failed,
1689 * the next call for lquota_chkquota will process it. */
1690 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Partial link: add one nlink to @obj and update its ctime from the
 * caller-supplied attributes.
 */
1698 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1699 const struct md_attr *ma)
1701 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1702 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1703 struct mdd_device *mdd = mdo2mdd(obj);
1704 struct thandle *handle;
1708 mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1709 handle = mdd_trans_start(env, mdd);
1713 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1714 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1716 __mdd_ref_add(env, mdd_obj, handle);
1717 mdd_write_unlock(env, mdd_obj);
     /* propagate the caller-supplied ctime */
1719 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1720 la_copy->la_ctime = ma->ma_attr.la_ctime;
1722 la_copy->la_valid = LA_CTIME;
1723 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
     /* NOTE(review): the transaction is stopped with rc forced to 0,
      * discarding any error from the ctime update above -- looks
      * intentional (link itself already succeeded) but verify. */
1726 mdd_trans_stop(env, mdd, 0, handle);
/*
 * Translate MDS open @flags into a MAY_* access mask for the object
 * whose attributes are @la.
 *
 * do NOT or the MAY_*'s, you'll get the weakest
 */
1734 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1738 /* Sadly, NFSD reopens a file repeatedly during operation, so the
1739 * "acc_mode = 0" allowance for newly-created files isn't honoured.
1740 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1741 * owner can write to a file even if it is marked readonly to hide
1742 * its brokenness. (bug 5781) */
1743 if (flags & MDS_OPEN_OWNEROVERRIDE) {
1744 struct md_ucred *uc = md_ucred(env);
     /* owner (or missing/initial credentials) bypasses the check */
1746 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1747 (la->la_uid == uc->mu_fsuid))
     /* map open flags onto MAY_READ / MAY_WRITE / MAY_EXEC bits */
1751 if (flags & FMODE_READ)
1753 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1755 if (flags & MDS_FMODE_EXEC)
/*
 * Validate an open request against the object's current state:
 * dead objects, symlinks, writable opens of directories, permission
 * bits, append-only constraints, and O_NOATIME ownership.
 */
1760 static int mdd_open_sanity_check(const struct lu_env *env,
1761 struct mdd_object *obj, int flag)
1763 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
     /* object already unlinked: nothing to open */
1768 if (mdd_is_dead_obj(obj))
1771 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
     /* symlinks are opened via their target, never directly */
1775 if (S_ISLNK(tmp_la->la_mode))
1778 mode = accmode(env, tmp_la, flag);
     /* directories may not be opened for write */
1780 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
     /* a just-created file already passed the permission check */
1783 if (!(flag & MDS_OPEN_CREATED)) {
1784 rc = mdd_permission_internal(env, obj, tmp_la, mode);
     /* truncation is meaningless for special files */
1789 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1790 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1791 flag &= ~MDS_OPEN_TRUNC;
1793 /* For writing append-only file must open it with append mode. */
1794 if (mdd_is_append(obj)) {
1795 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1797 if (flag & MDS_OPEN_TRUNC)
1803 * Now, flag -- O_NOATIME does not be packed by client.
     /* O_NOATIME is restricted to the owner or CAP_FOWNER holders */
1805 if (flag & O_NOATIME) {
1806 struct md_ucred *uc = md_ucred(env);
1808 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1809 (uc->mu_valid == UCRED_NEW)) &&
1810 (uc->mu_fsuid != tmp_la->la_uid) &&
1811 !mdd_capable(uc, CFS_CAP_FOWNER))
/*
 * Open @obj: run the sanity checks and, on success, bump the open
 * count under the object write lock.
 */
1819 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1822 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1825 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1827 rc = mdd_open_sanity_check(env, mdd_obj, flags);
1829 mdd_obj->mod_count++;
1831 mdd_write_unlock(env, mdd_obj);
1835 /* return md_attr back,
1836 * if it is last unlink then return lov ea + llog cookie*/
/*
 * Collect the information needed to destroy OST objects of a regular
 * file: fetch its LOV EA and, when present, write the unlink llog
 * record whose cookies the caller returns to the client.
 */
1837 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1843 if (S_ISREG(mdd_object_type(obj))) {
1844 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1845 * Caller must be ready for that. */
1847 rc = __mdd_lmm_get(env, obj, ma);
1848 if ((ma->ma_valid & MA_LOV))
1849 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1856 * No permission check is needed.
/*
 * Close @obj: drop the open count; if this was the last open of an
 * already-unlinked (orphan) object, remove it from the orphan index,
 * gather LOV EA / llog cookies (or destroy OSS objects directly when
 * MDS_CLOSE_CLEANUP is set), and release the child's quota.
 */
1858 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1861 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1862 struct mdd_device *mdd = mdo2mdd(obj);
1863 struct thandle *handle;
1867 #ifdef HAVE_QUOTA_SUPPORT
1868 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1869 struct mds_obd *mds = &obd->u.mds;
1870 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1875 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1878 handle = mdd_trans_start(env, mdo2mdd(obj));
1880 RETURN(PTR_ERR(handle));
1882 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1883 /* release open count */
1884 mdd_obj->mod_count --;
     /* last close of an orphan: take it off the orphan index so the
      * OSS objects can be destroyed */
1886 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1887 /* remove link to object from orphan index */
1888 rc = __mdd_orphan_del(env, mdd_obj, handle);
1890 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1891 "list, OSS objects to be destroyed.\n",
1892 PFID(mdd_object_fid(mdd_obj)));
1894 CERROR("Object "DFID" can not be deleted from orphan "
1895 "list, maybe cause OST objects can not be "
1896 "destroyed (err: %d).\n",
1897 PFID(mdd_object_fid(mdd_obj)), rc);
1898 /* If object was not deleted from orphan list, do not
1899 * destroy OSS objects, which will be done when next
1905 rc = mdd_iattr_get(env, mdd_obj, ma);
1906 /* Object maybe not in orphan list originally, it is rare case for
1907 * mdd_finish_unlink() failure. */
1908 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1909 #ifdef HAVE_QUOTA_SUPPORT
1910 if (mds->mds_quota) {
1911 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1912 mdd_quota_wrapper(&ma->ma_attr, qids);
1915 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1916 if (ma->ma_valid & MA_FLAGS &&
1917 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1918 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1920 rc = mdd_object_kill(env, mdd_obj, ma);
1926 CERROR("Error when prepare to delete Object "DFID" , "
1927 "which will cause OST objects can not be "
1928 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
     /* nothing to return: strip LOV/cookie validity bits */
1934 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1936 mdd_write_unlock(env, mdd_obj);
1937 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1938 #ifdef HAVE_QUOTA_SUPPORT
1940 /* Trigger dqrel on the owner of child. If failed,
1941 * the next call for lquota_chkquota will process it */
1942 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1949 * Permission check is done when open,
1950 * no need check again.
/*
 * readpage is only valid on directories that expose the dt index
 * interface; permission was already checked at open time.
 */
1952 static int mdd_readpage_sanity_check(const struct lu_env *env,
1953 struct mdd_object *obj)
1955 struct dt_object *next = mdd_object_child(obj);
1959 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one page (@area of @nob bytes) with lu_dirent records pulled
 * from directory iterator @it.  Maintains the hash range covered
 * ([*start, *end]) and a pointer to the last entry written so the
 * caller can patch its record length.  @attr selects which optional
 * per-entry attributes the OSD should pack.
 */
1967 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1968 int first, void *area, int nob,
1969 const struct dt_it_ops *iops, struct dt_it *it,
1970 __u64 *start, __u64 *end,
1971 struct lu_dirent **last, __u32 attr)
1975 struct lu_dirent *ent;
     /* the page begins with a lu_dirpage header; entries follow it */
1978 memset(area, 0, sizeof (struct lu_dirpage));
1979 area += sizeof (struct lu_dirpage);
1980 nob -= sizeof (struct lu_dirpage);
1988 len = iops->key_size(env, it);
1990 /* IAM iterator can return record with zero len. */
1994 hash = iops->store(env, it);
1995 if (unlikely(first)) {
2000 /* calculate max space required for lu_dirent */
2001 recsize = lu_dirent_calc_size(len, attr);
2003 if (nob >= recsize) {
2004 result = iops->rec(env, it, ent, attr);
2005 if (result == -ESTALE)
2010 /* osd might not able to pack all attributes,
2011 * so recheck rec length */
2012 recsize = le16_to_cpu(ent->lde_reclen);
2015 * record doesn't fit into page, enlarge previous one.
     /* extend the previous entry to absorb the page's tail space */
2018 (*last)->lde_reclen =
2019 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2028 ent = (void *)ent + recsize;
2032 result = iops->next(env, it);
2033 if (result == -ESTALE)
2035 } while (result == 0);
/*
 * Iterate the directory index of @obj from rdpg->rp_hash onward and
 * fill the pages supplied in @rdpg with lu_dirent entries; writes
 * the covered hash range into the first page's lu_dirpage header.
 */
2042 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2043 const struct lu_rdpg *rdpg)
2046 struct dt_object *next = mdd_object_child(obj);
2047 const struct dt_it_ops *iops;
2049 struct lu_dirent *last = NULL;
2050 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2057 LASSERT(rdpg->rp_pages != NULL);
2058 LASSERT(next->do_index_ops != NULL);
2060 if (rdpg->rp_count <= 0)
2064 * iterate through directory and fill pages from @rdpg
2066 iops = &next->do_index_ops->dio_it;
2067 it = iops->init(env, next, mdd_object_capa(env, obj));
     /* position the iterator at (or just after) the requested hash */
2071 rc = iops->load(env, it, rdpg->rp_hash);
2075 * Iterator didn't find record with exactly the key requested.
2077 * It is currently either
2079 * - positioned above record with key less than
2080 * requested---skip it.
2082 * - or not positioned at all (is in IAM_IT_SKEWED
2083 * state)---position it on the next item.
2085 rc = iops->next(env, it);
2090 * At this point and across for-loop:
2092 * rc == 0 -> ok, proceed.
2093 * rc > 0 -> end of directory.
2096 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2097 i++, nob -= CFS_PAGE_SIZE) {
2098 LASSERT(i < rdpg->rp_npages);
2099 pg = rdpg->rp_pages[i];
2100 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2101 min_t(int, nob, CFS_PAGE_SIZE), iops,
2102 it, &hash_start, &hash_end, &last,
     /* on the final page, terminate the entry chain */
2104 if (rc != 0 || i == rdpg->rp_npages - 1) {
2106 last->lde_reclen = 0;
     /* rc > 0 means the whole directory was consumed */
2114 hash_end = DIR_END_OFF;
2118 struct lu_dirpage *dp;
     /* record the covered hash range in the first page's header */
2120 dp = cfs_kmap(rdpg->rp_pages[0]);
2121 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2122 dp->ldp_hash_end = cpu_to_le64(hash_end);
2125 * No pages were processed, mark this.
2127 dp->ldp_flags |= LDF_EMPTY;
2129 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2130 cfs_kunmap(rdpg->rp_pages[0]);
2133 iops->fini(env, it);
/*
 * md_object readpage entry point: under a read lock, validate the
 * request and delegate to __mdd_readpage().  A dead (unlinked)
 * directory yields a single empty lu_dirpage -- per POSIX no entries,
 * not even "." and "..", are returned.
 */
2138 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2139 const struct lu_rdpg *rdpg)
2141 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2145 LASSERT(mdd_object_exists(mdd_obj));
2147 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2148 rc = mdd_readpage_sanity_check(env, mdd_obj);
2150 GOTO(out_unlock, rc);
2152 if (mdd_is_dead_obj(mdd_obj)) {
2154 struct lu_dirpage *dp;
2157 * According to POSIX, please do not return any entry to client:
2158 * even dot and dotdot should not be returned.
2160 CWARN("readdir from dead object: "DFID"\n",
2161 PFID(mdd_object_fid(mdd_obj)));
2163 if (rdpg->rp_count <= 0)
2164 GOTO(out_unlock, rc = -EFAULT);
2165 LASSERT(rdpg->rp_pages != NULL);
     /* hand back one page marked empty, spanning the whole hash range */
2167 pg = rdpg->rp_pages[0];
2168 dp = (struct lu_dirpage*)cfs_kmap(pg);
2169 memset(dp, 0 , sizeof(struct lu_dirpage));
2170 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2171 dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
2172 dp->ldp_flags |= LDF_EMPTY;
2173 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2175 GOTO(out_unlock, rc = 0);
2178 rc = __mdd_readpage(env, mdd_obj, rdpg);
2182 mdd_read_unlock(env, mdd_obj);
/*
 * Sync @obj to stable storage by forwarding to the underlying
 * dt_object's do_object_sync method.
 */
2186 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2188 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2189 struct dt_object *next;
2191 LASSERT(mdd_object_exists(mdd_obj));
2192 next = mdd_object_child(mdd_obj);
2193 return next->do_ops->do_object_sync(env, next);
/*
 * Read the object's version from the underlying dt layer (used for
 * version-based recovery).
 */
2196 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2197 struct md_object *obj)
2199 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2201 LASSERT(mdd_object_exists(mdd_obj));
2202 return do_version_get(env, mdd_object_child(mdd_obj));
/*
 * Store @version on the object via the underlying dt layer.
 * NOTE(review): "return <void expr>;" from a void function relies on
 * a GNU C extension / C++ semantics -- presumably do_version_set()
 * returns void; confirm it compiles cleanly on all target compilers.
 */
2205 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2206 dt_obj_version_t version)
2208 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2210 LASSERT(mdd_object_exists(mdd_obj));
2211 return do_version_set(env, mdd_object_child(mdd_obj), version);
2214 const struct md_object_operations mdd_obj_ops = {
2215 .moo_permission = mdd_permission,
2216 .moo_attr_get = mdd_attr_get,
2217 .moo_attr_set = mdd_attr_set,
2218 .moo_xattr_get = mdd_xattr_get,
2219 .moo_xattr_set = mdd_xattr_set,
2220 .moo_xattr_list = mdd_xattr_list,
2221 .moo_xattr_del = mdd_xattr_del,
2222 .moo_object_create = mdd_object_create,
2223 .moo_ref_add = mdd_ref_add,
2224 .moo_ref_del = mdd_ref_del,
2225 .moo_open = mdd_open,
2226 .moo_close = mdd_close,
2227 .moo_readpage = mdd_readpage,
2228 .moo_readlink = mdd_readlink,
2229 .moo_capa_get = mdd_capa_get,
2230 .moo_object_sync = mdd_object_sync,
2231 .moo_version_get = mdd_version_get,
2232 .moo_version_set = mdd_version_set,
2233 .moo_path = mdd_path,