1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/mdd/mdd_object.c
40 * Lustre Metadata Server (mdd) routines
42 * Author: Wang Di <wangdi@clusterfs.com>
46 # define EXPORT_SYMTAB
48 #define DEBUG_SUBSYSTEM S_MDS
50 #include <linux/module.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55 /* fid_be_cpu(), fid_cpu_to_be(). */
56 #include <lustre_fid.h>
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
63 #include "mdd_internal.h"
65 static const struct lu_object_operations mdd_lu_obj_ops;
67 static int mdd_xattr_get(const struct lu_env *env,
68 struct md_object *obj, struct lu_buf *buf,
/* Fetch object body data through the underlying (mdo) layer.
 * Logs an error when the object no longer exists (the message text
 * hard-codes rc = -2, i.e. -ENOENT).
 * NOTE(review): listing is missing intermediate lines (return paths,
 * closing braces) — confirm against the full source. */
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
74 if (mdd_object_exists(obj) == 0) {
75 CERROR("%s: object "DFID" not found: rc = -2\n",
76 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
79 mdo_data_get(env, obj, data);
/* Read the object's lu_attr set via the underlying layer, subject to
 * the given capability. Errors out (message says rc = -2 == -ENOENT)
 * when the object does not exist. */
83 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
84 struct lu_attr *la, struct lustre_capa *capa)
86 if (mdd_object_exists(obj) == 0) {
87 CERROR("%s: object "DFID" not found: rc = -2\n",
88 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
91 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ), clearing both first so the
 * result reflects exactly the flags passed in. */
94 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98 if (flags & LUSTRE_APPEND_FL)
99 obj->mod_flags |= APPEND_OBJ;
101 if (flags & LUSTRE_IMMUTABLE_FL)
102 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info attached to this lu_env via
 * mdd_thread_key; asserts it was registered (never NULL). */
105 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 struct mdd_thread_info *info;
109 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
110 LASSERT(info != NULL);
/* Wrap a caller-provided (area, len) pair in the per-thread mti_buf;
 * no allocation is performed here. */
114 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
118 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory owned by @buf (no-op for NULL buf or NULL lb_buf);
 * frees lb_len bytes via OBD_FREE_LARGE. */
124 void mdd_buf_put(struct lu_buf *buf)
126 if (buf == NULL || buf->lb_buf == NULL)
128 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const-input variant of mdd_buf_get(): wraps (area, len) in the
 * per-thread mti_buf, casting away const for storage only. */
133 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
134 const void *area, ssize_t len)
138 buf = &mdd_env_info(env)->mti_buf;
139 buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf, (re)allocated to hold at least
 * @len bytes. An existing smaller buffer is freed first; contents are
 * NOT preserved (see mdd_buf_grow() for the preserving variant). */
144 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
149 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
152 if (buf->lb_buf == NULL) {
154 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
155 if (buf->lb_buf == NULL)
161 /** Increase the size of the \a mti_big_buf.
162 * preserves old data in buffer
163 * old buffer remains unchanged on error
164 * \retval 0 or -ENOMEM
166 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
171 LASSERT(len >= oldbuf->lb_len);
172 OBD_ALLOC_LARGE(buf.lb_buf, len);
174 if (buf.lb_buf == NULL)
/* Copy old contents into the new buffer, then free the old one and
 * install the new descriptor by struct copy. */
178 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, sized to the device's
 * current maximum LOV cookie size (mdd_lov_cookiesize()). A too-small
 * cached buffer is freed and reallocated; the result is zeroed before
 * return. Returns NULL on allocation failure. */
187 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
188 struct mdd_device *mdd)
190 struct mdd_thread_info *mti = mdd_env_info(env);
193 max_cookie_size = mdd_lov_cookiesize(env, mdd);
194 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
195 if (mti->mti_max_cookie)
196 OBD_FREE_LARGE(mti->mti_max_cookie,
197 mti->mti_max_cookie_size);
198 mti->mti_max_cookie = NULL;
199 mti->mti_max_cookie_size = 0;
201 if (unlikely(mti->mti_max_cookie == NULL)) {
202 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
203 if (likely(mti->mti_max_cookie != NULL))
204 mti->mti_max_cookie_size = max_cookie_size;
206 if (likely(mti->mti_max_cookie != NULL))
207 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208 return mti->mti_max_cookie;
/* Return the per-thread LOV EA buffer, guaranteed to hold at least
 * @size bytes. Grows in power-of-two steps (size_roundup_power2) to
 * amortize reallocation; returns NULL on allocation failure, in which
 * case the previous buffer has already been freed. */
211 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
213 struct mdd_thread_info *mti = mdd_env_info(env);
215 if (unlikely(mti->mti_max_lmm_size < size)) {
216 int rsize = size_roundup_power2(size);
218 if (mti->mti_max_lmm_size > 0) {
219 LASSERT(mti->mti_max_lmm);
220 OBD_FREE_LARGE(mti->mti_max_lmm,
221 mti->mti_max_lmm_size);
222 mti->mti_max_lmm = NULL;
223 mti->mti_max_lmm_size = 0;
226 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
227 if (likely(mti->mti_max_lmm != NULL))
228 mti->mti_max_lmm_size = rsize;
230 return mti->mti_max_lmm;
/* Convenience wrapper: size the per-thread LOV EA buffer to the
 * device-wide maximum LOV MD size (mdd_lov_mdsize()). */
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234 struct mdd_device *mdd)
238 max_lmm_size = mdd_lov_mdsize(env, mdd);
239 return mdd_max_lmm_buffer(env, max_lmm_size);
/* lu_device::ldo_object_alloc for the mdd layer: allocate an
 * mdd_object, initialize its embedded lu_object and wire in the mdd
 * md/dir/lu operation tables. Returns the lu_object (NULL path not
 * visible in this truncated listing — confirm against full source). */
242 struct lu_object *mdd_object_alloc(const struct lu_env *env,
243 const struct lu_object_header *hdr,
246 struct mdd_object *mdd_obj;
248 OBD_ALLOC_PTR(mdd_obj);
249 if (mdd_obj != NULL) {
252 o = mdd2lu_obj(mdd_obj);
253 lu_object_init(o, NULL, d);
254 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
255 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
256 mdd_obj->mod_count = 0;
257 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the next-lower
 * (child device) slice and stack it under this object; also resets the
 * changelog timestamp and initializes the pdir lock. */
264 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
265 const struct lu_object_conf *unused)
267 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
268 struct mdd_object *mdd_obj = lu2mdd_obj(o);
269 struct lu_object *below;
270 struct lu_device *under;
273 mdd_obj->mod_cltime = 0;
274 under = &d->mdd_child->dd_lu_dev;
275 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
276 mdd_pdlock_init(mdd_obj);
280 lu_object_add(o, below);
/* loo_object_start: for an existing object, populate the in-memory
 * flag bits (immutable/append) from disk via mdd_get_flags(). */
285 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
287 if (lu_object_exists(o))
288 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: tear down the mdd slice (freeing code is in the
 * omitted lines of this truncated listing). */
293 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
295 struct mdd_object *mdd = lu2mdd_obj(o);
301 static int mdd_object_print(const struct lu_env *env, void *cookie,
302 lu_printer_t p, const struct lu_object *o)
304 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
305 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
306 "valid=%x, cltime="LPU64", flags=%lx)",
307 mdd, mdd->mod_count, mdd->mod_valid,
308 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects (init/start/free/print). */
311 static const struct lu_object_operations mdd_lu_obj_ops = {
312 .loo_object_init = mdd_object_init,
313 .loo_object_start = mdd_object_start,
314 .loo_object_free = mdd_object_free,
315 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID @f on device @d;
 * thin wrapper around md_object_find_slice(). Caller must release the
 * reference with mdd_object_put(). */
318 struct mdd_object *mdd_object_find(const struct lu_env *env,
319 struct mdd_device *d,
320 const struct lu_fid *f)
322 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated pathname, relative to the filesystem root
 * (mdd_root_fid), to its FID by iteratively looking up each component
 * in its parent directory. -EREMOTE is returned when the walk leaves
 * this MDT. NOTE(review): loop body is partially omitted in this
 * truncated listing. */
325 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
326 const char *path, struct lu_fid *fid)
329 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
330 struct mdd_object *obj;
331 struct lu_name *lname = &mdd_env_info(env)->mti_name;
336 /* temp buffer for path element */
337 buf = mdd_buf_alloc(env, PATH_MAX);
338 if (buf->lb_buf == NULL)
341 lname->ln_name = name = buf->lb_buf;
342 lname->ln_namelen = 0;
343 *f = mdd->mdd_root_fid;
/* Copy the next path component into the temp buffer. */
350 while (*path != '/' && *path != '\0') {
358 /* find obj corresponding to fid */
359 obj = mdd_object_find(env, mdd, f);
361 GOTO(out, rc = -EREMOTE);
363 GOTO(out, rc = PTR_ERR(obj));
364 /* get child fid from parent and name */
365 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
366 mdd_object_put(env, obj);
371 lname->ln_namelen = 0;
380 /** The maximum depth that fid2path() will search.
381 * This is limited only because we want to store the fids for
382 * historical path lookup purposes.
384 #define MAX_PATH_DEPTH 100
386 /** mdd_path() lookup structure. */
387 struct path_lookup_info {
388 __u64 pli_recno; /**< history point */
389 __u64 pli_currec; /**< current record */
390 struct lu_fid pli_fid;
391 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
392 struct mdd_object *pli_mdd_obj;
393 char *pli_path; /**< full path */
395 int pli_linkno; /**< which hardlink to follow */
396 int pli_fidcount; /**< number of \a pli_fids */
399 static int mdd_path_current(const struct lu_env *env,
400 struct path_lookup_info *pli)
402 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
403 struct mdd_object *mdd_obj;
404 struct lu_buf *buf = NULL;
405 struct link_ea_header *leh;
406 struct link_ea_entry *lee;
407 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
408 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
414 ptr = pli->pli_path + pli->pli_pathlen - 1;
417 pli->pli_fidcount = 0;
418 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
420 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
421 mdd_obj = mdd_object_find(env, mdd,
422 &pli->pli_fids[pli->pli_fidcount]);
424 GOTO(out, rc = -EREMOTE);
426 GOTO(out, rc = PTR_ERR(mdd_obj));
427 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
429 mdd_object_put(env, mdd_obj);
433 /* Do I need to error out here? */
438 /* Get parent fid and object name */
439 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
440 buf = mdd_links_get(env, mdd_obj);
441 mdd_read_unlock(env, mdd_obj);
442 mdd_object_put(env, mdd_obj);
444 GOTO(out, rc = PTR_ERR(buf));
447 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
448 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450 /* If set, use link #linkno for path lookup, otherwise use
451 link #0. Only do this for the final path element. */
452 if ((pli->pli_fidcount == 0) &&
453 (pli->pli_linkno < leh->leh_reccount)) {
455 for (count = 0; count < pli->pli_linkno; count++) {
456 lee = (struct link_ea_entry *)
457 ((char *)lee + reclen);
458 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460 if (pli->pli_linkno < leh->leh_reccount - 1)
461 /* indicate to user there are more links */
465 /* Pack the name in the end of the buffer */
466 ptr -= tmpname->ln_namelen;
467 if (ptr - 1 <= pli->pli_path)
468 GOTO(out, rc = -EOVERFLOW);
469 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
472 /* Store the parent fid for historic lookup */
473 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
474 GOTO(out, rc = -EOVERFLOW);
475 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
478 /* Verify that our path hasn't changed since we started the lookup.
479 Record the current index, and verify the path resolves to the
480 same fid. If it does, then the path is correct as of this index. */
481 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
482 pli->pli_currec = mdd->mdd_cl.mc_index;
483 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
484 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
486 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
487 GOTO (out, rc = -EAGAIN);
489 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
490 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
491 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
492 PFID(&pli->pli_fid));
493 GOTO(out, rc = -EAGAIN);
495 ptr++; /* skip leading / */
496 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
500 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
501 /* if we vmalloced a large buffer drop it */
/* Historical-path lookup at changelog record pli->pli_recno; body is
 * omitted from this truncated listing (see mdd_path() for the caller's
 * description of its intended semantics). */
507 static int mdd_path_historic(const struct lu_env *env,
508 struct path_lookup_info *pli)
513 /* Returns the full path to this fid, as of changelog record recno. */
514 static int mdd_path(const struct lu_env *env, struct md_object *obj,
515 char *path, int pathlen, __u64 *recno, int *linkno)
517 struct path_lookup_info *pli;
/* The root itself has an empty path; handled separately. */
525 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
534 pli->pli_mdd_obj = md2mdd_obj(obj);
535 pli->pli_recno = *recno;
536 pli->pli_path = path;
537 pli->pli_pathlen = pathlen;
538 pli->pli_linkno = *linkno;
540 /* Retry multiple times in case file is being moved */
541 while (tries-- && rc == -EAGAIN)
542 rc = mdd_path_current(env, pli);
544 /* For historical path lookup, the current links may not have existed
545 * at "recno" time. We must switch over to earlier links/parents
546 * by using the changelog records. If the earlier parent doesn't
547 * exist, we must search back through the changelog to reconstruct
548 * its parents, then check if it exists, etc.
549 * We may ignore this problem for the initial implementation and
550 * state that an "original" hardlink must still exist for us to find
551 * historic path name. */
552 if (pli->pli_recno != -1) {
553 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path is valid at, and the
 * next hardlink index for iteration by the caller. */
555 *recno = pli->pli_currec;
556 /* Return next link index to caller */
557 *linkno = pli->pli_linkno;
/* Load the object's la_flags from disk and translate them into the
 * in-memory mod_flags bits via mdd_flags_xlate(). */
565 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
567 struct lu_attr *la = &mdd_env_info(env)->mti_la;
571 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
573 mdd_flags_xlate(obj, la->la_flags);
578 /* get only inode attributes */
/* Fills ma->ma_attr and sets MA_INODE in ma_valid; skips the fetch
 * entirely when MA_INODE is already valid. */
579 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
585 if (ma->ma_valid & MA_INODE)
588 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
589 mdd_object_capa(env, mdd_obj));
591 ma->ma_valid |= MA_INODE;
/* Fill @lmm (viewed as a lov_user_md) with the filesystem-default
 * striping taken from the MDS LOV descriptor; returns the number of
 * bytes written (sizeof(struct lov_user_md)). */
595 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
597 struct lov_desc *ldesc;
598 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
599 struct lov_user_md *lum = (struct lov_user_md*)lmm;
605 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
606 LASSERT(ldesc != NULL);
608 lum->lmm_magic = LOV_MAGIC_V1;
609 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
610 lum->lmm_pattern = ldesc->ld_pattern;
611 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
612 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
613 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
615 RETURN(sizeof(*lum));
/* True iff @mdd_obj is the filesystem root (its FID equals the
 * device's mdd_root_fid). */
618 static int is_rootdir(struct mdd_object *mdd_obj)
620 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
621 const struct lu_fid *fid = mdo2fid(mdd_obj);
623 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
/* Read the LOV EA into the per-thread "big" buffer for objects whose
 * striping exceeds the caller-supplied ma_lmm buffer: probe the EA
 * size with LU_BUF_NULL, grow the thread buffer, then fetch. On
 * success points ma->ma_lmm at the thread buffer and marks it with
 * ma_big_lmm_used so the caller knows not to free it. */
626 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
629 struct mdd_thread_info *info = mdd_env_info(env);
634 LASSERT(info != NULL);
635 LASSERT(ma->ma_lmm_size > 0);
636 LASSERT(ma->ma_big_lmm_used == 0);
/* Size probe: xattr_get with a NULL buffer returns the EA length. */
638 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
639 mdd_object_capa(env, obj));
643 /* big_lmm may need to grow */
645 mdd_max_lmm_buffer(env, size);
646 if (info->mti_max_lmm == NULL)
649 LASSERT(info->mti_max_lmm_size >= size);
650 rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
655 ma->ma_big_lmm_used = 1;
656 ma->ma_valid |= MA_LOV;
657 ma->ma_lmm = info->mti_max_lmm;
658 ma->ma_lmm_size = size;
663 /* get lov EA only */
/* Fetch the LOV EA into ma->ma_lmm; falls back to the big per-thread
 * buffer when the caller's buffer is too small, and to the filesystem
 * default striping for the root directory when MA_LOV_DEF is needed.
 * A positive rc from mdd_get_md is the EA size. */
664 static int __mdd_lmm_get(const struct lu_env *env,
665 struct mdd_object *mdd_obj, struct md_attr *ma)
670 if (ma->ma_valid & MA_LOV)
673 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
676 rc = mdd_big_lmm_get(env, mdd_obj, ma);
677 else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
678 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
681 ma->ma_lmm_size = rc;
682 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
683 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
689 /* get the first parent fid from link EA */
/* Reads link #0's parent FID from the link EA (stored big-endian on
 * disk, hence fid_be_to_cpu) into ma->ma_pfid and sets MA_PFID. */
690 static int mdd_pfid_get(const struct lu_env *env,
691 struct mdd_object *mdd_obj, struct md_attr *ma)
694 struct link_ea_header *leh;
695 struct link_ea_entry *lee;
696 struct lu_fid *pfid = &ma->ma_pfid;
699 if (ma->ma_valid & MA_PFID)
702 buf = mdd_links_get(env, mdd_obj);
704 RETURN(PTR_ERR(buf));
707 lee = (struct link_ea_entry *)(leh + 1);
708 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
709 fid_be_to_cpu(pfid, pfid);
710 ma->ma_valid |= MA_PFID;
711 if (buf->lb_len > OBD_ALLOC_BIG)
712 /* if we vmalloced a large buffer drop it */
/* __mdd_lmm_get() under the object's read lock. */
717 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
723 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
724 rc = __mdd_lmm_get(env, mdd_obj, ma);
725 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv and mark
 * MA_LMV valid; no-op if already valid. */
730 static int __mdd_lmv_get(const struct lu_env *env,
731 struct mdd_object *mdd_obj, struct md_attr *ma)
736 if (ma->ma_valid & MA_LMV)
739 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
742 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr and extract the HSM flags (if MA_HSM needed) and
 * SOM attributes (if MA_SOM needed and LMAC_SOM is set) into @ma.
 * Uses the per-thread mti_xattr_buf as scratch for the raw EA. */
748 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
751 struct mdd_thread_info *info = mdd_env_info(env);
752 struct lustre_mdt_attrs *lma =
753 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
758 /* If all needed data are already valid, nothing to do */
759 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
760 (ma->ma_need & (MA_HSM | MA_SOM)))
763 /* Read LMA from disk EA */
764 lma_size = sizeof(info->mti_xattr_buf);
765 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
769 /* Useless to check LMA incompatibility because this is already done in
770 * osd_ea_fid_get(), and this will fail long before this code is
772 * So, if we are here, LMA is compatible.
775 lustre_lma_swab(lma);
777 /* Swab and copy LMA */
778 if (ma->ma_need & MA_HSM) {
779 if (lma->lma_compat & LMAC_HSM)
780 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
782 ma->ma_hsm.mh_flags = 0;
783 ma->ma_valid |= MA_HSM;
/* Copy Size-on-MDS state only when the LMA says it is present. */
787 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
788 LASSERT(ma->ma_som != NULL);
789 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
790 ma->ma_som->msd_size = lma->lma_som_size;
791 ma->ma_som->msd_blocks = lma->lma_som_blocks;
792 ma->ma_som->msd_mountid = lma->lma_som_mountid;
793 ma->ma_valid |= MA_SOM;
/* Dispatch attribute retrieval according to ma->ma_need: inode attrs,
 * LOV EA (regular files and directories), first-parent FID, LMV EA
 * (directories), HSM/SOM from LMA (regular files), and default ACL
 * (directories, when POSIX ACLs are configured). Stops at the first
 * failure (each step is gated on rc == 0). */
799 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
805 if (ma->ma_need & MA_INODE)
806 rc = mdd_iattr_get(env, mdd_obj, ma);
808 if (rc == 0 && ma->ma_need & MA_LOV) {
809 if (S_ISREG(mdd_object_type(mdd_obj)) ||
810 S_ISDIR(mdd_object_type(mdd_obj)))
811 rc = __mdd_lmm_get(env, mdd_obj, ma);
813 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
814 if (S_ISREG(mdd_object_type(mdd_obj)))
815 rc = mdd_pfid_get(env, mdd_obj, ma);
817 if (rc == 0 && ma->ma_need & MA_LMV) {
818 if (S_ISDIR(mdd_object_type(mdd_obj)))
819 rc = __mdd_lmv_get(env, mdd_obj, ma);
821 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
822 if (S_ISREG(mdd_object_type(mdd_obj)))
823 rc = __mdd_lma_get(env, mdd_obj, ma);
825 #ifdef CONFIG_FS_POSIX_ACL
826 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
827 if (S_ISDIR(mdd_object_type(mdd_obj)))
828 rc = mdd_def_acl_get(env, mdd_obj, ma);
831 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
832 rc, ma->ma_valid, ma->ma_lmm);
/* mdd_attr_get_internal() taking the object read lock, but only when
 * the request touches EAs that need it (LOV/LMV/ACL/HSM/SOM/PFID);
 * plain inode attribute reads go lockless. */
836 int mdd_attr_get_internal_locked(const struct lu_env *env,
837 struct mdd_object *mdd_obj, struct md_attr *ma)
840 int needlock = ma->ma_need &
841 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
844 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
845 rc = mdd_attr_get_internal(env, mdd_obj, ma);
847 mdd_read_unlock(env, mdd_obj);
852 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
854 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
857 struct mdd_object *mdd_obj = md2mdd_obj(obj);
861 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
866 * No permission check is needed.
/* moo_xattr_get entry point: read xattr @name into @buf under the
 * object read lock; errors out if the object no longer exists. */
868 static int mdd_xattr_get(const struct lu_env *env,
869 struct md_object *obj, struct lu_buf *buf,
872 struct mdd_object *mdd_obj = md2mdd_obj(obj);
877 if (mdd_object_exists(mdd_obj) == 0) {
878 CERROR("%s: object "DFID" not found: rc = -2\n",
879 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
883 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
884 rc = mdo_xattr_get(env, mdd_obj, buf, name,
885 mdd_object_capa(env, mdd_obj));
886 mdd_read_unlock(env, mdd_obj);
892 * Permission check is done when open,
893 * no need check again.
/* moo_readlink: read the symlink target by issuing a body read on the
 * underlying dt object, under the object read lock. */
895 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
898 struct mdd_object *mdd_obj = md2mdd_obj(obj);
899 struct dt_object *next;
904 if (mdd_object_exists(mdd_obj) == 0) {
905 CERROR("%s: object "DFID" not found: rc = -2\n",
906 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
910 next = mdd_object_child(mdd_obj);
911 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
912 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
913 mdd_object_capa(env, mdd_obj));
914 mdd_read_unlock(env, mdd_obj);
919 * No permission check is needed.
/* moo_xattr_list: enumerate xattr names into @buf under read lock. */
921 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
924 struct mdd_object *mdd_obj = md2mdd_obj(obj);
929 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
930 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
931 mdd_read_unlock(env, mdd_obj);
/* Declaration phase of object creation: derive the dt_object_format
 * from the creation spec (a non-directory index feature forces
 * DFT_INDEX, otherwise the format follows la_mode) and declare the
 * create against the transaction handle. */
936 int mdd_declare_object_create_internal(const struct lu_env *env,
937 struct mdd_object *p,
938 struct mdd_object *c,
940 struct thandle *handle,
941 const struct md_op_spec *spec)
943 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
944 const struct dt_index_features *feat = spec->sp_feat;
948 if (feat != &dt_directory_features && feat != NULL)
949 dof->dof_type = DFT_INDEX;
951 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
953 dof->u.dof_idx.di_feat = feat;
955 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Execution phase of object creation: if @c does not exist yet, build
 * the dt_object_format (mirroring the declare phase), let the lower
 * device initialize the allocation hint from the parent, and create
 * the object within the transaction. Asserts the object exists on
 * success. */
960 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
961 struct mdd_object *c, struct md_attr *ma,
962 struct thandle *handle,
963 const struct md_op_spec *spec)
965 struct lu_attr *attr = &ma->ma_attr;
966 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
967 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
968 const struct dt_index_features *feat = spec->sp_feat;
972 if (!mdd_object_exists(c)) {
973 struct dt_object *next = mdd_object_child(c);
976 if (feat != &dt_directory_features && feat != NULL)
977 dof->dof_type = DFT_INDEX;
979 dof->dof_type = dt_mode_to_dft(attr->la_mode);
981 dof->u.dof_idx.di_feat = feat;
983 /* @hint will be initialized by underlying device. */
984 next->do_ops->do_ah_init(env, hint,
985 p ? mdd_object_child(p) : NULL,
986 attr->la_mode & S_IFMT);
988 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
989 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
997 * Make sure the ctime is increased only.
/* Drop MTIME/CTIME from the request when the incoming ctime would move
 * time backwards; a pure-CTIME request equal to the stored ctime is
 * dropped entirely (nothing to do). */
999 static inline int mdd_attr_check(const struct lu_env *env,
1000 struct mdd_object *obj,
1001 struct lu_attr *attr)
1003 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1007 if (attr->la_valid & LA_CTIME) {
1008 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1012 if (attr->la_ctime < tmp_la->la_ctime)
1013 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1014 else if (attr->la_valid == LA_CTIME &&
1015 attr->la_ctime == tmp_la->la_ctime)
1016 attr->la_valid &= ~LA_CTIME;
/* Apply @attr through the lower layer inside transaction @handle;
 * when the mode changed and @needacl is set, propagate the new mode
 * into the object's POSIX ACL as well. */
1021 int mdd_attr_set_internal(const struct lu_env *env,
1022 struct mdd_object *obj,
1023 struct lu_attr *attr,
1024 struct thandle *handle,
1030 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1031 #ifdef CONFIG_FS_POSIX_ACL
1032 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1033 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime-monotonicity filter) followed by
 * mdd_attr_set_internal(). */
1038 int mdd_attr_check_set_internal(const struct lu_env *env,
1039 struct mdd_object *obj,
1040 struct lu_attr *attr,
1041 struct thandle *handle,
1047 rc = mdd_attr_check(env, obj, attr);
1052 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object write lock; the lock (and
 * ACL propagation) is only needed when the mode is actually changing. */
1056 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1057 struct mdd_object *obj,
1058 struct lu_attr *attr,
1059 struct thandle *handle,
1065 needacl = needacl && (attr->la_valid & LA_MODE);
1067 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1068 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1070 mdd_write_unlock(env, obj);
/* mdd_attr_check_set_internal() under the object write lock; same
 * LA_MODE gating of the ACL update as the plain locked variant. */
1074 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1075 struct mdd_object *obj,
1076 struct lu_attr *attr,
1077 struct thandle *handle,
1083 needacl = needacl && (attr->la_valid & LA_MODE);
1085 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1086 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1088 mdd_write_unlock(env, obj);
/* Set or delete xattr @name within transaction @handle: a non-empty
 * buffer sets the value, a (NULL, 0) buffer deletes the xattr. Other
 * buffer combinations fall through (handled in omitted lines). */
1092 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1093 const struct lu_buf *buf, const char *name,
1094 int fl, struct thandle *handle)
1096 struct lustre_capa *capa = mdd_object_capa(env, obj);
1100 if (buf->lb_buf && buf->lb_len > 0)
1101 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1102 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1103 rc = mdo_xattr_del(env, obj, name, handle, capa);
1109 * This gives the same functionality as the code between
1110 * sys_chmod and inode_setattr
1111 * chown_common and inode_setattr
1112 * utimes and inode_setattr
1113 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
1115 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1116 struct lu_attr *la, const struct md_attr *ma)
1118 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1119 struct md_ucred *uc;
1126 /* Do not permit change file type */
1127 if (la->la_valid & LA_TYPE)
1130 /* They should not be processed by setattr */
1131 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1134 /* export destroy does not have ->le_ses, but we may want
1135 * to drop LUSTRE_SOM_FL. */
/* Load current on-disk attributes for comparison below. */
1141 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1145 if (la->la_valid == LA_CTIME) {
1146 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1147 /* This is only for set ctime when rename's source is
1149 rc = mdd_may_delete(env, NULL, obj,
1150 (struct md_attr *)ma, 1, 0);
1151 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1152 la->la_valid &= ~LA_CTIME;
1156 if (la->la_valid == LA_ATIME) {
1157 /* This is atime only set for read atime update on close. */
1158 if (la->la_atime >= tmp_la->la_atime &&
1159 la->la_atime < (tmp_la->la_atime +
1160 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1161 la->la_valid &= ~LA_ATIME;
1165 /* Check if flags change. */
1166 if (la->la_valid & LA_FLAGS) {
1167 unsigned int oldflags = 0;
1168 unsigned int newflags = la->la_flags &
1169 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1171 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1172 !mdd_capable(uc, CFS_CAP_FOWNER))
1175 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1176 * only be changed by the relevant capability. */
1177 if (mdd_is_immutable(obj))
1178 oldflags |= LUSTRE_IMMUTABLE_FL;
1179 if (mdd_is_append(obj))
1180 oldflags |= LUSTRE_APPEND_FL;
1181 if ((oldflags ^ newflags) &&
1182 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is only meaningful on directories. */
1185 if (!S_ISDIR(tmp_la->la_mode))
1186 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects reject any non-flag change unless
 * the server explicitly bypasses permission checks. */
1189 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1190 (la->la_valid & ~LA_FLAGS) &&
1191 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1194 /* Check for setting the obj time. */
1195 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1196 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1197 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1198 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1199 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/LA_KILL_SGID: strip setuid/setgid as a side effect
 * (e.g. of a write), converting the request into a mode change. */
1207 if (la->la_valid & LA_KILL_SUID) {
1208 la->la_valid &= ~LA_KILL_SUID;
1209 if ((tmp_la->la_mode & S_ISUID) &&
1210 !(la->la_valid & LA_MODE)) {
1211 la->la_mode = tmp_la->la_mode;
1212 la->la_valid |= LA_MODE;
1214 la->la_mode &= ~S_ISUID;
1217 if (la->la_valid & LA_KILL_SGID) {
1218 la->la_valid &= ~LA_KILL_SGID;
1219 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1220 (S_ISGID | S_IXGRP)) &&
1221 !(la->la_valid & LA_MODE)) {
1222 la->la_mode = tmp_la->la_mode;
1223 la->la_valid |= LA_MODE;
1225 la->la_mode &= ~S_ISGID;
1228 /* Make sure a caller can chmod. */
1229 if (la->la_valid & LA_MODE) {
1230 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1231 (uc->mu_fsuid != tmp_la->la_uid) &&
1232 !mdd_capable(uc, CFS_CAP_FOWNER))
1235 if (la->la_mode == (cfs_umode_t) -1)
1236 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits from the stored mode. */
1238 la->la_mode = (la->la_mode & S_IALLUGO) |
1239 (tmp_la->la_mode & ~S_IALLUGO);
1241 /* Also check the setgid bit! */
1242 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1243 la->la_gid : tmp_la->la_gid) &&
1244 !mdd_capable(uc, CFS_CAP_FSETID))
1245 la->la_mode &= ~S_ISGID;
1247 la->la_mode = tmp_la->la_mode;
1250 /* Make sure a caller can chown. */
1251 if (la->la_valid & LA_UID) {
1252 if (la->la_uid == (uid_t) -1)
1253 la->la_uid = tmp_la->la_uid;
1254 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1255 (la->la_uid != tmp_la->la_uid)) &&
1256 !mdd_capable(uc, CFS_CAP_CHOWN))
1259 /* If the user or group of a non-directory has been
1260 * changed by a non-root user, remove the setuid bit.
1261 * 19981026 David C Niemi <niemi@tux.org>
1263 * Changed this to apply to all users, including root,
1264 * to avoid some races. This is the behavior we had in
1265 * 2.0. The check for non-root was definitely wrong
1266 * for 2.2 anyway, as it should have been using
1267 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1268 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1269 !S_ISDIR(tmp_la->la_mode)) {
1270 la->la_mode &= ~S_ISUID;
1271 la->la_valid |= LA_MODE;
1275 /* Make sure caller can chgrp. */
1276 if (la->la_valid & LA_GID) {
1277 if (la->la_gid == (gid_t) -1)
1278 la->la_gid = tmp_la->la_gid;
1279 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1280 ((la->la_gid != tmp_la->la_gid) &&
1281 !lustre_in_group_p(uc, la->la_gid))) &&
1282 !mdd_capable(uc, CFS_CAP_CHOWN))
1285 /* Likewise, if the user or group of a non-directory
1286 * has been changed by a non-root user, remove the
1287 * setgid bit UNLESS there is no group execute bit
1288 * (this would be a file marked for mandatory
1289 * locking). 19981026 David C Niemi <niemi@tux.org>
1291 * Removed the fsuid check (see the comment above) --
1293 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1294 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1295 la->la_mode &= ~S_ISGID;
1296 la->la_valid |= LA_MODE;
1300 /* For both Size-on-MDS case and truncate case,
1301 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are true.
1302 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1303 * For SOM case, it is true, the MAY_WRITE perm has been checked
1304 * when open, no need check again. For truncate case, it is false,
1305 * the MAY_WRITE perm should be checked here. */
1306 if (ma->ma_attr_flags & MDS_SOM) {
1307 /* For the "Size-on-MDS" setattr update, merge coming
1308 * attributes with the set in the inode. BUG 10641 */
1309 if ((la->la_valid & LA_ATIME) &&
1310 (la->la_atime <= tmp_la->la_atime))
1311 la->la_valid &= ~LA_ATIME;
1313 /* OST attributes do not have a priority over MDS attributes,
1314 * so drop times if ctime is equal. */
1315 if ((la->la_valid & LA_CTIME) &&
1316 (la->la_ctime <= tmp_la->la_ctime))
1317 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1319 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1320 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1321 (uc->mu_fsuid == tmp_la->la_uid)) &&
1322 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1323 rc = mdd_permission_internal_locked(env, obj,
1330 if (la->la_valid & LA_CTIME) {
1331 /* The pure setattr, it has the priority over what is
1332 * already set, do not drop it if ctime is equal. */
1333 if (la->la_ctime < tmp_la->la_ctime)
1334 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1342 /** Store a data change changelog record
1343 * If this fails, we must fail the whole transaction; we don't
1344 * want the change to commit without the log entry.
1345 * \param mdd_obj - mdd_object of change
1346 * \param handle - transacion handle
1348 static int mdd_changelog_data_store(const struct lu_env *env,
1349 struct mdd_device *mdd,
1350 enum changelog_rec_type type,
1352 struct mdd_object *mdd_obj,
1353 struct thandle *handle)
1355 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1356 struct llog_changelog_rec *rec;
1357 struct thandle *th = NULL;
/* Skip entirely when the changelog is off or this record type is
 * masked out. */
1363 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1365 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1368 LASSERT(mdd_obj != NULL);
1369 LASSERT(handle != NULL);
/* Coalesce repeated time-change records for the same object since
 * the changelog was (re)started. */
1371 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1372 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1373 /* Don't need multiple updates in this log */
1374 /* Don't check under lock - no big deal if we get an extra
1379 reclen = llog_data_len(sizeof(*rec));
1380 buf = mdd_buf_alloc(env, reclen);
1381 if (buf->lb_buf == NULL)
1383 rec = (struct llog_changelog_rec *)buf->lb_buf;
1385 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1386 rec->cr.cr_type = (__u32)type;
1387 rec->cr.cr_tfid = *tfid;
1388 rec->cr.cr_namelen = 0;
1389 mdd_obj->mod_cltime = cfs_time_current_64();
1391 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1394 mdd_trans_stop(env, mdd, rc, th);
1397 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1398 rc, type, PFID(tfid));
/* Public entry: record a changelog event for \a obj inside its own
 * transaction (create -> declare -> start -> store -> stop). */
1405 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1406 int flags, struct md_object *obj)
1408 struct thandle *handle;
1409 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1410 struct mdd_device *mdd = mdo2mdd(obj);
1414 handle = mdd_trans_create(env, mdd);
1416 return(PTR_ERR(handle));
1418 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1422 rc = mdd_trans_start(env, mdd, handle);
1426 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1430 mdd_trans_stop(env, mdd, rc, handle);
1436 * Should be called with write lock held.
1438 * \see mdd_lma_set_locked().
1440 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1441 const struct md_attr *ma, struct thandle *handle)
1443 struct mdd_thread_info *info = mdd_env_info(env);
1445 struct lustre_mdt_attrs *lma =
1446 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1447 int lmasize = sizeof(struct lustre_mdt_attrs);
1452 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): `(!ma->ma_valid) & (MA_HSM | MA_SOM)` applies logical NOT
 * before bitwise AND, so this does NOT test the HSM/SOM validity bits as
 * the comment intends; it likely should read
 * `(ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)`. Confirm
 * intended semantics before changing — callers may rely on current behavior. */
1453 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1454 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* On-disk LMA is little-endian; swab to host order before merging. */
1458 lustre_lma_swab(lma);
/* No existing LMA EA on disk: start from a zeroed structure. */
1460 memset(lma, 0, lmasize);
/* Merge HSM state from \a ma into the LMA, masking to known flags. */
1464 if (ma->ma_valid & MA_HSM) {
1465 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1466 lma->lma_compat |= LMAC_HSM;
/* Merge Size-on-MDS data; IOEPOCH_INVAL clears the SOM-valid bit. */
1470 if (ma->ma_valid & MA_SOM) {
1471 LASSERT(ma->ma_som != NULL);
1472 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1473 lma->lma_compat &= ~LMAC_SOM;
1475 lma->lma_compat |= LMAC_SOM;
1476 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1477 lma->lma_som_size = ma->ma_som->msd_size;
1478 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1479 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Copy object's own FID into the LMA for self-identification on disk. */
1484 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Swab back to disk order and write the updated LMA EA. */
1486 lustre_lma_swab(lma);
1487 buf = mdd_buf_get(env, lma, lmasize);
1488 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1494 * Save LMA extended attributes with data from \a ma.
1496 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1497 * not, LMA EA will be first read from disk, modified and write back.
1500 static int mdd_lma_set_locked(const struct lu_env *env,
1501 struct mdd_object *mdd_obj,
1502 const struct md_attr *ma, struct thandle *handle)
/* Locking wrapper: __mdd_lma_set() requires the object write lock held. */
1506 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1507 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1508 mdd_write_unlock(env, mdd_obj);
1512 /* Precedence for choosing record type when multiple
1513 * attributes change: setattr > mtime > ctime > atime
1514 * (ctime changes when mtime does, plus chmod/chown.
1515 * atime and ctime are independent.) */
1516 static int mdd_attr_set_changelog(const struct lu_env *env,
1517 struct md_object *obj, struct thandle *handle,
1520 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmask of candidate record types from the changed attributes,
 * then drop any types masked out by the changelog configuration. */
1523 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1524 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1525 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1526 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1527 bits = bits & mdd->mdd_cl.mc_mask;
1531 /* The record type is the lowest non-masked set bit */
1532 while (bits && ((bits & 1) == 0)) {
1537 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1538 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1539 md2mdd_obj(obj), handle);
/* Declare (reserve transaction credits for) everything mdd_attr_set() may
 * do: the attr update itself, a changelog record, LOV/LMA EA updates, a
 * possible ACL rewrite on chmod, and per-stripe unlink-style llog records
 * for chown/chgrp on striped files. */
1542 static int mdd_declare_attr_set(const struct lu_env *env,
1543 struct mdd_device *mdd,
1544 struct mdd_object *obj,
1545 const struct md_attr *ma,
1546 struct lov_mds_md *lmm,
1547 struct thandle *handle)
1549 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1552 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1556 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
/* Striping data may be rewritten by the setattr. */
1560 if (ma->ma_valid & MA_LOV) {
1562 buf->lb_len = ma->ma_lmm_size;
1563 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
/* HSM / Size-on-MDS state lives in the LMA EA. */
1569 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1571 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1572 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1578 #ifdef CONFIG_FS_POSIX_ACL
/* chmod may rewrite the access ACL; probe its current size under read
 * lock so enough credits are reserved. ENODATA/EOPNOTSUPP mean no ACL. */
1579 if (ma->ma_attr.la_valid & LA_MODE) {
1580 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1581 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1583 mdd_read_unlock(env, obj);
1584 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1592 rc = mdo_declare_xattr_set(env, obj, buf,
1593 XATTR_NAME_ACL_ACCESS, 0,
1601 /* basically the log is the same as in unlink case */
/* Validate striping magic before trusting lmm_stripe_count below. */
1605 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1606 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1607 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1608 mdd->mdd_obd_dev->obd_name,
1609 le32_to_cpu(lmm->lmm_magic),
1610 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1614 stripe = le16_to_cpu(lmm->lmm_stripe_count);
/* "All stripes" sentinel: use the full OST target count from lov_desc. */
1615 if (stripe == LOV_ALL_STRIPES) {
1616 struct lov_desc *ldesc;
1618 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1619 LASSERT(ldesc != NULL);
1620 stripe = ldesc->ld_tgt_count;
/* One unlink-style llog record per stripe must be declared. */
1623 for (i = 0; i < stripe; i++) {
1624 rc = mdd_declare_llog_record(env, mdd,
1625 sizeof(struct llog_unlink_rec),
1635 /* set attr and LOV EA at once, return updated attr */
1636 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1637 const struct md_attr *ma)
1639 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1640 struct mdd_device *mdd = mdo2mdd(obj);
1641 struct thandle *handle;
1642 struct lov_mds_md *lmm = NULL;
1643 struct llog_cookie *logcookies = NULL;
1644 int rc, lmm_size = 0, cookie_size = 0;
1645 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1646 struct obd_device *obd = mdd->mdd_obd_dev;
1647 struct mds_obd *mds = &obd->u.mds;
1648 #ifdef HAVE_QUOTA_SUPPORT
1649 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1650 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1651 int quota_opc = 0, block_count = 0;
1652 int inode_pending[MAXQUOTAS] = { 0, 0 };
1653 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Normalize/validate the incoming attributes (setgid stripping, time
 * merging, permission checks) into la_copy before applying them. */
1657 *la_copy = ma->ma_attr;
1658 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1662 /* setattr on "close" only change atime, or do nothing */
1663 if (ma->ma_valid == MA_INODE &&
1664 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp on a regular file needs the striping (LOV EA) so ownership
 * can also be propagated to the OST objects via llog. */
1667 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1668 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1669 lmm_size = mdd_lov_mdsize(env, mdd);
1670 lmm = mdd_max_lmm_get(env, mdd);
1674 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1681 handle = mdd_trans_create(env, mdd);
1683 RETURN(PTR_ERR(handle));
1685 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1686 lmm_size > 0 ? lmm : NULL, handle);
1690 /* permission changes may require sync operation */
1691 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1692 handle->th_sync = !!mdd->mdd_sync_permission;
1694 rc = mdd_trans_start(env, mdd, handle);
1698 /* permission changes may require sync operation */
/* NOTE(review): th_sync is already set from mdd_sync_permission before
 * mdd_trans_start() above; this second OR-in looks like a merge artifact —
 * harmless but redundant. Confirm against upstream before removing. */
1699 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1700 handle->th_sync |= mdd->mdd_sync_permission;
1702 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1703 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1704 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1706 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-acquire quota for the new owner (inode + blocks)
 * and remember old/new ids so usage can be transferred after the change. */
1707 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1708 struct obd_export *exp = md_quota(env)->mq_exp;
1709 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1711 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1713 quota_opc = FSFILT_OP_SETATTR;
1714 mdd_quota_wrapper(la_copy, qnids);
1715 mdd_quota_wrapper(la_tmp, qoids);
1716 /* get file quota for new owner */
1717 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1718 qnids, inode_pending, 1, NULL, 0,
1720 block_count = (la_tmp->la_blocks + 7) >> 3;
1723 mdd_data_get(env, mdd_obj, &data);
1724 /* get block quota for new owner */
1725 lquota_chkquota(mds_quota_interface_ref, obd,
1726 exp, qnids, block_pending,
1728 LQUOTA_FLAGS_BLK, data, 1);
/* Apply attributes: LA_FLAGS alone updates the flags translation cache;
 * any other valid bits are a regular setattr. */
1734 if (la_copy->la_valid & LA_FLAGS) {
1735 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1738 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1739 } else if (la_copy->la_valid) { /* setattr */
1740 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1742 /* journal chown/chgrp in llog, just like unlink */
1743 if (rc == 0 && lmm_size){
1744 cookie_size = mdd_lov_cookiesize(env, mdd);
1745 logcookies = mdd_max_cookie_get(env, mdd);
1746 if (logcookies == NULL)
1747 GOTO(cleanup, rc = -ENOMEM);
1749 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1750 logcookies, cookie_size) <= 0)
/* Optional LOV EA replacement (only sensible for files/dirs). */
1755 if (rc == 0 && ma->ma_valid & MA_LOV) {
1758 mode = mdd_object_type(mdd_obj);
1759 if (S_ISREG(mode) || S_ISDIR(mode)) {
1760 rc = mdd_lsm_sanity_check(env, mdd_obj);
1764 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1765 ma->ma_lmm_size, handle, 1);
/* Optional HSM / Size-on-MDS state update in the LMA EA. */
1769 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1772 mode = mdd_object_type(mdd_obj);
1774 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1779 rc = mdd_attr_set_changelog(env, obj, handle,
1780 ma->ma_attr.la_valid);
1782 mdd_trans_stop(env, mdd, rc, handle);
/* After commit, push ownership changes to OST objects asynchronously. */
1783 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1784 /*set obd attr, if needed*/
1785 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1788 #ifdef HAVE_QUOTA_SUPPORT
1790 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1792 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1794 /* Trigger dqrel/dqacq for original owner and new owner.
1795 * If failed, the next call for lquota_chkquota will
1797 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set an xattr inside an existing transaction, taking the object write
 * lock around the raw __mdd_xattr_set() call. */
1804 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1805 const struct lu_buf *buf, const char *name, int fl,
1806 struct thandle *handle)
1811 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1812 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1813 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: object must not be immutable or
 * append-only, and the caller must be the owner or hold CAP_FOWNER. */
1818 static int mdd_xattr_sanity_check(const struct lu_env *env,
1819 struct mdd_object *obj)
1821 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1822 struct md_ucred *uc = md_ucred(env);
1826 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1829 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1833 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1834 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare credits for an xattr set, plus a changelog record when the
 * xattr is in the "user." namespace (only those are changelogged here). */
1840 static int mdd_declare_xattr_set(const struct lu_env *env,
1841 struct mdd_device *mdd,
1842 struct mdd_object *obj,
1843 const struct lu_buf *buf,
1845 struct thandle *handle)
1850 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1854 /* Only record user xattr changes */
1855 if ((strncmp("user.", name, 5) == 0))
1856 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1862 * The caller should guarantee to update the object ctime
1863 * after xattr_set if needed.
1865 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1866 const struct lu_buf *buf, const char *name,
1869 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1870 struct mdd_device *mdd = mdo2mdd(obj);
1871 struct thandle *handle;
1875 rc = mdd_xattr_sanity_check(env, mdd_obj);
1879 handle = mdd_trans_create(env, mdd);
1881 RETURN(PTR_ERR(handle));
/* security-related changes may require sync */
1884 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1885 mdd->mdd_sync_permission == 1)
1886 handle->th_sync = 1;
1888 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1892 rc = mdd_trans_start(env, mdd, handle);
/* security-related changes may require sync */
1897 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1898 handle->th_sync |= mdd->mdd_sync_permission;
1900 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
/* Changelog CL_XATTR records are emitted for user.* and POSIX ACL xattrs. */
1902 /* Only record system & user xattr changes */
1903 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1904 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1905 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1906 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1907 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1908 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1909 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1913 mdd_trans_stop(env, mdd, rc, handle);
/* Declare credits for an xattr removal, plus a changelog record for
 * "user." namespace xattrs (mirrors mdd_declare_xattr_set()). */
1918 static int mdd_declare_xattr_del(const struct lu_env *env,
1919 struct mdd_device *mdd,
1920 struct mdd_object *obj,
1922 struct thandle *handle)
1926 rc = mdo_declare_xattr_del(env, obj, name, handle);
1930 /* Only record user xattr changes */
1931 if ((strncmp("user.", name, 5) == 0))
1932 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1938 * The caller should guarantee to update the object ctime
1939 * after xattr_set if needed.
1941 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1944 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1945 struct mdd_device *mdd = mdo2mdd(obj);
1946 struct thandle *handle;
1950 rc = mdd_xattr_sanity_check(env, mdd_obj);
1954 handle = mdd_trans_create(env, mdd);
1956 RETURN(PTR_ERR(handle));
1958 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1962 rc = mdd_trans_start(env, mdd, handle);
/* Perform the removal under the object write lock. */
1966 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1967 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1968 mdd_object_capa(env, mdd_obj));
1969 mdd_write_unlock(env, mdd_obj);
/* Same changelog policy as mdd_xattr_set(): user.* and POSIX ACL names. */
1971 /* Only record system & user xattr changes */
1972 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1973 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1974 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1975 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1976 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1977 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1978 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1982 mdd_trans_stop(env, mdd, rc, handle);
1987 /* partial unlink */
/* Drop a link on \a obj (cross-MDS partial operation): sanity-check, drop
 * nlink (twice for directories, for "."), update ctime, finish unlink and
 * release quota when the last link and last open go away. */
1988 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1991 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1992 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1993 struct mdd_device *mdd = mdo2mdd(obj);
1994 struct thandle *handle;
1995 #ifdef HAVE_QUOTA_SUPPORT
1996 struct obd_device *obd = mdd->mdd_obd_dev;
1997 struct mds_obd *mds = &obd->u.mds;
1998 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2004 /* XXX: this code won't be used ever:
2005 * DNE uses slightly different approach */
2009 * Check -ENOENT early here because we need to get object type
2010 * to calculate credits before transaction start
2012 if (mdd_object_exists(mdd_obj) == 0) {
2013 CERROR("%s: object "DFID" not found: rc = -2\n",
2014 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2018 LASSERT(mdd_object_exists(mdd_obj) > 0);
2020 handle = mdd_trans_create(env, mdd);
2024 rc = mdd_trans_start(env, mdd, handle);
2026 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2028 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2032 mdo_ref_del(env, mdd_obj, handle);
/* A directory holds an extra link for its own "." entry. */
2034 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2036 mdo_ref_del(env, mdd_obj, handle);
2039 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2040 la_copy->la_ctime = ma->ma_attr.la_ctime;
2042 la_copy->la_valid = LA_CTIME;
2043 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2047 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2048 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and no opens: release the child's quota usage. */
2049 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2050 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2051 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2052 mdd_quota_wrapper(&ma->ma_attr, qids);
2059 mdd_write_unlock(env, mdd_obj);
2060 mdd_trans_stop(env, mdd, rc, handle);
2061 #ifdef HAVE_QUOTA_SUPPORT
2063 /* Trigger dqrel on the owner of child. If failed,
2064 * the next call for lquota_chkquota will process it */
2065 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2071 /* partial operation */
/* Validate the file type requested for a partial (cross-MDS) object
 * create; dispatches on the S_IFMT bits of the requested mode. */
2072 static int mdd_oc_sanity_check(const struct lu_env *env,
2073 struct mdd_object *obj,
2079 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial (cross-MDS) object create: allocate the object body, optionally
 * set a slave LMV EA or remote ACL from \a spec, initialize it with its
 * parent FID, and account quota for the new child. */
2096 static int mdd_object_create(const struct lu_env *env,
2097 struct md_object *obj,
2098 const struct md_op_spec *spec,
2102 struct mdd_device *mdd = mdo2mdd(obj);
2103 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2104 const struct lu_fid *pfid = spec->u.sp_pfid;
2105 struct thandle *handle;
2106 #ifdef HAVE_QUOTA_SUPPORT
2107 struct obd_device *obd = mdd->mdd_obd_dev;
2108 struct obd_export *exp = md_quota(env)->mq_exp;
2109 struct mds_obd *mds = &obd->u.mds;
2110 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2111 int quota_opc = 0, block_count = 0;
2112 int inode_pending[MAXQUOTAS] = { 0, 0 };
2113 int block_pending[MAXQUOTAS] = { 0, 0 };
2118 /* XXX: this code won't be used ever:
2119 * DNE uses slightly different approach */
2122 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire inode (and, per file type, block) quota for the child. */
2123 if (mds->mds_quota) {
2124 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2125 mdd_quota_wrapper(&ma->ma_attr, qids);
2126 /* get file quota for child */
2127 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2128 qids, inode_pending, 1, NULL, 0,
2130 switch (ma->ma_attr.la_mode & S_IFMT) {
2139 /* get block quota for child */
2141 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2142 qids, block_pending, block_count,
2143 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2147 handle = mdd_trans_create(env, mdd);
2149 GOTO(out_pending, rc = PTR_ERR(handle));
2151 rc = mdd_trans_start(env, mdd, handle);
2153 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2154 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2158 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2162 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2163 /* If creating the slave object, set slave EA here. */
2164 int lmv_size = spec->u.sp_ea.eadatalen;
2165 struct lmv_stripe_md *lmv;
2167 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2168 LASSERT(lmv != NULL && lmv_size > 0);
2170 rc = __mdd_xattr_set(env, mdd_obj,
2171 mdd_buf_get_const(env, lmv, lmv_size),
2172 XATTR_NAME_LMV, 0, handle);
2176 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2179 #ifdef CONFIG_FS_POSIX_ACL
/* Remote-ACL create: initialize the ACL from the EA payload; this can
 * also adjust la_mode, hence LA_MODE is marked valid afterwards. */
2180 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2181 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2183 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2184 buf->lb_len = spec->u.sp_ea.eadatalen;
2185 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2186 rc = __mdd_acl_init(env, mdd_obj, buf,
2187 &ma->ma_attr.la_mode,
2192 ma->ma_attr.la_valid |= LA_MODE;
2195 pfid = spec->u.sp_ea.fid;
2198 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the freshly-created attributes to the caller via \a ma. */
2204 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2205 mdd_write_unlock(env, mdd_obj);
2207 mdd_trans_stop(env, mdd, rc, handle);
2209 #ifdef HAVE_QUOTA_SUPPORT
2211 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2213 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2215 /* Trigger dqacq on the owner of child. If failed,
2216 * the next call for lquota_chkquota will process it. */
2217 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial (cross-MDS) link: add one nlink to \a obj and update its ctime
 * from the caller-supplied attributes. */
2225 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2226 const struct md_attr *ma)
2228 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2229 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2230 struct mdd_device *mdd = mdo2mdd(obj);
2231 struct thandle *handle;
2235 /* XXX: this code won't be used ever:
2236 * DNE uses slightly different approach */
2239 handle = mdd_trans_create(env, mdd);
2243 rc = mdd_trans_start(env, mdd, handle);
2245 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2246 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2248 mdo_ref_add(env, mdd_obj, handle);
2249 mdd_write_unlock(env, mdd_obj);
2251 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2252 la_copy->la_ctime = ma->ma_attr.la_ctime;
2254 la_copy->la_valid = LA_CTIME;
2255 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with result 0 rather than rc,
 * unlike the other partial operations here — confirm this is intentional. */
2258 mdd_trans_stop(env, mdd, 0, handle);
2264 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate MDS open flags into a MAY_* access-mode mask for the
 * permission check; the owner-override path returns no required access. */
2266 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2270 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2271 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2272 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2273 * owner can write to a file even if it is marked readonly to hide
2274 * its brokenness. (bug 5781) */
2275 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2276 struct md_ucred *uc = md_ucred(env);
2278 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2279 (la->la_uid == uc->mu_fsuid))
2283 if (flags & FMODE_READ)
2285 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2287 if (flags & MDS_FMODE_EXEC)
/* Validate an open request: reject dead objects, symlinks, writable
 * directory opens; check permission unless the file was just created;
 * enforce append-only semantics and the O_NOATIME ownership rule. */
2292 static int mdd_open_sanity_check(const struct lu_env *env,
2293 struct mdd_object *obj, int flag)
2295 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2300 if (mdd_is_dead_obj(obj))
2303 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2307 if (S_ISLNK(tmp_la->la_mode))
2310 mode = accmode(env, tmp_la, flag);
2312 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2315 if (!(flag & MDS_OPEN_CREATED)) {
2316 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate has no meaning for special files; silently drop the flag. */
2321 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2322 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2323 flag &= ~MDS_OPEN_TRUNC;
2325 /* For writing append-only file must open it with append mode. */
2326 if (mdd_is_append(obj)) {
2327 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2329 if (flag & MDS_OPEN_TRUNC)
2335 * Now, flag -- O_NOATIME does not be packed by client.
/* O_NOATIME is only honoured for the owner or CAP_FOWNER holders. */
2337 if (flag & O_NOATIME) {
2338 struct md_ucred *uc = md_ucred(env);
2340 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2341 (uc->mu_valid == UCRED_NEW)) &&
2342 (uc->mu_fsuid != tmp_la->la_uid) &&
2343 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open entry point: run the sanity check and, on success, bump the
 * object's open count under the write lock. */
2351 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2354 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2357 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2359 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2361 mdd_obj->mod_count++;
2363 mdd_write_unlock(env, mdd_obj);
/* Declare credits for destroying an object: the unlink llog record(s)
 * followed by the object destroy itself. */
2367 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2368 struct md_attr *ma, struct thandle *handle)
2372 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2376 return mdo_declare_destroy(env, obj, handle);
2379 /* return md_attr back,
2380 * if it is last unlink then return lov ea + llog cookie*/
2381 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2382 struct md_attr *ma, struct thandle *handle)
/* For regular files, fetch the LOV EA and write the unlink llog record so
 * the OST objects can be cleaned up; then destroy the MDS object. */
2387 if (S_ISREG(mdd_object_type(obj))) {
2388 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2389 * Caller must be ready for that. */
2390 rc = __mdd_lmm_get(env, obj, ma);
2391 if ((ma->ma_valid & MA_LOV))
2392 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2397 rc = mdo_destroy(env, obj, handle);
/* Declare credits for a last-close of an orphan: removing the orphan
 * index entry and killing the object. */
2402 static int mdd_declare_close(const struct lu_env *env,
2403 struct mdd_object *obj,
2405 struct thandle *handle)
2409 rc = orph_declare_index_delete(env, obj, handle);
2413 return mdd_declare_object_kill(env, obj, ma, handle);
2417 * No permission check is needed.
/* Close entry point: drop the open count; on last close of an orphan or
 * zero-nlink object, remove it from the orphan index, destroy it (and its
 * OST objects when MDS_CLOSE_CLEANUP is set), release quota, and emit a
 * CL_CLOSE changelog record for writable opens. */
2419 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2420 struct md_attr *ma, int mode)
2422 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2423 struct mdd_device *mdd = mdo2mdd(obj);
2424 struct thandle *handle = NULL;
2426 int is_orphan = 0, reset = 1;
2428 #ifdef HAVE_QUOTA_SUPPORT
2429 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2430 struct mds_obd *mds = &obd->u.mds;
2431 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN (e.g. HSM restore in progress): just drop the count and
 * leave the object on the orphan list. */
2436 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2437 mdd_obj->mod_count--;
2439 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2440 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2441 "list\n", PFID(mdd_object_fid(mdd_obj)));
2445 /* check without any lock */
/* Lock-free pre-check: likely last close of a doomed object, so prepare a
 * transaction in advance; re-checked below under lock. */
2446 if (mdd_obj->mod_count == 1 &&
2447 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2449 handle = mdd_trans_create(env, mdo2mdd(obj));
2451 RETURN(PTR_ERR(handle));
2453 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2457 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2461 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2466 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Raced: the object became an orphan after the unlocked check above, so
 * no transaction was prepared — retry from the top. */
2467 if (handle == NULL && mdd_obj->mod_count == 1 &&
2468 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2469 mdd_write_unlock(env, mdd_obj);
2473 /* release open count */
2474 mdd_obj->mod_count --;
2476 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2477 /* remove link to object from orphan index */
2478 LASSERT(handle != NULL);
2479 rc = __mdd_orphan_del(env, mdd_obj, handle);
2481 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2482 "list, OSS objects to be destroyed.\n",
2483 PFID(mdd_object_fid(mdd_obj)));
2486 CERROR("Object "DFID" can not be deleted from orphan "
2487 "list, maybe cause OST objects can not be "
2488 "destroyed (err: %d).\n",
2489 PFID(mdd_object_fid(mdd_obj)), rc);
2490 /* If object was not deleted from orphan list, do not
2491 * destroy OSS objects, which will be done when next
2497 rc = mdd_iattr_get(env, mdd_obj, ma);
2498 /* Object maybe not in orphan list originally, it is rare case for
2499 * mdd_finish_unlink() failure. */
2500 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2501 #ifdef HAVE_QUOTA_SUPPORT
2502 if (mds->mds_quota) {
2503 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2504 mdd_quota_wrapper(&ma->ma_attr, qids);
2507 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2508 if (ma->ma_valid & MA_FLAGS &&
2509 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2510 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
/* No transaction prepared earlier (object was not orphan then):
 * create and start one now for the kill. */
2512 if (handle == NULL) {
2513 handle = mdd_trans_create(env, mdo2mdd(obj));
2515 GOTO(out, rc = PTR_ERR(handle));
2517 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2522 rc = mdd_declare_changelog_store(env, mdd,
2527 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2532 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2538 CERROR("Error when prepare to delete Object "DFID" , "
2539 "which will cause OST objects can not be "
2540 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2546 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2548 mdd_write_unlock(env, mdd_obj);
/* Emit CL_CLOSE only for writable opens, and not for recovery replays. */
2551 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2552 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2553 if (handle == NULL) {
2554 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): `rc = IS_ERR(handle)` stores the boolean 1 instead of the
 * error code; other paths in this file use PTR_ERR(handle) — looks like a
 * bug worth confirming against upstream. */
2556 GOTO(stop, rc = IS_ERR(handle));
2558 rc = mdd_declare_changelog_store(env, mdd, NULL,
2563 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2568 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2574 mdd_trans_stop(env, mdd, rc, handle);
2575 #ifdef HAVE_QUOTA_SUPPORT
2577 /* Trigger dqrel on the owner of child. If failed,
2578 * the next call for lquota_chkquota will process it */
2579 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2586 * Permission check is done when open,
2587 * no need check again.
/* Readpage is only valid on a directory whose underlying dt object
 * supports index (directory iteration) operations. */
2589 static int mdd_readpage_sanity_check(const struct lu_env *env,
2590 struct mdd_object *obj)
2592 struct dt_object *next = mdd_object_child(obj);
2596 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries from iterator \a it, packing
 * as many lu_dirents as fit into \a nob bytes; records the hash range
 * covered and marks the last entry with reclen 0. */
2604 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2605 struct lu_dirpage *dp, int nob,
2606 const struct dt_it_ops *iops, struct dt_it *it,
2612 struct lu_dirent *ent;
2613 struct lu_dirent *last = NULL;
/* Reserve and zero the page header; entries start right after it. */
2616 memset(area, 0, sizeof (*dp));
2617 area += sizeof (*dp);
2618 nob -= sizeof (*dp);
2625 len = iops->key_size(env, it);
2627 /* IAM iterator can return record with zero len. */
2631 hash = iops->store(env, it);
/* First entry defines the start hash of this page. */
2632 if (unlikely(first)) {
2634 dp->ldp_hash_start = cpu_to_le64(hash);
2637 /* calculate max space required for lu_dirent */
2638 recsize = lu_dirent_calc_size(len, attr);
2640 if (nob >= recsize) {
2641 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2642 if (result == -ESTALE)
2647 /* osd might not able to pack all attributes,
2648 * so recheck rec length */
2649 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: done, unless not even one entry fit (-EINVAL). */
2651 result = (last != NULL) ? 0 :-EINVAL;
2655 ent = (void *)ent + recsize;
2659 result = iops->next(env, it);
2660 if (result == -ESTALE)
2662 } while (result == 0);
2665 dp->ldp_hash_end = cpu_to_le64(hash);
/* Hash collision spanning the page boundary: flag it for the client. */
2667 if (last->lde_hash == dp->ldp_hash_end)
2668 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2669 last->lde_reclen = 0; /* end mark */
/* Iterate the directory index starting at rdpg->rp_hash and fill the
 * caller-supplied pages with lu_dirpages; returns the number of bytes
 * produced (capped at rp_count) or a negative errno. */
2674 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2675 const struct lu_rdpg *rdpg)
2678 struct dt_object *next = mdd_object_child(obj);
2679 const struct dt_it_ops *iops;
2681 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2687 LASSERT(rdpg->rp_pages != NULL);
2688 LASSERT(next->do_index_ops != NULL);
2690 if (rdpg->rp_count <= 0)
2694 * iterate through directory and fill pages from @rdpg
2696 iops = &next->do_index_ops->dio_it;
2697 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2701 rc = iops->load(env, it, rdpg->rp_hash);
2705 * Iterator didn't find record with exactly the key requested.
2707 * It is currently either
2709 * - positioned above record with key less than
2710 * requested---skip it.
2712 * - or not positioned at all (is in IAM_IT_SKEWED
2713 * state)---position it on the next item.
2715 rc = iops->next(env, it);
2720 * At this point and across for-loop:
2722 * rc == 0 -> ok, proceed.
2723 * rc > 0 -> end of directory.
/* One iteration per client (CFS) page; each CFS page may hold several
 * LU_PAGE_SIZE dirpages when CFS_PAGE_SIZE > LU_PAGE_SIZE. */
2726 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2727 i++, nob -= CFS_PAGE_SIZE) {
2728 struct lu_dirpage *dp;
2730 LASSERT(i < rdpg->rp_npages);
2731 pg = rdpg->rp_pages[i];
2733 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2736 rc = mdd_dir_page_build(env, mdd, dp,
2737 min_t(int, nob, LU_PAGE_SIZE),
2738 iops, it, rdpg->rp_attrs);
/* End of directory: mark the final page with the end-of-dir hash. */
2743 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2745 } else if (rc < 0) {
2746 CWARN("build page failed: %d!\n", rc);
2749 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2750 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2751 if ((unsigned long)dp & ~CFS_PAGE_MASK)
/* Nothing was packed at all: return a single empty dirpage so the
 * client sees an empty (sub)range rather than an error. */
2758 struct lu_dirpage *dp;
2760 dp = cfs_kmap(rdpg->rp_pages[0]);
2761 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2764 * No pages were processed, mark this for first page
2767 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2770 cfs_kunmap(rdpg->rp_pages[0]);
2772 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2775 iops->fini(env, it);
/* Readpage entry point: verify the object exists and is a directory,
 * return a single empty page for dead (unlinked-but-open) directories per
 * POSIX, otherwise delegate to __mdd_readpage() under the read lock. */
2780 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2781 const struct lu_rdpg *rdpg)
2783 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2787 if (mdd_object_exists(mdd_obj) == 0) {
2788 CERROR("%s: object "DFID" not found: rc = -2\n",
2789 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2793 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2794 rc = mdd_readpage_sanity_check(env, mdd_obj);
2796 GOTO(out_unlock, rc);
2798 if (mdd_is_dead_obj(mdd_obj)) {
2800 struct lu_dirpage *dp;
2803 * According to POSIX, please do not return any entry to client:
2804 * even dot and dotdot should not be returned.
2806 CWARN("readdir from dead object: "DFID"\n",
2807 PFID(mdd_object_fid(mdd_obj)));
2809 if (rdpg->rp_count <= 0)
2810 GOTO(out_unlock, rc = -EFAULT);
2811 LASSERT(rdpg->rp_pages != NULL);
/* Hand back one empty dirpage covering the whole requested range. */
2813 pg = rdpg->rp_pages[0];
2814 dp = (struct lu_dirpage*)cfs_kmap(pg);
2815 memset(dp, 0 , sizeof(struct lu_dirpage));
2816 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2817 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2818 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2820 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2823 rc = __mdd_readpage(env, mdd_obj, rdpg);
2827 mdd_read_unlock(env, mdd_obj);
/* Flush a single object to stable storage by delegating to the
 * underlying dt object's do_object_sync method. */
2831 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2833 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2834 struct dt_object *next;
2836 if (mdd_object_exists(mdd_obj) == 0) {
2837 CERROR("%s: object "DFID" not found: rc = -2\n",
2838 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2841 next = mdd_object_child(mdd_obj);
2842 return next->do_ops->do_object_sync(env, next);
2845 const struct md_object_operations mdd_obj_ops = {
2846 .moo_permission = mdd_permission,
2847 .moo_attr_get = mdd_attr_get,
2848 .moo_attr_set = mdd_attr_set,
2849 .moo_xattr_get = mdd_xattr_get,
2850 .moo_xattr_set = mdd_xattr_set,
2851 .moo_xattr_list = mdd_xattr_list,
2852 .moo_xattr_del = mdd_xattr_del,
2853 .moo_object_create = mdd_object_create,
2854 .moo_ref_add = mdd_ref_add,
2855 .moo_ref_del = mdd_ref_del,
2856 .moo_open = mdd_open,
2857 .moo_close = mdd_close,
2858 .moo_readpage = mdd_readpage,
2859 .moo_readlink = mdd_readlink,
2860 .moo_changelog = mdd_changelog,
2861 .moo_capa_get = mdd_capa_get,
2862 .moo_object_sync = mdd_object_sync,
2863 .moo_path = mdd_path,
2864 .moo_file_lock = mdd_file_lock,
2865 .moo_file_unlock = mdd_file_unlock,