1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
64 #include "mdd_internal.h"
66 static const struct lu_object_operations mdd_lu_obj_ops;
68 static int mdd_xattr_get(const struct lu_env *env,
69 struct md_object *obj, struct lu_buf *buf,
/* Fetch body/data information for an existing object; asserts existence
 * first so a missing object fails loudly with its FID. (Lines elided in
 * this extract: braces and return path.) */
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
75 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76 PFID(mdd_object_fid(obj)));
77 mdo_data_get(env, obj, data);
/* Read inode attributes (lu_attr) of an existing object through the lower
 * layer, under the given capability. Asserts existence with the FID in the
 * failure message. */
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82 struct lu_attr *la, struct lustre_capa *capa)
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits (APPEND_OBJ/IMMUTE_OBJ), clearing both first so stale
 * bits never survive a refresh. */
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93 if (flags & LUSTRE_APPEND_FL)
94 obj->mod_flags |= APPEND_OBJ;
96 if (flags & LUSTRE_IMMUTABLE_FL)
97 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area stored in the lu_env context.
 * The key is registered elsewhere, so a NULL result would indicate a
 * setup bug — hence the LASSERT. */
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 struct mdd_thread_info *info;
104 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105 LASSERT(info != NULL);
/* Wrap caller-provided memory (area/len) in the thread-local mti_buf.
 * No allocation happens here; assignment lines are elided in this extract. */
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by a lu_buf; a NULL buf or NULL lb_buf is a
 * no-op. Uses OBD_FREE_LARGE to match the OBD_ALLOC_LARGE allocations
 * used throughout this file. */
119 void mdd_buf_put(struct lu_buf *buf)
121 if (buf == NULL || buf->lb_buf == NULL)
123 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in the
 * thread-local mti_buf. The const is cast away because lu_buf has a
 * non-const lb_buf member; callers must not write through it. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129 const void *area, ssize_t len)
133 buf = &mdd_env_info(env)->mti_buf;
134 buf->lb_buf = (void *)area;
/* Return the thread-local "big buffer", (re)allocating it when the
 * requested len exceeds the current capacity. An undersized existing
 * buffer is freed first, then a new one is allocated lazily. */
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
147 if (buf->lb_buf == NULL) {
149 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150 if (buf->lb_buf == NULL)
156 /** Increase the size of the \a mti_big_buf.
157 * preserves old data in buffer
158 * old buffer remains unchanged on error
159 * \retval 0 or -ENOMEM
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* len must not shrink the buffer; grow-only by contract. */
166 LASSERT(len >= oldbuf->lb_len);
167 OBD_ALLOC_LARGE(buf.lb_buf, len);
169 if (buf.lb_buf == NULL)
/* Copy old contents into the new allocation, then swap the descriptors
 * so mti_big_buf points at the grown buffer. */
173 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer sized for the current maximum
 * LOV cookie size, reallocating (larger) and zeroing it when the required
 * size grew. Returns NULL on allocation failure. */
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183 struct mdd_device *mdd)
185 struct mdd_thread_info *mti = mdd_env_info(env);
188 max_cookie_size = mdd_lov_cookiesize(env, mdd);
189 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190 if (mti->mti_max_cookie)
191 OBD_FREE_LARGE(mti->mti_max_cookie,
192 mti->mti_max_cookie_size);
193 mti->mti_max_cookie = NULL;
194 mti->mti_max_cookie_size = 0;
196 if (unlikely(mti->mti_max_cookie == NULL)) {
197 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198 if (likely(mti->mti_max_cookie != NULL))
199 mti->mti_max_cookie_size = max_cookie_size;
/* Zero the (possibly reused) buffer before handing it out. */
201 if (likely(mti->mti_max_cookie != NULL))
202 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203 return mti->mti_max_cookie;
/* Return the per-thread LOV EA buffer sized for the current maximum LOV
 * MD size, reallocating when the required size grew. Unlike the cookie
 * variant above, the buffer is NOT zeroed here. Returns NULL on OOM. */
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207 struct mdd_device *mdd)
209 struct mdd_thread_info *mti = mdd_env_info(env);
212 max_lmm_size = mdd_lov_mdsize(env, mdd);
213 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214 if (mti->mti_max_lmm)
215 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216 mti->mti_max_lmm = NULL;
217 mti->mti_max_lmm_size = 0;
219 if (unlikely(mti->mti_max_lmm == NULL)) {
220 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221 if (likely(mti->mti_max_lmm != NULL))
222 mti->mti_max_lmm_size = max_lmm_size;
224 return mti->mti_max_lmm;
/* lu_device slice allocator: create a new mdd_object and wire up its
 * md/dir operation tables and lu_object ops. Returns the embedded
 * lu_object (NULL path on allocation failure is elided here). */
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228 const struct lu_object_header *hdr,
231 struct mdd_object *mdd_obj;
233 OBD_ALLOC_PTR(mdd_obj);
234 if (mdd_obj != NULL) {
237 o = mdd2lu_obj(mdd_obj);
238 lu_object_init(o, NULL, d);
239 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241 mdd_obj->mod_count = 0;
242 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: allocate the child (lower-layer) object slice from the
 * underlying device and stack it below this mdd object. Also initializes
 * the pdir lock and resets the changelog time. */
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250 const struct lu_object_conf *unused)
252 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253 struct mdd_object *mdd_obj = lu2mdd_obj(o);
254 struct lu_object *below;
255 struct lu_device *under;
258 mdd_obj->mod_cltime = 0;
259 under = &d->mdd_child->dd_lu_dev;
260 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261 mdd_pdlock_init(mdd_obj);
/* Chain the lower slice under this object in the lu stack. */
265 lu_object_add(o, below);
/* loo_object_start: once the object is known to exist on disk, load its
 * attribute flags into mod_flags via mdd_get_flags(). */
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
272 if (lu_object_exists(o))
273 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd slice of the object (fini/free lines
 * are elided in this extract). */
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
280 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: emit a one-line debug description of the object
 * (open count, valid mask, changelog time, flags) via the lu printer. */
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287 lu_printer_t p, const struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291 "valid=%x, cltime="LPU64", flags=%lx)",
292 mdd, mdd->mod_count, mdd->mod_valid,
293 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; forward-declared near the
 * top of the file so mdd_object_alloc() can reference it. */
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297 .loo_object_init = mdd_object_init,
298 .loo_object_start = mdd_object_start,
299 .loo_object_free = mdd_object_free,
300 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd object for FID \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); may return ERR_PTR values,
 * which callers below check with IS_ERR(). */
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304 struct mdd_device *d,
305 const struct lu_fid *f)
307 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated path (relative to the filesystem root) to a
 * FID by walking component lookups from mdd_root_fid. Used by
 * mdd_path_current() to verify a reconstructed path still resolves to
 * the same object. Several loop-body lines are elided in this extract. */
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311 const char *path, struct lu_fid *fid)
314 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315 struct mdd_object *obj;
316 struct lu_name *lname = &mdd_env_info(env)->mti_name;
321 /* temp buffer for path element */
322 buf = mdd_buf_alloc(env, PATH_MAX);
323 if (buf->lb_buf == NULL)
326 lname->ln_name = name = buf->lb_buf;
327 lname->ln_namelen = 0;
/* Start the walk at the filesystem root FID. */
328 *f = mdd->mdd_root_fid;
/* Scan one path component (copy loop elided in this extract). */
335 while (*path != '/' && *path != '\0') {
343 /* find obj corresponding to fid */
344 obj = mdd_object_find(env, mdd, f);
/* A remote object cannot be resolved here; ERR_PTR propagates errno. */
346 GOTO(out, rc = -EREMOTE);
348 GOTO(out, rc = PTR_ERR(obj));
349 /* get child fid from parent and name */
350 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351 mdd_object_put(env, obj);
356 lname->ln_namelen = 0;
365 /** The maximum depth that fid2path() will search.
366 * This is limited only because we want to store the fids for
367 * historical path lookup purposes.
369 #define MAX_PATH_DEPTH 100
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373 __u64 pli_recno; /**< history point */
374 __u64 pli_currec; /**< current record */
375 struct lu_fid pli_fid; /**< fid the reconstructed path resolves to */
376 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377 struct mdd_object *pli_mdd_obj; /**< object whose path is requested */
378 char *pli_path; /**< full path */
380 int pli_linkno; /**< which hardlink to follow */
381 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current (as-of-now) path for pli->pli_mdd_obj by walking
 * parent links from the object's link EA up to the root, packing names
 * right-to-left into pli->pli_path. The result is then verified with
 * mdd_path2fid(); -EAGAIN is returned if the path changed mid-walk so
 * the caller (mdd_path) can retry. Some lines are elided in this extract. */
384 static int mdd_path_current(const struct lu_env *env,
385 struct path_lookup_info *pli)
387 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388 struct mdd_object *mdd_obj;
389 struct lu_buf *buf = NULL;
390 struct link_ea_header *leh;
391 struct link_ea_entry *lee;
392 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr walks backwards from the end of the path buffer. */
399 ptr = pli->pli_path + pli->pli_pathlen - 1;
402 pli->pli_fidcount = 0;
403 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
405 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406 mdd_obj = mdd_object_find(env, mdd,
407 &pli->pli_fids[pli->pli_fidcount]);
409 GOTO(out, rc = -EREMOTE);
411 GOTO(out, rc = PTR_ERR(mdd_obj));
412 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
414 mdd_object_put(env, mdd_obj);
418 /* Do I need to error out here? */
423 /* Get parent fid and object name */
424 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425 buf = mdd_links_get(env, mdd_obj);
426 mdd_read_unlock(env, mdd_obj);
427 mdd_object_put(env, mdd_obj);
429 GOTO(out, rc = PTR_ERR(buf));
432 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
435 /* If set, use link #linkno for path lookup, otherwise use
436 link #0. Only do this for the final path element. */
437 if ((pli->pli_fidcount == 0) &&
438 (pli->pli_linkno < leh->leh_reccount)) {
440 for (count = 0; count < pli->pli_linkno; count++) {
441 lee = (struct link_ea_entry *)
442 ((char *)lee + reclen);
443 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445 if (pli->pli_linkno < leh->leh_reccount - 1)
446 /* indicate to user there are more links */
450 /* Pack the name in the end of the buffer */
451 ptr -= tmpname->ln_namelen;
452 if (ptr - 1 <= pli->pli_path)
453 GOTO(out, rc = -EOVERFLOW);
454 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
457 /* Store the parent fid for historic lookup */
458 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459 GOTO(out, rc = -EOVERFLOW);
460 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
463 /* Verify that our path hasn't changed since we started the lookup.
464 Record the current index, and verify the path resolves to the
465 same fid. If it does, then the path is correct as of this index. */
466 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467 pli->pli_currec = mdd->mdd_cl.mc_index;
468 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
471 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472 GOTO (out, rc = -EAGAIN);
474 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477 PFID(&pli->pli_fid));
478 GOTO(out, rc = -EAGAIN);
480 ptr++; /* skip leading / */
/* Shift the packed path down to the start of the caller's buffer. */
481 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
485 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup. Body is elided in this extract;
 * per the comment in mdd_path() it is effectively a stub in this
 * implementation. */
492 static int mdd_path_historic(const struct lu_env *env,
493 struct path_lookup_info *pli)
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500 char *path, int pathlen, __u64 *recno, int *linkno)
502 struct path_lookup_info *pli;
/* The root has an empty path; handled specially (body elided). */
510 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
519 pli->pli_mdd_obj = md2mdd_obj(obj);
520 pli->pli_recno = *recno;
521 pli->pli_path = path;
522 pli->pli_pathlen = pathlen;
523 pli->pli_linkno = *linkno;
525 /* Retry multiple times in case file is being moved */
526 while (tries-- && rc == -EAGAIN)
527 rc = mdd_path_current(env, pli);
529 /* For historical path lookup, the current links may not have existed
530 * at "recno" time. We must switch over to earlier links/parents
531 * by using the changelog records. If the earlier parent doesn't
532 * exist, we must search back through the changelog to reconstruct
533 * its parents, then check if it exists, etc.
534 * We may ignore this problem for the initial implementation and
535 * state that an "original" hardlink must still exist for us to find
536 * historic path name. */
537 if (pli->pli_recno != -1) {
538 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path was valid at. */
540 *recno = pli->pli_currec;
541 /* Return next link index to caller */
542 *linkno = pli->pli_linkno;
/* Refresh the in-memory mod_flags from on-disk attributes: translate
 * la_flags and additionally mark directories with a single link as
 * MNLINK_OBJ. */
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
552 struct lu_attr *la = &mdd_env_info(env)->mti_la;
556 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
558 mdd_flags_xlate(obj, la->la_flags);
559 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
560 obj->mod_flags |= MNLINK_OBJ;
565 /* get only inode attributes */
566 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the read if MA_INODE is already valid in ma (short-circuit). */
572 if (ma->ma_valid & MA_INODE)
575 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
576 mdd_object_capa(env, mdd_obj));
578 ma->ma_valid |= MA_INODE;
/* Fill \a lmm with the filesystem-default striping (from the MDS LOV
 * descriptor) formatted as a v1 lov_user_md. Returns the size written
 * (sizeof(*lum)). Used for the root directory's default layout. */
582 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
584 struct lov_desc *ldesc;
585 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
586 struct lov_user_md *lum = (struct lov_user_md*)lmm;
592 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
593 LASSERT(ldesc != NULL);
595 lum->lmm_magic = LOV_MAGIC_V1;
/* FID_SEQ_LOV_DEFAULT marks this as a default (not real) layout. */
596 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
597 lum->lmm_pattern = ldesc->ld_pattern;
598 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
599 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
600 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
602 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root directory (FID equality
 * against the device's mdd_root_fid). */
605 static int is_rootdir(struct mdd_object *mdd_obj)
607 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
608 const struct lu_fid *fid = mdo2fid(mdd_obj);
610 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
613 /* get lov EA only */
614 static int __mdd_lmm_get(const struct lu_env *env,
615 struct mdd_object *mdd_obj, struct md_attr *ma)
620 if (ma->ma_valid & MA_LOV)
623 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* Root dir with MA_LOV_DEF requested: substitute the default layout. */
625 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
626 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* Positive rc is the EA size; record it and mark LOV+layout-gen valid. */
628 ma->ma_lmm_size = rc;
629 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
630 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
636 /* get the first parent fid from link EA */
637 static int mdd_pfid_get(const struct lu_env *env,
638 struct mdd_object *mdd_obj, struct md_attr *ma)
641 struct link_ea_header *leh;
642 struct link_ea_entry *lee;
643 struct lu_fid *pfid = &ma->ma_pfid;
646 if (ma->ma_valid & MA_PFID)
649 buf = mdd_links_get(env, mdd_obj);
651 RETURN(PTR_ERR(buf));
/* Link EA stores FIDs big-endian; take link #0's parent and convert. */
654 lee = (struct link_ea_entry *)(leh + 1);
655 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
656 fid_be_to_cpu(pfid, pfid);
657 ma->ma_valid |= MA_PFID;
658 if (buf->lb_len > OBD_ALLOC_BIG)
659 /* if we vmalloced a large buffer drop it */
/* Read-locked wrapper around __mdd_lmm_get(): fetch the LOV EA while
 * holding the object's read lock. */
664 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
670 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
671 rc = __mdd_lmm_get(env, mdd_obj, ma);
672 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv; no-op when
 * MA_LMV is already valid. Caller holds the appropriate lock. */
677 static int __mdd_lmv_get(const struct lu_env *env,
678 struct mdd_object *mdd_obj, struct md_attr *ma)
683 if (ma->ma_valid & MA_LMV)
686 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
689 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr and populate the HSM and/or SOM portions of \a ma
 * that the caller asked for (ma_need) but does not yet hold (ma_valid). */
695 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
698 struct mdd_thread_info *info = mdd_env_info(env);
699 struct lustre_mdt_attrs *lma =
700 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
705 /* If all needed data are already valid, nothing to do */
706 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
707 (ma->ma_need & (MA_HSM | MA_SOM)))
710 /* Read LMA from disk EA */
711 lma_size = sizeof(info->mti_xattr_buf);
712 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
716 /* Useless to check LMA incompatibility because this is already done in
717 * osd_ea_fid_get(), and this will fail long before this code is
719 * So, if we are here, LMA is compatible.
722 lustre_lma_swab(lma);
724 /* Swab and copy LMA */
725 if (ma->ma_need & MA_HSM) {
726 if (lma->lma_compat & LMAC_HSM)
727 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
729 ma->ma_hsm.mh_flags = 0;
730 ma->ma_valid |= MA_HSM;
/* SOM data is copied only when the LMA says it is present (LMAC_SOM). */
734 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
735 LASSERT(ma->ma_som != NULL);
736 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
737 ma->ma_som->msd_size = lma->lma_som_size;
738 ma->ma_som->msd_blocks = lma->lma_som_blocks;
739 ma->ma_som->msd_mountid = lma->lma_som_mountid;
740 ma->ma_valid |= MA_SOM;
/* Dispatch attribute fetches according to ma->ma_need: inode attrs,
 * LOV/PFID (regular files and dirs), LMV (dirs), HSM/SOM via LMA
 * (regular files), and default ACL (dirs, POSIX-ACL builds). Each step
 * only runs if the previous ones succeeded (rc == 0). */
746 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
752 if (ma->ma_need & MA_INODE)
753 rc = mdd_iattr_get(env, mdd_obj, ma);
755 if (rc == 0 && ma->ma_need & MA_LOV) {
756 if (S_ISREG(mdd_object_type(mdd_obj)) ||
757 S_ISDIR(mdd_object_type(mdd_obj)))
758 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* Parent FID is only needed when the LOV EA was not retrieved. */
760 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
761 if (S_ISREG(mdd_object_type(mdd_obj)))
762 rc = mdd_pfid_get(env, mdd_obj, ma);
764 if (rc == 0 && ma->ma_need & MA_LMV) {
765 if (S_ISDIR(mdd_object_type(mdd_obj)))
766 rc = __mdd_lmv_get(env, mdd_obj, ma);
768 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
769 if (S_ISREG(mdd_object_type(mdd_obj)))
770 rc = __mdd_lma_get(env, mdd_obj, ma);
772 #ifdef CONFIG_FS_POSIX_ACL
773 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
774 if (S_ISDIR(mdd_object_type(mdd_obj)))
775 rc = mdd_def_acl_get(env, mdd_obj, ma);
778 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
779 rc, ma->ma_valid, ma->ma_lmm);
/* Locked variant of mdd_attr_get_internal(): take the read lock only if
 * any EA-backed attribute (LOV/LMV/ACL/HSM/SOM/PFID) is requested —
 * plain inode attrs do not need it. */
783 int mdd_attr_get_internal_locked(const struct lu_env *env,
784 struct mdd_object *mdd_obj, struct md_attr *ma)
787 int needlock = ma->ma_need &
788 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
791 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792 rc = mdd_attr_get_internal(env, mdd_obj, ma);
794 mdd_read_unlock(env, mdd_obj);
799 * No permission check is needed.
801 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
804 struct mdd_object *mdd_obj = md2mdd_obj(obj);
808 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
813 * No permission check is needed.
815 static int mdd_xattr_get(const struct lu_env *env,
816 struct md_object *obj, struct lu_buf *buf,
819 struct mdd_object *mdd_obj = md2mdd_obj(obj);
824 LASSERT(mdd_object_exists(mdd_obj));
/* Read the named xattr from the lower layer under the read lock. */
826 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
827 rc = mdo_xattr_get(env, mdd_obj, buf, name,
828 mdd_object_capa(env, mdd_obj));
829 mdd_read_unlock(env, mdd_obj);
835 * Permission check is done when open,
836 * no need check again.
838 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
841 struct mdd_object *mdd_obj = md2mdd_obj(obj);
842 struct dt_object *next;
847 LASSERT(mdd_object_exists(mdd_obj));
/* Symlink target is stored as the object's body; read it from offset 0
 * (pos initialization elided in this extract) via the dt body ops. */
849 next = mdd_object_child(mdd_obj);
850 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
851 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
852 mdd_object_capa(env, mdd_obj));
853 mdd_read_unlock(env, mdd_obj);
858 * No permission check is needed.
860 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
863 struct mdd_object *mdd_obj = md2mdd_obj(obj);
868 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
869 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
870 mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) creation of object \a c:
 * choose the dt_object_format from the index features / file mode, then
 * declare the create on the lower layer. Mirrors the format selection in
 * mdd_object_create_internal() below. */
875 int mdd_declare_object_create_internal(const struct lu_env *env,
876 struct mdd_object *p,
877 struct mdd_object *c,
879 struct thandle *handle,
880 const struct md_op_spec *spec)
882 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
883 const struct dt_index_features *feat = spec->sp_feat;
/* Non-directory index features force DFT_INDEX; otherwise derive the
 * format from the attribute mode. */
887 if (feat != &dt_directory_features && feat != NULL)
888 dof->dof_type = DFT_INDEX;
890 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
892 dof->u.dof_idx.di_feat = feat;
894 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Actually create object \a c in transaction \a handle (if it does not
 * already exist): pick the dt format, let the underlying device fill the
 * allocation hint from the parent \a p, then create the lower object. */
899 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
900 struct mdd_object *c, struct md_attr *ma,
901 struct thandle *handle,
902 const struct md_op_spec *spec)
904 struct lu_attr *attr = &ma->ma_attr;
905 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
906 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
907 const struct dt_index_features *feat = spec->sp_feat;
911 if (!mdd_object_exists(c)) {
912 struct dt_object *next = mdd_object_child(c);
/* Same format-selection logic as the declare step above. */
915 if (feat != &dt_directory_features && feat != NULL)
916 dof->dof_type = DFT_INDEX;
918 dof->dof_type = dt_mode_to_dft(attr->la_mode);
920 dof->u.dof_idx.di_feat = feat;
922 /* @hint will be initialized by underlying device. */
923 next->do_ops->do_ah_init(env, hint,
924 p ? mdd_object_child(p) : NULL,
925 attr->la_mode & S_IFMT);
927 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
928 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
936 * Make sure the ctime is increased only.
938 static inline int mdd_attr_check(const struct lu_env *env,
939 struct mdd_object *obj,
940 struct lu_attr *attr)
942 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
946 if (attr->la_valid & LA_CTIME) {
947 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Drop time updates that would move ctime backwards; a pure, equal
 * ctime update is also dropped as a no-op. */
951 if (attr->la_ctime < tmp_la->la_ctime)
952 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
953 else if (attr->la_valid == LA_CTIME &&
954 attr->la_ctime == tmp_la->la_ctime)
955 attr->la_valid &= ~LA_CTIME;
/* Apply attributes through the lower layer; on a successful mode change
 * also propagate the new mode into the object's ACL when requested
 * (POSIX-ACL builds only). */
960 int mdd_attr_set_internal(const struct lu_env *env,
961 struct mdd_object *obj,
962 struct lu_attr *attr,
963 struct thandle *handle,
969 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
970 #ifdef CONFIG_FS_POSIX_ACL
971 if (!rc && (attr->la_valid & LA_MODE) && needacl)
972 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Validate the ctime monotonicity first (mdd_attr_check), then apply the
 * surviving attributes via mdd_attr_set_internal(). */
977 int mdd_attr_check_set_internal(const struct lu_env *env,
978 struct mdd_object *obj,
979 struct lu_attr *attr,
980 struct thandle *handle,
986 rc = mdd_attr_check(env, obj, attr);
991 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(); the ACL chmod is
 * only relevant when the mode actually changes, so needacl is narrowed
 * accordingly. */
995 static int mdd_attr_set_internal_locked(const struct lu_env *env,
996 struct mdd_object *obj,
997 struct lu_attr *attr,
998 struct thandle *handle,
1004 needacl = needacl && (attr->la_valid & LA_MODE);
1006 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1007 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1009 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); same needacl
 * narrowing as mdd_attr_set_internal_locked() above. */
1013 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1014 struct mdd_object *obj,
1015 struct lu_attr *attr,
1016 struct thandle *handle,
1022 needacl = needacl && (attr->la_valid & LA_MODE);
1024 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1025 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1027 mdd_write_unlock(env, obj);
/* Low-level xattr write: a non-empty buffer sets the xattr, an explicitly
 * empty buffer (NULL buf + len 0) deletes it. Note the \a fl flags
 * parameter is accepted but 0 is passed to mdo_xattr_set() here. */
1031 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1032 const struct lu_buf *buf, const char *name,
1033 int fl, struct thandle *handle)
1035 struct lustre_capa *capa = mdd_object_capa(env, obj);
1039 if (buf->lb_buf && buf->lb_len > 0)
1040 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1041 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1042 rc = mdo_xattr_del(env, obj, name, handle, capa);
1048 * This gives the same functionality as the code between
1049 * sys_chmod and inode_setattr
1050 * chown_common and inode_setattr
1051 * utimes and inode_setattr
1052 * This API is ported from mds_fix_attr but remove some unnecessary stuff.
1054 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1055 struct lu_attr *la, const struct md_attr *ma)
1057 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1058 struct md_ucred *uc;
1065 /* Do not permit change file type */
1066 if (la->la_valid & LA_TYPE)
1069 /* They should not be processed by setattr */
1070 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1073 /* export destroy does not have ->le_ses, but we may want
1074 * to drop LUSTRE_SOM_FL. */
1080 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: used by rename on the source; subject to delete
 * permission unless MDS_PERM_BYPASS, and only ever moves ctime forward. */
1084 if (la->la_valid == LA_CTIME) {
1085 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1086 /* This is only for set ctime when rename's source is
1088 rc = mdd_may_delete(env, NULL, obj,
1089 (struct md_attr *)ma, 1, 0);
1090 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1091 la->la_valid &= ~LA_CTIME;
1095 if (la->la_valid == LA_ATIME) {
1096 /* This is atime only set for read atime update on close. */
1097 if (la->la_atime >= tmp_la->la_atime &&
1098 la->la_atime < (tmp_la->la_atime +
1099 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1100 la->la_valid &= ~LA_ATIME;
1104 /* Check if flags change. */
1105 if (la->la_valid & LA_FLAGS) {
1106 unsigned int oldflags = 0;
1107 unsigned int newflags = la->la_flags &
1108 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1110 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111 !mdd_capable(uc, CFS_CAP_FOWNER))
1114 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1115 * only be changed by the relevant capability. */
1116 if (mdd_is_immutable(obj))
1117 oldflags |= LUSTRE_IMMUTABLE_FL;
1118 if (mdd_is_append(obj))
1119 oldflags |= LUSTRE_APPEND_FL;
1120 if ((oldflags ^ newflags) &&
1121 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC only has meaning for directories. */
1124 if (!S_ISDIR(tmp_la->la_mode))
1125 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects reject everything except pure flag
 * changes, unless the server bypasses permission checks. */
1128 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1129 (la->la_valid & ~LA_FLAGS) &&
1130 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1133 /* Check for setting the obj time. */
1134 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1135 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1136 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1137 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1138 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID: server-internal flags asking us to strip the
 * setuid/setgid bits (e.g. after a write by a non-owner). */
1146 if (la->la_valid & LA_KILL_SUID) {
1147 la->la_valid &= ~LA_KILL_SUID;
1148 if ((tmp_la->la_mode & S_ISUID) &&
1149 !(la->la_valid & LA_MODE)) {
1150 la->la_mode = tmp_la->la_mode;
1151 la->la_valid |= LA_MODE;
1153 la->la_mode &= ~S_ISUID;
1156 if (la->la_valid & LA_KILL_SGID) {
1157 la->la_valid &= ~LA_KILL_SGID;
1158 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1159 (S_ISGID | S_IXGRP)) &&
1160 !(la->la_valid & LA_MODE)) {
1161 la->la_mode = tmp_la->la_mode;
1162 la->la_valid |= LA_MODE;
1164 la->la_mode &= ~S_ISGID;
1167 /* Make sure a caller can chmod. */
1168 if (la->la_valid & LA_MODE) {
1169 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1170 (uc->mu_fsuid != tmp_la->la_uid) &&
1171 !mdd_capable(uc, CFS_CAP_FOWNER))
1174 if (la->la_mode == (cfs_umode_t) -1)
1175 la->la_mode = tmp_la->la_mode;
1177 la->la_mode = (la->la_mode & S_IALLUGO) |
1178 (tmp_la->la_mode & ~S_IALLUGO);
1180 /* Also check the setgid bit! */
1181 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1182 la->la_gid : tmp_la->la_gid) &&
1183 !mdd_capable(uc, CFS_CAP_FSETID))
1184 la->la_mode &= ~S_ISGID;
1186 la->la_mode = tmp_la->la_mode;
1189 /* Make sure a caller can chown. */
1190 if (la->la_valid & LA_UID) {
1191 if (la->la_uid == (uid_t) -1)
1192 la->la_uid = tmp_la->la_uid;
1193 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1194 (la->la_uid != tmp_la->la_uid)) &&
1195 !mdd_capable(uc, CFS_CAP_CHOWN))
1198 /* If the user or group of a non-directory has been
1199 * changed by a non-root user, remove the setuid bit.
1200 * 19981026 David C Niemi <niemi@tux.org>
1202 * Changed this to apply to all users, including root,
1203 * to avoid some races. This is the behavior we had in
1204 * 2.0. The check for non-root was definitely wrong
1205 * for 2.2 anyway, as it should have been using
1206 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1207 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1208 !S_ISDIR(tmp_la->la_mode)) {
1209 la->la_mode &= ~S_ISUID;
1210 la->la_valid |= LA_MODE;
1214 /* Make sure caller can chgrp. */
1215 if (la->la_valid & LA_GID) {
1216 if (la->la_gid == (gid_t) -1)
1217 la->la_gid = tmp_la->la_gid;
1218 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1219 ((la->la_gid != tmp_la->la_gid) &&
1220 !lustre_in_group_p(uc, la->la_gid))) &&
1221 !mdd_capable(uc, CFS_CAP_CHOWN))
1224 /* Likewise, if the user or group of a non-directory
1225 * has been changed by a non-root user, remove the
1226 * setgid bit UNLESS there is no group execute bit
1227 * (this would be a file marked for mandatory
1228 * locking). 19981026 David C Niemi <niemi@tux.org>
1230 * Removed the fsuid check (see the comment above) --
1232 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1233 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1234 la->la_mode &= ~S_ISGID;
1235 la->la_valid |= LA_MODE;
1239 /* For both Size-on-MDS case and truncate case,
1240 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1241 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1242 * For SOM case, it is true, the MAY_WRITE perm has been checked
1243 * when open, no need check again. For truncate case, it is false,
1244 * the MAY_WRITE perm should be checked here. */
1245 if (ma->ma_attr_flags & MDS_SOM) {
1246 /* For the "Size-on-MDS" setattr update, merge coming
1247 * attributes with the set in the inode. BUG 10641 */
1248 if ((la->la_valid & LA_ATIME) &&
1249 (la->la_atime <= tmp_la->la_atime))
1250 la->la_valid &= ~LA_ATIME;
1252 /* OST attributes do not have a priority over MDS attributes,
1253 * so drop times if ctime is equal. */
1254 if ((la->la_valid & LA_CTIME) &&
1255 (la->la_ctime <= tmp_la->la_ctime))
1256 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1258 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1259 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1260 (uc->mu_fsuid == tmp_la->la_uid)) &&
1261 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1262 rc = mdd_permission_internal_locked(env, obj,
1269 if (la->la_valid & LA_CTIME) {
1270 /* The pure setattr, it has the priority over what is
1271 * already set, do not drop it if ctime is equal. */
1272 if (la->la_ctime < tmp_la->la_ctime)
1273 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1281 /** Store a data change changelog record
1282 * If this fails, we must fail the whole transaction; we don't
1283 * want the change to commit without the log entry.
1284 * \param mdd_obj - mdd_object of change
1285 * \param handle - transaction handle
1287 static int mdd_changelog_data_store(const struct lu_env *env,
1288 struct mdd_device *mdd,
1289 enum changelog_rec_type type,
1291 struct mdd_object *mdd_obj,
1292 struct thandle *handle)
1294 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1295 struct llog_changelog_rec *rec;
1296 struct thandle *th = NULL;
/* Skip entirely when changelogs are off or this record type is masked. */
1302 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1304 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1307 LASSERT(mdd_obj != NULL);
1308 LASSERT(handle != NULL);
/* Time-update records (MTIME..ATIME) are deduplicated: if this object
 * was already logged since mc_starttime, don't log it again. */
1310 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1311 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1312 /* Don't need multiple updates in this log */
1313 /* Don't check under lock - no big deal if we get an extra
1318 reclen = llog_data_len(sizeof(*rec));
1319 buf = mdd_buf_alloc(env, reclen);
1320 if (buf->lb_buf == NULL)
1322 rec = (struct llog_changelog_rec *)buf->lb_buf;
1324 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1325 rec->cr.cr_type = (__u32)type;
1326 rec->cr.cr_tfid = *tfid;
1327 rec->cr.cr_namelen = 0;
1328 mdd_obj->mod_cltime = cfs_time_current_64();
1330 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1333 mdd_trans_stop(env, mdd, rc, th);
1336 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1337 rc, type, PFID(tfid));
1344 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1345 int flags, struct md_object *obj)
1347 struct thandle *handle;
1348 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1349 struct mdd_device *mdd = mdo2mdd(obj);
1353 handle = mdd_trans_create(env, mdd);
1355 return(PTR_ERR(handle));
1357 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1361 rc = mdd_trans_start(env, mdd, handle);
1365 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1369 mdd_trans_stop(env, mdd, rc, handle);
1375 * Should be called with write lock held.
1377 * \see mdd_lma_set_locked().
/* Merge the HSM and/or SOM state from @ma into the on-disk LMA extended
 * attribute of @mdd_obj.  Caller must hold the object write lock (see
 * mdd_lma_set_locked()).
 * NOTE(review): this chunk is elided -- some statements between the
 * visible lines are missing. */
1379 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1380 const struct md_attr *ma, struct thandle *handle)
1382 struct mdd_thread_info *info = mdd_env_info(env);
1384 struct lustre_mdt_attrs *lma =
1385 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1386 int lmasize = sizeof(struct lustre_mdt_attrs);
1391 /* Either HSM or SOM part is not valid, we need to read it before */
/* FIX(review): was "if ((!ma->ma_valid) & (MA_HSM | MA_SOM))" -- the `!`
 * applied to the whole valid mask yields 0/1, so the condition was
 * (almost) never true and the existing LMA was never read, clobbering
 * whichever part the caller did not supply.  Read the on-disk LMA
 * whenever at least one of HSM/SOM is not provided by @ma. */
1392 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1393 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* convert on-disk (little-endian) LMA to host byte order */
1397 lustre_lma_swab(lma);
/* no existing LMA: start from a zeroed structure */
1399 memset(lma, 0, lmasize);
/* fold in the HSM flags supplied by the caller */
1403 if (ma->ma_valid & MA_HSM) {
1404 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1405 lma->lma_compat |= LMAC_HSM;
/* fold in the Size-on-MDS data; IOEPOCH_INVAL cancels SOM */
1409 if (ma->ma_valid & MA_SOM) {
1410 LASSERT(ma->ma_som != NULL);
1411 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1412 lma->lma_compat &= ~LMAC_SOM;
1414 lma->lma_compat |= LMAC_SOM;
1415 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1416 lma->lma_som_size = ma->ma_som->msd_size;
1417 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1418 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* always refresh the self FID stored in the LMA */
1423 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* back to on-disk byte order before writing */
1425 lustre_lma_swab(lma);
1426 buf = mdd_buf_get(env, lma, lmasize);
1427 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1433 * Save LMA extended attributes with data from \a ma.
1435 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid;
1436 * if not, the LMA EA will first be read from disk, modified, and written back.
1439 static int mdd_lma_set_locked(const struct lu_env *env,
1440 struct mdd_object *mdd_obj,
1441 const struct md_attr *ma, struct thandle *handle)
1445 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1446 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1447 mdd_write_unlock(env, mdd_obj);
1451 /* Precedence for choosing record type when multiple
1452 * attributes change: setattr > mtime > ctime > atime
1453 * (ctime changes when mtime does, plus chmod/chown.
1454 * atime and ctime are independent.) */
1455 static int mdd_attr_set_changelog(const struct lu_env *env,
1456 struct md_object *obj, struct thandle *handle,
1459 struct mdd_device *mdd = mdo2mdd(obj);
1462 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1463 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1464 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1465 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1466 bits = bits & mdd->mdd_cl.mc_mask;
1470 /* The record type is the lowest non-masked set bit */
1471 while (bits && ((bits & 1) == 0)) {
1476 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1477 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1478 md2mdd_obj(obj), handle);
1481 static int mdd_declare_attr_set(const struct lu_env *env,
1482 struct mdd_device *mdd,
1483 struct mdd_object *obj,
1484 const struct md_attr *ma,
1485 struct lov_mds_md *lmm,
1486 struct thandle *handle)
1488 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1491 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1495 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1499 if (ma->ma_valid & MA_LOV) {
1501 buf->lb_len = ma->ma_lmm_size;
1502 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1508 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1510 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1511 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1517 #ifdef CONFIG_FS_POSIX_ACL
1518 if (ma->ma_attr.la_valid & LA_MODE) {
1519 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1520 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1522 mdd_read_unlock(env, obj);
1523 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1531 rc = mdo_declare_xattr_set(env, obj, buf,
1532 XATTR_NAME_ACL_ACCESS, 0,
1540 /* basically the log is the same as in unlink case */
1544 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1545 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1546 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1547 mdd->mdd_obd_dev->obd_name,
1548 le32_to_cpu(lmm->lmm_magic),
1549 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1553 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1554 if (stripe == LOV_ALL_STRIPES) {
1555 struct lov_desc *ldesc;
1557 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1558 LASSERT(ldesc != NULL);
1559 stripe = ldesc->ld_tgt_count;
1562 for (i = 0; i < stripe; i++) {
1563 rc = mdd_declare_llog_record(env, mdd,
1564 sizeof(struct llog_unlink_rec),
1574 /* set attr and LOV EA at once, return updated attr */
1575 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1576 const struct md_attr *ma)
1578 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1579 struct mdd_device *mdd = mdo2mdd(obj);
1580 struct thandle *handle;
1581 struct lov_mds_md *lmm = NULL;
1582 struct llog_cookie *logcookies = NULL;
1583 int rc, lmm_size = 0, cookie_size = 0;
1584 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1585 struct obd_device *obd = mdd->mdd_obd_dev;
1586 struct mds_obd *mds = &obd->u.mds;
1587 #ifdef HAVE_QUOTA_SUPPORT
1588 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1589 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1590 int quota_opc = 0, block_count = 0;
1591 int inode_pending[MAXQUOTAS] = { 0, 0 };
1592 int block_pending[MAXQUOTAS] = { 0, 0 };
1596 *la_copy = ma->ma_attr;
1597 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1601 /* setattr on "close" only change atime, or do nothing */
1602 if (ma->ma_valid == MA_INODE &&
1603 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1606 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1607 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1608 lmm_size = mdd_lov_mdsize(env, mdd);
1609 lmm = mdd_max_lmm_get(env, mdd);
1613 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1620 handle = mdd_trans_create(env, mdd);
1622 RETURN(PTR_ERR(handle));
1624 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1625 lmm_size > 0 ? lmm : NULL, handle);
1629 /* permission changes may require sync operation */
1630 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1631 handle->th_sync = !!mdd->mdd_sync_permission;
1633 rc = mdd_trans_start(env, mdd, handle);
1637 /* permission changes may require sync operation */
1638 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1639 handle->th_sync |= mdd->mdd_sync_permission;
1641 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1642 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1643 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1645 #ifdef HAVE_QUOTA_SUPPORT
1646 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1647 struct obd_export *exp = md_quota(env)->mq_exp;
1648 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1650 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1652 quota_opc = FSFILT_OP_SETATTR;
1653 mdd_quota_wrapper(la_copy, qnids);
1654 mdd_quota_wrapper(la_tmp, qoids);
1655 /* get file quota for new owner */
1656 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1657 qnids, inode_pending, 1, NULL, 0,
1659 block_count = (la_tmp->la_blocks + 7) >> 3;
1662 mdd_data_get(env, mdd_obj, &data);
1663 /* get block quota for new owner */
1664 lquota_chkquota(mds_quota_interface_ref, obd,
1665 exp, qnids, block_pending,
1667 LQUOTA_FLAGS_BLK, data, 1);
1673 if (la_copy->la_valid & LA_FLAGS) {
1674 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1677 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1678 } else if (la_copy->la_valid) { /* setattr */
1679 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1681 /* journal chown/chgrp in llog, just like unlink */
1682 if (rc == 0 && lmm_size){
1683 cookie_size = mdd_lov_cookiesize(env, mdd);
1684 logcookies = mdd_max_cookie_get(env, mdd);
1685 if (logcookies == NULL)
1686 GOTO(cleanup, rc = -ENOMEM);
1688 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1689 logcookies, cookie_size) <= 0)
1694 if (rc == 0 && ma->ma_valid & MA_LOV) {
1697 mode = mdd_object_type(mdd_obj);
1698 if (S_ISREG(mode) || S_ISDIR(mode)) {
1699 rc = mdd_lsm_sanity_check(env, mdd_obj);
1703 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1704 ma->ma_lmm_size, handle, 1);
1708 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1711 mode = mdd_object_type(mdd_obj);
1713 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1718 rc = mdd_attr_set_changelog(env, obj, handle,
1719 ma->ma_attr.la_valid);
1721 mdd_trans_stop(env, mdd, rc, handle);
1722 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1723 /*set obd attr, if needed*/
1724 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1727 #ifdef HAVE_QUOTA_SUPPORT
1729 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1731 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1733 /* Trigger dqrel/dqacq for original owner and new owner.
1734 * If failed, the next call for lquota_chkquota will
1736 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1743 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1744 const struct lu_buf *buf, const char *name, int fl,
1745 struct thandle *handle)
1750 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1751 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1752 mdd_write_unlock(env, obj);
1757 static int mdd_xattr_sanity_check(const struct lu_env *env,
1758 struct mdd_object *obj)
1760 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1761 struct md_ucred *uc = md_ucred(env);
1765 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1768 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1772 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1773 !mdd_capable(uc, CFS_CAP_FOWNER))
1779 static int mdd_declare_xattr_set(const struct lu_env *env,
1780 struct mdd_device *mdd,
1781 struct mdd_object *obj,
1782 const struct lu_buf *buf,
1784 struct thandle *handle)
1789 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1793 /* Only record user xattr changes */
1794 if ((strncmp("user.", name, 5) == 0))
1795 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1801 * The caller should guarantee to update the object ctime
1802 * after xattr_set if needed.
1804 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1805 const struct lu_buf *buf, const char *name,
1808 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1809 struct mdd_device *mdd = mdo2mdd(obj);
1810 struct thandle *handle;
1814 rc = mdd_xattr_sanity_check(env, mdd_obj);
1818 handle = mdd_trans_create(env, mdd);
1820 RETURN(PTR_ERR(handle));
1822 /* security-replated changes may require sync */
1823 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1824 mdd->mdd_sync_permission == 1)
1825 handle->th_sync = 1;
1827 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1831 rc = mdd_trans_start(env, mdd, handle);
1835 /* security-replated changes may require sync */
1836 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1837 handle->th_sync |= mdd->mdd_sync_permission;
1839 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1841 /* Only record system & user xattr changes */
1842 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1843 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1844 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1845 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1846 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1847 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1848 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1852 mdd_trans_stop(env, mdd, rc, handle);
1857 static int mdd_declare_xattr_del(const struct lu_env *env,
1858 struct mdd_device *mdd,
1859 struct mdd_object *obj,
1861 struct thandle *handle)
1865 rc = mdo_declare_xattr_del(env, obj, name, handle);
1869 /* Only record user xattr changes */
1870 if ((strncmp("user.", name, 5) == 0))
1871 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1877 * The caller should guarantee to update the object ctime
1878 * after xattr_set if needed.
1880 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1883 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1884 struct mdd_device *mdd = mdo2mdd(obj);
1885 struct thandle *handle;
1889 rc = mdd_xattr_sanity_check(env, mdd_obj);
1893 handle = mdd_trans_create(env, mdd);
1895 RETURN(PTR_ERR(handle));
1897 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1901 rc = mdd_trans_start(env, mdd, handle);
1905 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1906 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1907 mdd_object_capa(env, mdd_obj));
1908 mdd_write_unlock(env, mdd_obj);
1910 /* Only record system & user xattr changes */
1911 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1912 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1913 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1914 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1915 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1916 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1917 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1921 mdd_trans_stop(env, mdd, rc, handle);
1926 /* partial unlink */
1927 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1930 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1931 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1932 struct mdd_device *mdd = mdo2mdd(obj);
1933 struct thandle *handle;
1934 #ifdef HAVE_QUOTA_SUPPORT
1935 struct obd_device *obd = mdd->mdd_obd_dev;
1936 struct mds_obd *mds = &obd->u.mds;
1937 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1943 /* XXX: this code won't be used ever:
1944 * DNE uses slightly different approach */
1948 * Check -ENOENT early here because we need to get object type
1949 * to calculate credits before transaction start
1951 if (!mdd_object_exists(mdd_obj))
1954 LASSERT(mdd_object_exists(mdd_obj) > 0);
1956 handle = mdd_trans_create(env, mdd);
1960 rc = mdd_trans_start(env, mdd, handle);
1962 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1964 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1968 __mdd_ref_del(env, mdd_obj, handle, 0);
1970 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1972 __mdd_ref_del(env, mdd_obj, handle, 1);
1975 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1976 la_copy->la_ctime = ma->ma_attr.la_ctime;
1978 la_copy->la_valid = LA_CTIME;
1979 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1983 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1984 #ifdef HAVE_QUOTA_SUPPORT
1985 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1986 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1987 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1988 mdd_quota_wrapper(&ma->ma_attr, qids);
1995 mdd_write_unlock(env, mdd_obj);
1996 mdd_trans_stop(env, mdd, rc, handle);
1997 #ifdef HAVE_QUOTA_SUPPORT
1999 /* Trigger dqrel on the owner of child. If failed,
2000 * the next call for lquota_chkquota will process it */
2001 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2007 /* partial operation */
2008 static int mdd_oc_sanity_check(const struct lu_env *env,
2009 struct mdd_object *obj,
2015 switch (ma->ma_attr.la_mode & S_IFMT) {
2032 static int mdd_object_create(const struct lu_env *env,
2033 struct md_object *obj,
2034 const struct md_op_spec *spec,
2038 struct mdd_device *mdd = mdo2mdd(obj);
2039 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2040 const struct lu_fid *pfid = spec->u.sp_pfid;
2041 struct thandle *handle;
2042 #ifdef HAVE_QUOTA_SUPPORT
2043 struct obd_device *obd = mdd->mdd_obd_dev;
2044 struct obd_export *exp = md_quota(env)->mq_exp;
2045 struct mds_obd *mds = &obd->u.mds;
2046 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2047 int quota_opc = 0, block_count = 0;
2048 int inode_pending[MAXQUOTAS] = { 0, 0 };
2049 int block_pending[MAXQUOTAS] = { 0, 0 };
2054 /* XXX: this code won't be used ever:
2055 * DNE uses slightly different approach */
2058 #ifdef HAVE_QUOTA_SUPPORT
2059 if (mds->mds_quota) {
2060 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2061 mdd_quota_wrapper(&ma->ma_attr, qids);
2062 /* get file quota for child */
2063 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2064 qids, inode_pending, 1, NULL, 0,
2066 switch (ma->ma_attr.la_mode & S_IFMT) {
2075 /* get block quota for child */
2077 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2078 qids, block_pending, block_count,
2079 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2083 handle = mdd_trans_create(env, mdd);
2085 GOTO(out_pending, rc = PTR_ERR(handle));
2087 rc = mdd_trans_start(env, mdd, handle);
2089 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2090 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2094 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2098 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2099 /* If creating the slave object, set slave EA here. */
2100 int lmv_size = spec->u.sp_ea.eadatalen;
2101 struct lmv_stripe_md *lmv;
2103 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2104 LASSERT(lmv != NULL && lmv_size > 0);
2106 rc = __mdd_xattr_set(env, mdd_obj,
2107 mdd_buf_get_const(env, lmv, lmv_size),
2108 XATTR_NAME_LMV, 0, handle);
2112 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2115 #ifdef CONFIG_FS_POSIX_ACL
2116 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2117 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2119 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2120 buf->lb_len = spec->u.sp_ea.eadatalen;
2121 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2122 rc = __mdd_acl_init(env, mdd_obj, buf,
2123 &ma->ma_attr.la_mode,
2128 ma->ma_attr.la_valid |= LA_MODE;
2131 pfid = spec->u.sp_ea.fid;
2134 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2140 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2141 mdd_write_unlock(env, mdd_obj);
2143 mdd_trans_stop(env, mdd, rc, handle);
2145 #ifdef HAVE_QUOTA_SUPPORT
2147 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2149 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2151 /* Trigger dqacq on the owner of child. If failed,
2152 * the next call for lquota_chkquota will process it. */
2153 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2161 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2162 const struct md_attr *ma)
2164 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2165 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2166 struct mdd_device *mdd = mdo2mdd(obj);
2167 struct thandle *handle;
2171 /* XXX: this code won't be used ever:
2172 * DNE uses slightly different approach */
2175 handle = mdd_trans_create(env, mdd);
2179 rc = mdd_trans_start(env, mdd, handle);
2181 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2182 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2184 __mdd_ref_add(env, mdd_obj, handle);
2185 mdd_write_unlock(env, mdd_obj);
2187 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2188 la_copy->la_ctime = ma->ma_attr.la_ctime;
2190 la_copy->la_valid = LA_CTIME;
2191 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2194 mdd_trans_stop(env, mdd, 0, handle);
2200 * do NOT or the MAY_*'s, you'll get the weakest
2202 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2206 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2207 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2208 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2209 * owner can write to a file even if it is marked readonly to hide
2210 * its brokenness. (bug 5781) */
2211 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2212 struct md_ucred *uc = md_ucred(env);
2214 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2215 (la->la_uid == uc->mu_fsuid))
2219 if (flags & FMODE_READ)
2221 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2223 if (flags & MDS_FMODE_EXEC)
2228 static int mdd_open_sanity_check(const struct lu_env *env,
2229 struct mdd_object *obj, int flag)
2231 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2236 if (mdd_is_dead_obj(obj))
2239 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2243 if (S_ISLNK(tmp_la->la_mode))
2246 mode = accmode(env, tmp_la, flag);
2248 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2251 if (!(flag & MDS_OPEN_CREATED)) {
2252 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2257 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2258 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2259 flag &= ~MDS_OPEN_TRUNC;
2261 /* For writing append-only file must open it with append mode. */
2262 if (mdd_is_append(obj)) {
2263 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2265 if (flag & MDS_OPEN_TRUNC)
2271 * Now, flag -- O_NOATIME does not be packed by client.
2273 if (flag & O_NOATIME) {
2274 struct md_ucred *uc = md_ucred(env);
2276 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2277 (uc->mu_valid == UCRED_NEW)) &&
2278 (uc->mu_fsuid != tmp_la->la_uid) &&
2279 !mdd_capable(uc, CFS_CAP_FOWNER))
2287 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2290 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2293 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2295 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2297 mdd_obj->mod_count++;
2299 mdd_write_unlock(env, mdd_obj);
2303 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2304 struct md_attr *ma, struct thandle *handle)
2308 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2312 return mdo_declare_destroy(env, obj, handle);
2315 /* return md_attr back,
2316 * if it is last unlink then return lov ea + llog cookie*/
2317 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2318 struct md_attr *ma, struct thandle *handle)
2323 if (S_ISREG(mdd_object_type(obj))) {
2324 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2325 * Caller must be ready for that. */
2327 rc = __mdd_lmm_get(env, obj, ma);
2328 if ((ma->ma_valid & MA_LOV))
2329 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2334 rc = mdo_destroy(env, obj, handle);
2339 static int mdd_declare_close(const struct lu_env *env,
2340 struct mdd_object *obj,
2342 struct thandle *handle)
2346 rc = orph_declare_index_delete(env, obj, handle);
2350 return mdd_declare_object_kill(env, obj, ma, handle);
2354 * No permission check is needed.
/* Close @obj: drop the open count; on last close of an orphaned/dead
 * object remove it from the orphan index, destroy its OSS objects when
 * requested, release quota, and record a CL_CLOSE changelog entry for
 * write-mode opens.  No permission check is needed here (done at open).
 * NOTE(review): heavily elided chunk -- error checks, GOTO targets and
 * braces between the visible lines are missing; flow shown is partial. */
2356 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2357 struct md_attr *ma, int mode)
2359 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2360 struct mdd_device *mdd = mdo2mdd(obj);
2361 struct thandle *handle = NULL;
2365 #ifdef HAVE_QUOTA_SUPPORT
2366 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2367 struct mds_obd *mds = &obd->u.mds;
2368 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: just drop the count, keep the object on the list */
2373 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2374 mdd_obj->mod_count--;
2376 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2377 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2378 "list\n", PFID(mdd_object_fid(mdd_obj)));
2382 /* check without any lock */
2383 if (mdd_obj->mod_count == 1 &&
2384 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2386 handle = mdd_trans_create(env, mdo2mdd(obj));
2388 RETURN(PTR_ERR(handle));
2390 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2394 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2398 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2403 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* raced: object became orphan after the lockless check above */
2404 if (handle == NULL && mdd_obj->mod_count == 1 &&
2405 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2406 mdd_write_unlock(env, mdd_obj);
2410 /* release open count */
2411 mdd_obj->mod_count --;
2413 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2414 /* remove link to object from orphan index */
2415 LASSERT(handle != NULL);
2416 rc = __mdd_orphan_del(env, mdd_obj, handle);
2418 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2419 "list, OSS objects to be destroyed.\n",
2420 PFID(mdd_object_fid(mdd_obj)));
2422 CERROR("Object "DFID" can not be deleted from orphan "
2423 "list, maybe cause OST objects can not be "
2424 "destroyed (err: %d).\n",
2425 PFID(mdd_object_fid(mdd_obj)), rc);
2426 /* If object was not deleted from orphan list, do not
2427 * destroy OSS objects, which will be done when next
2433 rc = mdd_iattr_get(env, mdd_obj, ma);
2434 /* Object maybe not in orphan list originally, it is rare case for
2435 * mdd_finish_unlink() failure. */
2436 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2437 #ifdef HAVE_QUOTA_SUPPORT
2438 if (mds->mds_quota) {
2439 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2440 mdd_quota_wrapper(&ma->ma_attr, qids);
2443 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2444 if (ma->ma_valid & MA_FLAGS &&
2445 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2446 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2448 if (handle == NULL) {
2449 handle = mdd_trans_create(env, mdo2mdd(obj));
2451 GOTO(out, rc = PTR_ERR(handle));
2453 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2458 rc = mdd_declare_changelog_store(env, mdd,
2463 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2468 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2474 CERROR("Error when prepare to delete Object "DFID" , "
2475 "which will cause OST objects can not be "
2476 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2482 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2484 mdd_write_unlock(env, mdd_obj);
2487 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2488 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2489 if (handle == NULL) {
2490 handle = mdd_trans_create(env, mdo2mdd(obj));
/* FIX(review): was "rc = IS_ERR(handle)", which stores the boolean 1
 * instead of the error code; every other failure path here uses
 * PTR_ERR(handle) (see 2388, 2451 above). */
2492 GOTO(stop, rc = PTR_ERR(handle));
2494 rc = mdd_declare_changelog_store(env, mdd, NULL,
2499 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2504 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2510 mdd_trans_stop(env, mdd, rc, handle);
2511 #ifdef HAVE_QUOTA_SUPPORT
2513 /* Trigger dqrel on the owner of child. If failed,
2514 * the next call for lquota_chkquota will process it */
2515 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2522 * Permission check is done when open,
2523 * no need check again.
2525 static int mdd_readpage_sanity_check(const struct lu_env *env,
2526 struct mdd_object *obj)
2528 struct dt_object *next = mdd_object_child(obj);
2532 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2540 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2541 struct lu_dirpage *dp, int nob,
2542 const struct dt_it_ops *iops, struct dt_it *it,
2548 struct lu_dirent *ent;
2549 struct lu_dirent *last = NULL;
2552 memset(area, 0, sizeof (*dp));
2553 area += sizeof (*dp);
2554 nob -= sizeof (*dp);
2561 len = iops->key_size(env, it);
2563 /* IAM iterator can return record with zero len. */
2567 hash = iops->store(env, it);
2568 if (unlikely(first)) {
2570 dp->ldp_hash_start = cpu_to_le64(hash);
2573 /* calculate max space required for lu_dirent */
2574 recsize = lu_dirent_calc_size(len, attr);
2576 if (nob >= recsize) {
2577 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2578 if (result == -ESTALE)
2583 /* osd might not able to pack all attributes,
2584 * so recheck rec length */
2585 recsize = le16_to_cpu(ent->lde_reclen);
2587 result = (last != NULL) ? 0 :-EINVAL;
2591 ent = (void *)ent + recsize;
2595 result = iops->next(env, it);
2596 if (result == -ESTALE)
2598 } while (result == 0);
2601 dp->ldp_hash_end = cpu_to_le64(hash);
2603 if (last->lde_hash == dp->ldp_hash_end)
2604 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2605 last->lde_reclen = 0; /* end mark */
2610 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2611 const struct lu_rdpg *rdpg)
2614 struct dt_object *next = mdd_object_child(obj);
2615 const struct dt_it_ops *iops;
2617 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2623 LASSERT(rdpg->rp_pages != NULL);
2624 LASSERT(next->do_index_ops != NULL);
2626 if (rdpg->rp_count <= 0)
2630 * iterate through directory and fill pages from @rdpg
2632 iops = &next->do_index_ops->dio_it;
2633 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2637 rc = iops->load(env, it, rdpg->rp_hash);
2641 * Iterator didn't find record with exactly the key requested.
2643 * It is currently either
2645 * - positioned above record with key less than
2646 * requested---skip it.
2648 * - or not positioned at all (is in IAM_IT_SKEWED
2649 * state)---position it on the next item.
2651 rc = iops->next(env, it);
2656 * At this point and across for-loop:
2658 * rc == 0 -> ok, proceed.
2659 * rc > 0 -> end of directory.
2662 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2663 i++, nob -= CFS_PAGE_SIZE) {
2664 struct lu_dirpage *dp;
2666 LASSERT(i < rdpg->rp_npages);
2667 pg = rdpg->rp_pages[i];
2669 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2672 rc = mdd_dir_page_build(env, mdd, dp,
2673 min_t(int, nob, LU_PAGE_SIZE),
2674 iops, it, rdpg->rp_attrs);
2679 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2681 } else if (rc < 0) {
2682 CWARN("build page failed: %d!\n", rc);
2685 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2686 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2687 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2694 struct lu_dirpage *dp;
2696 dp = cfs_kmap(rdpg->rp_pages[0]);
2697 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2700 * No pages were processed, mark this for first page
2703 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2706 cfs_kunmap(rdpg->rp_pages[0]);
2708 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2711 iops->fini(env, it);
2716 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2717 const struct lu_rdpg *rdpg)
2719 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2723 LASSERT(mdd_object_exists(mdd_obj));
2725 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2726 rc = mdd_readpage_sanity_check(env, mdd_obj);
2728 GOTO(out_unlock, rc);
2730 if (mdd_is_dead_obj(mdd_obj)) {
2732 struct lu_dirpage *dp;
2735 * According to POSIX, please do not return any entry to client:
2736 * even dot and dotdot should not be returned.
2738 CWARN("readdir from dead object: "DFID"\n",
2739 PFID(mdd_object_fid(mdd_obj)));
2741 if (rdpg->rp_count <= 0)
2742 GOTO(out_unlock, rc = -EFAULT);
2743 LASSERT(rdpg->rp_pages != NULL);
2745 pg = rdpg->rp_pages[0];
2746 dp = (struct lu_dirpage*)cfs_kmap(pg);
2747 memset(dp, 0 , sizeof(struct lu_dirpage));
2748 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2749 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2750 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2752 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2755 rc = __mdd_readpage(env, mdd_obj, rdpg);
2759 mdd_read_unlock(env, mdd_obj);
2763 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2765 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2766 struct dt_object *next;
2768 LASSERT(mdd_object_exists(mdd_obj));
2769 next = mdd_object_child(mdd_obj);
2770 return next->do_ops->do_object_sync(env, next);
2773 const struct md_object_operations mdd_obj_ops = {
2774 .moo_permission = mdd_permission,
2775 .moo_attr_get = mdd_attr_get,
2776 .moo_attr_set = mdd_attr_set,
2777 .moo_xattr_get = mdd_xattr_get,
2778 .moo_xattr_set = mdd_xattr_set,
2779 .moo_xattr_list = mdd_xattr_list,
2780 .moo_xattr_del = mdd_xattr_del,
2781 .moo_object_create = mdd_object_create,
2782 .moo_ref_add = mdd_ref_add,
2783 .moo_ref_del = mdd_ref_del,
2784 .moo_open = mdd_open,
2785 .moo_close = mdd_close,
2786 .moo_readpage = mdd_readpage,
2787 .moo_readlink = mdd_readlink,
2788 .moo_changelog = mdd_changelog,
2789 .moo_capa_get = mdd_capa_get,
2790 .moo_object_sync = mdd_object_sync,
2791 .moo_path = mdd_path,
2792 .moo_file_lock = mdd_file_lock,
2793 .moo_file_unlock = mdd_file_unlock,