1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
64 #include "mdd_internal.h"
66 static const struct lu_object_operations mdd_lu_obj_ops;
68 static int mdd_xattr_get(const struct lu_env *env,
69 struct md_object *obj, struct lu_buf *buf,
/* Fetch body data of an existing object via the lower (mdo) layer.
 * Asserts existence first, printing the FID on failure.
 * NOTE(review): this excerpt is line-sampled; braces, the `void **data`
 * tail of the signature, and the return path are elided here. */
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
75         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76                 PFID(mdd_object_fid(obj)));
77         mdo_data_get(env, obj, data);
/* Read the object's attributes into @la, forwarding to mdo_attr_get()
 * with the supplied capability. Object must already exist. */
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                 PFID(mdd_object_fid(obj)));
86         return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ), clearing both first so a
 * cleared disk flag is also cleared in memory. */
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area stored in the lu_env context
 * under mdd_thread_key; asserted non-NULL (key is registered at init). */
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102         struct mdd_thread_info *info;
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
/* Wrap caller-owned memory (@area/@len) in the per-thread mti_buf.
 * NOTE(review): the lines assigning lb_buf/lb_len are elided in this
 * excerpt; presumably they point the buf at @area — confirm in full file. */
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113         buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by @buf; a NULL buf or NULL lb_buf is a no-op. */
119 void mdd_buf_put(struct lu_buf *buf)
121         if (buf == NULL || buf->lb_buf == NULL)
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only memory in mti_buf.
 * The const is cast away because lu_buf has no const flavour. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf grown to at least @len bytes.
 * An existing smaller buffer is freed and reallocated; contents are NOT
 * preserved (use mdd_buf_grow() for that). On allocation failure the
 * returned buf has lb_buf == NULL — callers must check. */
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
147         if (buf->lb_buf == NULL) {
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
169         if (buf.lb_buf == NULL)
            /* copy old contents before releasing the old allocation */
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
            /* install the new (buf, len) pair into the thread-info slot */
177         memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed per-thread llog cookie buffer big enough for the
 * current LOV cookie size; re-allocates (cached in mti) only when the
 * required size grew. Returns NULL on allocation failure. */
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
185         struct mdd_thread_info *mti = mdd_env_info(env);
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
            /* always hand back a cleared cookie to the caller */
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
/* Return a per-thread lov_mds_md buffer sized for the current maximum
 * striping EA; same grow-only caching scheme as mdd_max_cookie_get(),
 * but contents are NOT zeroed. Returns NULL on allocation failure. */
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207                                    struct mdd_device *mdd)
209         struct mdd_thread_info *mti = mdd_env_info(env);
212         max_lmm_size = mdd_lov_mdsize(env, mdd);
213         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214                 if (mti->mti_max_lmm)
215                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216                 mti->mti_max_lmm = NULL;
217                 mti->mti_max_lmm_size = 0;
219         if (unlikely(mti->mti_max_lmm == NULL)) {
220                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221                 if (likely(mti->mti_max_lmm != NULL))
222                         mti->mti_max_lmm_size = max_lmm_size;
224         return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for the mdd layer: allocate an mdd_object,
 * wire up its md/dir operation vectors and lu_object ops, and return the
 * embedded lu_object. NOTE(review): the `struct lu_device *d` parameter
 * line and the NULL-return path are elided in this excerpt. */
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228                                    const struct lu_object_header *hdr,
231         struct mdd_object *mdd_obj;
233         OBD_ALLOC_PTR(mdd_obj);
234         if (mdd_obj != NULL) {
237                 o = mdd2lu_obj(mdd_obj);
238                 lu_object_init(o, NULL, d);
239                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241                 mdd_obj->mod_count = 0;
242                 o->lo_ops = &mdd_lu_obj_ops;
/* loo_object_init: allocate the sibling object in the child (osd) device
 * and stack it below this mdd object in the compound lu_object. Also
 * resets the changelog timestamp and initializes the pdir lock. */
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250                            const struct lu_object_conf *unused)
252         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253         struct mdd_object *mdd_obj = lu2mdd_obj(o);
254         struct lu_object *below;
255         struct lu_device *under;
258         mdd_obj->mod_cltime = 0;
259         under = &d->mdd_child->dd_lu_dev;
260         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261         mdd_pdlock_init(mdd_obj);
265         lu_object_add(o, below);
/* loo_object_start: once the stack is assembled, load the on-disk flags
 * into mod_flags for objects that already exist on disk. */
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
272         if (lu_object_exists(o))
273                 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: release the mdd_object.
 * NOTE(review): the lu_object_fini/OBD_FREE_PTR lines are elided here. */
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
280         struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: one-line debug dump of the mdd object state
 * (open count, valid mask, changelog time, flags) via printer @p. */
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287                             lu_printer_t p, const struct lu_object *o)
289         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291                     "valid=%x, cltime="LPU64", flags=%lx)",
292                     mdd, mdd->mod_count, mdd->mod_valid,
293                     mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects (forward-declared above). */
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297         .loo_object_init    = mdd_object_init,
298         .loo_object_start   = mdd_object_start,
299         .loo_object_free    = mdd_object_free,
300         .loo_object_print   = mdd_object_print,
/* Look up (or instantiate) the mdd object for FID @f in device @d.
 * Thin wrapper over md_object_find_slice(); may return ERR_PTR. */
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304                                    struct mdd_device *d,
305                                    const struct lu_fid *f)
307         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve a '/'-separated pathname (relative to the filesystem root FID)
 * to a FID by walking component-by-component with mdd_lookup().
 * Returns 0 and fills @fid on success; -EREMOTE if a component lives on
 * another MDT, negative errno otherwise.
 * NOTE(review): excerpt is line-sampled — the component-copy loop body
 * and final assignment of *fid are elided here. */
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311                         const char *path, struct lu_fid *fid)
314         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315         struct mdd_object *obj;
316         struct lu_name *lname = &mdd_env_info(env)->mti_name;
321         /* temp buffer for path element */
322         buf = mdd_buf_alloc(env, PATH_MAX);
323         if (buf->lb_buf == NULL)
326         lname->ln_name = name = buf->lb_buf;
327         lname->ln_namelen = 0;
            /* start the walk at the filesystem root */
328         *f = mdd->mdd_root_fid;
335                 while (*path != '/' && *path != '\0') {
343                 /* find obj corresponding to fid */
344                 obj = mdd_object_find(env, mdd, f);
346                         GOTO(out, rc = -EREMOTE);
348                         GOTO(out, rc = PTR_ERR(obj));
349                 /* get child fid from parent and name */
350                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351                 mdd_object_put(env, obj);
356                 lname->ln_namelen = 0;
365 /** The maximum depth that fid2path() will search.
366  * This is limited only because we want to store the fids for
367  * historical path lookup purposes.
369 #define MAX_PATH_DEPTH 100
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373         __u64                pli_recno;        /**< history point */
374         __u64                pli_currec;       /**< current record */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
/* Build the current (non-historic) path for pli->pli_mdd_obj by walking
 * parent FIDs upward via each object's link EA, packing the name
 * components backwards into pli->pli_path. After the walk, re-resolves
 * the assembled path forward with mdd_path2fid() and returns -EAGAIN if
 * the object moved meanwhile (caller retries). Parent FIDs are recorded
 * in pli_fids[] for later historic lookup.
 * NOTE(review): excerpt is line-sampled; several branch bodies, the '/'
 * separator insertion, and RETURN paths are elided. */
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
            /* build the path right-to-left from the end of the buffer */
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
409                         GOTO(out, rc = -EREMOTE);
411                         GOTO(out, rc = PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
414                         mdd_object_put(env, mdd_obj);
418                         /* Do I need to error out here? */
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 mdd_read_unlock(env, mdd_obj);
427                 mdd_object_put(env, mdd_obj);
429                         GOTO(out, rc = PTR_ERR(buf));
432                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
435                 /* If set, use link #linkno for path lookup, otherwise use
436                    link #0.  Only do this for the final path element. */
437                 if ((pli->pli_fidcount == 0) &&
438                     (pli->pli_linkno < leh->leh_reccount)) {
440                         for (count = 0; count < pli->pli_linkno; count++) {
441                                 lee = (struct link_ea_entry *)
442                                      ((char *)lee + reclen);
443                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445                         if (pli->pli_linkno < leh->leh_reccount - 1)
446                                 /* indicate to user there are more links */
450                 /* Pack the name in the end of the buffer */
451                 ptr -= tmpname->ln_namelen;
452                 if (ptr - 1 <= pli->pli_path)
453                         GOTO(out, rc = -EOVERFLOW);
454                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
457                 /* Store the parent fid for historic lookup */
458                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459                         GOTO(out, rc = -EOVERFLOW);
460                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
463         /* Verify that our path hasn't changed since we started the lookup.
464            Record the current index, and verify the path resolves to the
465            same fid.  If it does, then the path is correct as of this index. */
466         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467         pli->pli_currec = mdd->mdd_cl.mc_index;
468         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
471                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472                 GOTO (out, rc = -EAGAIN);
474         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477                        PFID(&pli->pli_fid));
478                 GOTO(out, rc = -EAGAIN);
480         ptr++; /* skip leading / */
            /* shift the finished path to the start of the caller's buffer */
481         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
485         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486                 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup.
 * NOTE(review): only the signature is visible in this excerpt; the body
 * is elided — see mdd_path() comments for the intended semantics. */
492 static int mdd_path_historic(const struct lu_env *env,
493                              struct path_lookup_info *pli)
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500                     char *path, int pathlen, __u64 *recno, int *linkno)
502         struct path_lookup_info *pli;
            /* the root has no path components; handled specially */
510         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
519         pli->pli_mdd_obj = md2mdd_obj(obj);
520         pli->pli_recno = *recno;
521         pli->pli_path = path;
522         pli->pli_pathlen = pathlen;
523         pli->pli_linkno = *linkno;
525         /* Retry multiple times in case file is being moved */
526         while (tries-- && rc == -EAGAIN)
527                 rc = mdd_path_current(env, pli);
529         /* For historical path lookup, the current links may not have existed
530          * at "recno" time.  We must switch over to earlier links/parents
531          * by using the changelog records.  If the earlier parent doesn't
532          * exist, we must search back through the changelog to reconstruct
533          * its parents, then check if it exists, etc.
534          * We may ignore this problem for the initial implementation and
535          * state that an "original" hardlink must still exist for us to find
536          * historic path name. */
537         if (pli->pli_recno != -1) {
538                 rc = mdd_path_historic(env, pli);
            /* report the changelog index the returned path is valid at */
540         *recno = pli->pli_currec;
541         /* Return next link index to caller */
542         *linkno = pli->pli_linkno;
/* Refresh mod_flags from the on-disk la_flags attribute
 * (bypassing capability checks). */
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
552         struct lu_attr *la = &mdd_env_info(env)->mti_la;
556         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
558                 mdd_flags_xlate(obj, la->la_flags);
563 /* get only inode attributes */
564 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
            /* cached already — nothing to do */
570         if (ma->ma_valid & MA_INODE)
573         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
574                         mdd_object_capa(env, mdd_obj));
576                 ma->ma_valid |= MA_INODE;
/* Fill @lmm with the filesystem-default striping (a lov_user_md_v1
 * synthesized from the LOV descriptor's defaults); returns its size. */
580 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
582         struct lov_desc *ldesc;
583         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
584         struct lov_user_md *lum = (struct lov_user_md*)lmm;
590         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
591         LASSERT(ldesc != NULL);
593         lum->lmm_magic = LOV_MAGIC_V1;
            /* magic seq marking "default striping, no objects yet" */
594         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
595         lum->lmm_pattern = ldesc->ld_pattern;
596         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
597         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
598         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
600         RETURN(sizeof(*lum));
/* True iff @mdd_obj is the filesystem root directory (FID comparison). */
603 static int is_rootdir(struct mdd_object *mdd_obj)
605         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
606         const struct lu_fid *fid = mdo2fid(mdd_obj);
608         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613                          struct mdd_object *mdd_obj, struct md_attr *ma)
618         if (ma->ma_valid & MA_LOV)
621         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
            /* root dir with no explicit EA: report the fs-default striping */
623         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
624                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
            /* positive rc from mdd_get_md is the EA size */
626                 ma->ma_lmm_size = rc;
627                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
628                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
634 /* get the first parent fid from link EA */
635 static int mdd_pfid_get(const struct lu_env *env,
636                         struct mdd_object *mdd_obj, struct md_attr *ma)
639         struct link_ea_header *leh;
640         struct link_ea_entry *lee;
641         struct lu_fid *pfid = &ma->ma_pfid;
644         if (ma->ma_valid & MA_PFID)
647         buf = mdd_links_get(env, mdd_obj);
649                 RETURN(PTR_ERR(buf));
            /* first entry in the link EA is link #0 */
652         lee = (struct link_ea_entry *)(leh + 1);
653         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
            /* link EA stores FIDs big-endian */
654         fid_be_to_cpu(pfid, pfid);
655         ma->ma_valid |= MA_PFID;
656         if (buf->lb_len > OBD_ALLOC_BIG)
657                 /* if we vmalloced a large buffer drop it */
/* __mdd_lmm_get() under the object's read lock. */
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
668         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669         rc = __mdd_lmm_get(env, mdd_obj, ma);
670         mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv, caller holds lock. */
675 static int __mdd_lmv_get(const struct lu_env *env,
676                          struct mdd_object *mdd_obj, struct md_attr *ma)
681         if (ma->ma_valid & MA_LMV)
684         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
687                 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr and extract the HSM flags and/or SOM attributes
 * requested in ma->ma_need into @ma. Skips the disk read when the
 * needed bits are already valid. */
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
696         struct mdd_thread_info *info = mdd_env_info(env);
697         struct lustre_mdt_attrs *lma =
698                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
703         /* If all needed data are already valid, nothing to do */
704         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705             (ma->ma_need & (MA_HSM | MA_SOM)))
708         /* Read LMA from disk EA */
709         lma_size = sizeof(info->mti_xattr_buf);
710         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
714                 /* Useless to check LMA incompatibility because this is already done in
715                  * osd_ea_fid_get(), and this will fail long before this code is
717                  * So, if we are here, LMA is compatible.
720                 lustre_lma_swab(lma);
722                 /* Swab and copy LMA */
723                 if (ma->ma_need & MA_HSM) {
724                         if (lma->lma_compat & LMAC_HSM)
725                                 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
727                                 ma->ma_hsm.mh_flags = 0;
728                         ma->ma_valid |= MA_HSM;
                /* SOM is copied only when the LMA says it is present */
732                 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733                         LASSERT(ma->ma_som != NULL);
734                         ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735                         ma->ma_som->msd_size    = lma->lma_som_size;
736                         ma->ma_som->msd_blocks  = lma->lma_som_blocks;
737                         ma->ma_som->msd_mountid = lma->lma_som_mountid;
738                         ma->ma_valid |= MA_SOM;
/* Dispatcher that gathers every attribute group requested in ma->ma_need
 * (inode attrs, LOV/LMV EAs, parent FID, HSM/SOM, default ACL),
 * each gated on the appropriate file type, stopping at the first error. */
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
750         if (ma->ma_need & MA_INODE)
751                 rc = mdd_iattr_get(env, mdd_obj, ma);
753         if (rc == 0 && ma->ma_need & MA_LOV) {
754                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755                     S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmm_get(env, mdd_obj, ma);
758         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = mdd_pfid_get(env, mdd_obj, ma);
762         if (rc == 0 && ma->ma_need & MA_LMV) {
763                 if (S_ISDIR(mdd_object_type(mdd_obj)))
764                         rc = __mdd_lmv_get(env, mdd_obj, ma);
766         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767                 if (S_ISREG(mdd_object_type(mdd_obj)))
768                         rc = __mdd_lma_get(env, mdd_obj, ma);
770 #ifdef CONFIG_FS_POSIX_ACL
771         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772                 if (S_ISDIR(mdd_object_type(mdd_obj)))
773                         rc = mdd_def_acl_get(env, mdd_obj, ma);
776         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777                rc, ma->ma_valid, ma->ma_lmm);
/* mdd_attr_get_internal() taking the read lock only when an EA-based
 * group (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain inode attrs
 * don't need it. */
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782                                  struct mdd_object *mdd_obj, struct md_attr *ma)
785         int needlock = ma->ma_need &
786                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
789                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790         rc = mdd_attr_get_internal(env, mdd_obj, ma);
792                 mdd_read_unlock(env, mdd_obj);
797  * No permission check is needed.
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
802         struct mdd_object *mdd_obj = md2mdd_obj(obj);
806         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
811  * No permission check is needed.
813 static int mdd_xattr_get(const struct lu_env *env,
814                          struct md_object *obj, struct lu_buf *buf,
817         struct mdd_object *mdd_obj = md2mdd_obj(obj);
822         LASSERT(mdd_object_exists(mdd_obj));
            /* read the named xattr under the object read lock */
824         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825         rc = mdo_xattr_get(env, mdd_obj, buf, name,
826                            mdd_object_capa(env, mdd_obj));
827         mdd_read_unlock(env, mdd_obj);
833  * Permission check is done when open,
834  * no need check again.
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
839         struct mdd_object *mdd_obj = md2mdd_obj(obj);
840         struct dt_object  *next;
845         LASSERT(mdd_object_exists(mdd_obj));
            /* symlink target is stored as the object body; read from pos 0 */
847         next = mdd_object_child(mdd_obj);
848         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850                                          mdd_object_capa(env, mdd_obj));
851         mdd_read_unlock(env, mdd_obj);
856  * No permission check is needed.
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
            /* enumerate xattr names into @buf under the read lock */
866         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868         mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) creation of child @c:
 * derive the dt object format from the spec's index features or the
 * requested mode, then declare with the lower layer. */
873 int mdd_declare_object_create_internal(const struct lu_env *env,
874                                        struct mdd_object *p,
875                                        struct mdd_object *c,
877                                        struct thandle *handle,
878                                        const struct md_op_spec *spec)
880         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881         const struct dt_index_features *feat = spec->sp_feat;
            /* non-directory index features force an index object */
885         if (feat != &dt_directory_features && feat != NULL)
886                 dof->dof_type = DFT_INDEX;
888                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
890                 dof->u.dof_idx.di_feat = feat;
892         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Actually create child @c inside an open transaction: compute the dt
 * format (mirroring the declare step), ask the backend for an allocation
 * hint relative to parent @p, then create. Postcondition: on success the
 * object exists. */
897 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
898                                struct mdd_object *c, struct md_attr *ma,
899                                struct thandle *handle,
900                                const struct md_op_spec *spec)
902         struct lu_attr *attr = &ma->ma_attr;
903         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
904         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
905         const struct dt_index_features *feat = spec->sp_feat;
909         if (!mdd_object_exists(c)) {
910                 struct dt_object *next = mdd_object_child(c);
913                 if (feat != &dt_directory_features && feat != NULL)
914                         dof->dof_type = DFT_INDEX;
916                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
918                         dof->u.dof_idx.di_feat = feat;
920                 /* @hint will be initialized by underlying device. */
921                 next->do_ops->do_ah_init(env, hint,
922                                          p ? mdd_object_child(p) : NULL,
923                                          attr->la_mode & S_IFMT);
925                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
926                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
934  * Make sure the ctime is increased only.
936 static inline int mdd_attr_check(const struct lu_env *env,
937                                  struct mdd_object *obj,
938                                  struct lu_attr *attr)
940         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
944         if (attr->la_valid & LA_CTIME) {
945                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                /* older ctime: drop both time updates; equal bare-ctime
                 * update: nothing new to store, drop it */
949                 if (attr->la_ctime < tmp_la->la_ctime)
950                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
951                 else if (attr->la_valid == LA_CTIME &&
952                          attr->la_ctime == tmp_la->la_ctime)
953                         attr->la_valid &= ~LA_CTIME;
/* Apply @attr to the object within transaction @handle; on a mode
 * change, optionally propagate the new mode into the POSIX ACL. */
958 int mdd_attr_set_internal(const struct lu_env *env,
959                           struct mdd_object *obj,
960                           struct lu_attr *attr,
961                           struct thandle *handle,
967         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
968 #ifdef CONFIG_FS_POSIX_ACL
969         if (!rc && (attr->la_valid & LA_MODE) && needacl)
970                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by the actual set. */
975 int mdd_attr_check_set_internal(const struct lu_env *env,
976                                 struct mdd_object *obj,
977                                 struct lu_attr *attr,
978                                 struct thandle *handle,
984         rc = mdd_attr_check(env, obj, attr);
989                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object's write lock; the lock is
 * only needed for the ACL update on a mode change. */
993 static int mdd_attr_set_internal_locked(const struct lu_env *env,
994                                         struct mdd_object *obj,
995                                         struct lu_attr *attr,
996                                         struct thandle *handle,
1002         needacl = needacl && (attr->la_valid & LA_MODE);
1004                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1005         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1007                 mdd_write_unlock(env, obj);
/* Locked variant of mdd_attr_check_set_internal(), same locking rule
 * as mdd_attr_set_internal_locked(). */
1011 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1012                                        struct mdd_object *obj,
1013                                        struct lu_attr *attr,
1014                                        struct thandle *handle,
1020         needacl = needacl && (attr->la_valid & LA_MODE);
1022                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1023         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1025                 mdd_write_unlock(env, obj);
/* Set or delete xattr @name inside transaction @handle: a non-empty
 * buffer sets the value, a (NULL, 0) buffer deletes the attribute.
 * NOTE(review): the @fl flag parameter is not passed down in the visible
 * set path (0 is used) — confirm against the full file. */
1029 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1030                     const struct lu_buf *buf, const char *name,
1031                     int fl, struct thandle *handle)
1033         struct lustre_capa *capa = mdd_object_capa(env, obj);
1037         if (buf->lb_buf && buf->lb_len > 0)
1038                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1039         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1040                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1046  * This gives the same functionality as the code between
1047  * sys_chmod and inode_setattr
1048  * chown_common and inode_setattr
1049  * utimes and inode_setattr
1050  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1052 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1053                         struct lu_attr *la, const struct md_attr *ma)
1055         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1056         struct md_ucred  *uc;
1063         /* Do not permit change file type */
1064         if (la->la_valid & LA_TYPE)
1067         /* They should not be processed by setattr */
1068         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1071         /* export destroy does not have ->le_ses, but we may want
1072          * to drop LUSTRE_SOM_FL. */
             /* fetch the object's current attributes for comparison below */
1078         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1082         if (la->la_valid == LA_CTIME) {
1083                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1084                         /* This is only for set ctime when rename's source is
1086                         rc = mdd_may_delete(env, NULL, obj,
1087                                             (struct md_attr *)ma, 1, 0);
1088                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1089                         la->la_valid &= ~LA_CTIME;
1093         if (la->la_valid == LA_ATIME) {
1094                 /* This is atime only set for read atime update on close. */
1095                 if (la->la_atime >= tmp_la->la_atime &&
1096                     la->la_atime < (tmp_la->la_atime +
1097                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1098                         la->la_valid &= ~LA_ATIME;
1102         /* Check if flags change. */
1103         if (la->la_valid & LA_FLAGS) {
1104                 unsigned int oldflags = 0;
1105                 unsigned int newflags = la->la_flags &
1106                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
                 /* only the owner or CAP_FOWNER may change flags */
1108                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1109                     !mdd_capable(uc, CFS_CAP_FOWNER))
1112                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1113                  * only be changed by the relevant capability. */
1114                 if (mdd_is_immutable(obj))
1115                         oldflags |= LUSTRE_IMMUTABLE_FL;
1116                 if (mdd_is_append(obj))
1117                         oldflags |= LUSTRE_APPEND_FL;
1118                 if ((oldflags ^ newflags) &&
1119                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                 /* DIRSYNC is meaningful for directories only */
1122                 if (!S_ISDIR(tmp_la->la_mode))
1123                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
             /* immutable/append-only objects accept flag changes only,
              * unless the server explicitly bypasses permission checks */
1126         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1127             (la->la_valid & ~LA_FLAGS) &&
1128             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1131         /* Check for setting the obj time. */
1132         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1133             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1134                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1135                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1136                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
             /* LA_KILL_SUID/SGID: server-side request to strip setuid/setgid
              * (e.g. after a write by a non-owner); folded into an LA_MODE
              * update on the current mode */
1144         if (la->la_valid & LA_KILL_SUID) {
1145                 la->la_valid &= ~LA_KILL_SUID;
1146                 if ((tmp_la->la_mode & S_ISUID) &&
1147                     !(la->la_valid & LA_MODE)) {
1148                         la->la_mode = tmp_la->la_mode;
1149                         la->la_valid |= LA_MODE;
1151                 la->la_mode &= ~S_ISUID;
1154         if (la->la_valid & LA_KILL_SGID) {
1155                 la->la_valid &= ~LA_KILL_SGID;
1156                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1157                      (S_ISGID | S_IXGRP)) &&
1158                     !(la->la_valid & LA_MODE)) {
1159                         la->la_mode = tmp_la->la_mode;
1160                         la->la_valid |= LA_MODE;
1162                 la->la_mode &= ~S_ISGID;
1165         /* Make sure a caller can chmod. */
1166         if (la->la_valid & LA_MODE) {
1167                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1168                     (uc->mu_fsuid != tmp_la->la_uid) &&
1169                     !mdd_capable(uc, CFS_CAP_FOWNER))
1172                 if (la->la_mode == (cfs_umode_t) -1)
1173                         la->la_mode = tmp_la->la_mode;
                 /* preserve the file-type bits from the current mode */
1175                         la->la_mode = (la->la_mode & S_IALLUGO) |
1176                                       (tmp_la->la_mode & ~S_IALLUGO);
1178                 /* Also check the setgid bit! */
1179                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1180                                        la->la_gid : tmp_la->la_gid) &&
1181                     !mdd_capable(uc, CFS_CAP_FSETID))
1182                         la->la_mode &= ~S_ISGID;
1184                 la->la_mode = tmp_la->la_mode;
1187         /* Make sure a caller can chown. */
1188         if (la->la_valid & LA_UID) {
1189                 if (la->la_uid == (uid_t) -1)
1190                         la->la_uid = tmp_la->la_uid;
1191                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1192                      (la->la_uid != tmp_la->la_uid)) &&
1193                     !mdd_capable(uc, CFS_CAP_CHOWN))
1196                 /* If the user or group of a non-directory has been
1197                  * changed by a non-root user, remove the setuid bit.
1198                  * 19981026 David C Niemi <niemi@tux.org>
1200                  * Changed this to apply to all users, including root,
1201                  * to avoid some races. This is the behavior we had in
1202                  * 2.0. The check for non-root was definitely wrong
1203                  * for 2.2 anyway, as it should have been using
1204                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1205                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1206                     !S_ISDIR(tmp_la->la_mode)) {
1207                         la->la_mode &= ~S_ISUID;
1208                         la->la_valid |= LA_MODE;
1212         /* Make sure caller can chgrp. */
1213         if (la->la_valid & LA_GID) {
1214                 if (la->la_gid == (gid_t) -1)
1215                         la->la_gid = tmp_la->la_gid;
1216                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1217                      ((la->la_gid != tmp_la->la_gid) &&
1218                       !lustre_in_group_p(uc, la->la_gid))) &&
1219                     !mdd_capable(uc, CFS_CAP_CHOWN))
1222                 /* Likewise, if the user or group of a non-directory
1223                  * has been changed by a non-root user, remove the
1224                  * setgid bit UNLESS there is no group execute bit
1225                  * (this would be a file marked for mandatory
1226                  * locking).  19981026 David C Niemi <niemi@tux.org>
1228                  * Removed the fsuid check (see the comment above) --
1230                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1231                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1232                         la->la_mode &= ~S_ISGID;
1233                         la->la_valid |= LA_MODE;
1237         /* For both Size-on-MDS case and truncate case,
1238          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1239          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1240          * For SOM case, it is true, the MAY_WRITE perm has been checked
1241          * when open, no need check again. For truncate case, it is false,
1242          * the MAY_WRITE perm should be checked here. */
1243         if (ma->ma_attr_flags & MDS_SOM) {
1244                 /* For the "Size-on-MDS" setattr update, merge coming
1245                  * attributes with the set in the inode. BUG 10641 */
1246                 if ((la->la_valid & LA_ATIME) &&
1247                     (la->la_atime <= tmp_la->la_atime))
1248                         la->la_valid &= ~LA_ATIME;
1250                 /* OST attributes do not have a priority over MDS attributes,
1251                  * so drop times if ctime is equal. */
1252                 if ((la->la_valid & LA_CTIME) &&
1253                     (la->la_ctime <= tmp_la->la_ctime))
1254                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1256                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                         /* truncate path: owner-override at open or explicit
                          * bypass skips the MAY_WRITE check */
1257                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1258                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1259                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1260                                 rc = mdd_permission_internal_locked(env, obj,
1267                 if (la->la_valid & LA_CTIME) {
1268                         /* The pure setattr, it has the priority over what is
1269                          * already set, do not drop it if ctime is equal. */
1270                         if (la->la_ctime < tmp_la->la_ctime)
1271                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1279 /** Store a data change changelog record
1280  * If this fails, we must fail the whole transaction; we don't
1281  * want the change to commit without the log entry.
1282  * \param mdd_obj - mdd_object of change
1283  * \param handle - transacion handle
1285 static int mdd_changelog_data_store(const struct lu_env *env,
1286                                     struct mdd_device *mdd,
1287                                     enum changelog_rec_type type,
1289                                     struct mdd_object *mdd_obj,
1290                                     struct thandle *handle)
1292         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1293         struct llog_changelog_rec *rec;
1294         struct thandle *th = NULL;
             /* skip entirely when changelogs are off or this record type is
              * masked out */
1300         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1302         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1305         LASSERT(mdd_obj != NULL);
1306         LASSERT(handle != NULL);
             /* coalesce repeated time updates since changelog start */
1308         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1309             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1310                 /* Don't need multiple updates in this log */
1311                 /* Don't check under lock - no big deal if we get an extra
1316         reclen = llog_data_len(sizeof(*rec));
1317         buf = mdd_buf_alloc(env, reclen);
1318         if (buf->lb_buf == NULL)
1320         rec = (struct llog_changelog_rec *)buf->lb_buf;
1322         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1323         rec->cr.cr_type = (__u32)type;
1324         rec->cr.cr_tfid = *tfid;
1325         rec->cr.cr_namelen = 0;
             /* remember when we last logged this object */
1326         mdd_obj->mod_cltime = cfs_time_current_64();
1328         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1331                 mdd_trans_stop(env, mdd, rc, th);
1334                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1335                        rc, type, PFID(tfid));
/* Emit a changelog record of \a type with \a flags for \a obj, wrapped in
 * its own transaction (create, declare, start, store, stop). */
1342 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1343 int flags, struct md_object *obj)
1345 struct thandle *handle;
1346 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1347 struct mdd_device *mdd = mdo2mdd(obj);
1351 handle = mdd_trans_create(env, mdd);
1353 return(PTR_ERR(handle));
/* Reserve credits for the changelog record before starting. */
1355 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1359 rc = mdd_trans_start(env, mdd, handle);
1363 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1367 mdd_trans_stop(env, mdd, rc, handle);
1373 * Should be called with write lock held.
1375 * \see mdd_lma_set_locked().
/* Build the LMA (lustre_mdt_attrs) xattr from the HSM/SOM state carried
 * in \a ma and write it with XATTR_NAME_LMA. When the caller did not
 * supply both parts, the existing on-disk LMA is read first so unrelated
 * fields are preserved. */
1377 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1378 const struct md_attr *ma, struct thandle *handle)
1380 struct mdd_thread_info *info = mdd_env_info(env);
1382 struct lustre_mdt_attrs *lma =
1383 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1384 int lmasize = sizeof(struct lustre_mdt_attrs);
1389 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" negates before
 * masking, so the condition is only true when ma_valid == 0; the comment
 * above suggests "!(ma->ma_valid & (MA_HSM | MA_SOM))" (or a both-bits
 * check) was intended -- verify against upstream before changing. */
1390 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1391 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* On-disk LMA is stored swabbed; convert to CPU byte order after read. */
1395 lustre_lma_swab(lma);
/* No existing LMA: start from a zeroed structure. */
1397 memset(lma, 0, lmasize);
1401 if (ma->ma_valid & MA_HSM) {
1402 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1403 lma->lma_compat |= LMAC_HSM;
1407 if (ma->ma_valid & MA_SOM) {
1408 LASSERT(ma->ma_som != NULL);
/* An invalid IO epoch means Size-on-MDS data is stale: drop the flag. */
1409 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1410 lma->lma_compat &= ~LMAC_SOM;
1412 lma->lma_compat |= LMAC_SOM;
1413 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1414 lma->lma_som_size = ma->ma_som->msd_size;
1415 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1416 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* LMA always records the object's own FID. */
1421 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Convert back to on-disk byte order before writing the xattr. */
1423 lustre_lma_swab(lma);
1424 buf = mdd_buf_get(env, lma, lmasize);
1425 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1431 * Save LMA extended attributes with data from \a ma.
1433 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1434 * not, LMA EA will be first read from disk, modified and write back.
/* Locked wrapper: takes the object write lock around __mdd_lma_set(). */
1437 static int mdd_lma_set_locked(const struct lu_env *env,
1438 struct mdd_object *mdd_obj,
1439 const struct md_attr *ma, struct thandle *handle)
1443 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1444 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1445 mdd_write_unlock(env, mdd_obj);
1449 /* Precedence for choosing record type when multiple
1450 * attributes change: setattr > mtime > ctime > atime
1451 * (ctime changes when mtime does, plus chmod/chown.
1452 * atime and ctime are independent.) */
1453 static int mdd_attr_set_changelog(const struct lu_env *env,
1454 struct md_object *obj, struct thandle *handle,
1457 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmask of candidate record types from the changed attribute
 * bits, then drop any types the changelog mask has disabled. */
1460 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1461 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1462 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1463 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1464 bits = bits & mdd->mdd_cl.mc_mask;
1468 /* The record type is the lowest non-masked set bit */
1469 while (bits && ((bits & 1) == 0)) {
1474 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1475 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1476 md2mdd_obj(obj), handle);
/* Declare (reserve transaction credits for) everything mdd_attr_set() may
 * touch: the attributes themselves, a changelog record, LOV/LMA xattr
 * updates, a possible ACL rewrite on chmod, and unlink-style llog records
 * for chown/chgrp of striped files (\a lmm may be NULL when no striping
 * data is involved). */
1479 static int mdd_declare_attr_set(const struct lu_env *env,
1480 struct mdd_device *mdd,
1481 struct mdd_object *obj,
1482 const struct md_attr *ma,
1483 struct lov_mds_md *lmm,
1484 struct thandle *handle)
1486 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1489 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1493 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1497 if (ma->ma_valid & MA_LOV) {
1499 buf->lb_len = ma->ma_lmm_size;
1500 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1506 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1508 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1509 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1515 #ifdef CONFIG_FS_POSIX_ACL
/* A mode change may require rewriting the access ACL: probe for one. */
1516 if (ma->ma_attr.la_valid & LA_MODE) {
1517 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1518 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1520 mdd_read_unlock(env, obj);
/* No ACL support / no ACL present is fine, not an error. */
1521 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1529 rc = mdo_declare_xattr_set(env, obj, buf,
1530 XATTR_NAME_ACL_ACCESS, 0,
1538 /* basically the log is the same as in unlink case */
/* Only LOV v1/v3 layouts are understood here; anything else is refused. */
1542 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1543 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1544 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1545 mdd->mdd_obd_dev->obd_name,
1546 le32_to_cpu(lmm->lmm_magic),
1547 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
/* LOV_ALL_STRIPES means "striped over every OST": use the target count
 * from the LOV descriptor instead. */
1551 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1552 if (stripe == LOV_ALL_STRIPES) {
1553 struct lov_desc *ldesc;
1555 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1556 LASSERT(ldesc != NULL);
1557 stripe = ldesc->ld_tgt_count;
/* Reserve one unlink-style llog record per stripe. */
1560 for (i = 0; i < stripe; i++) {
1561 rc = mdd_declare_llog_record(env, mdd,
1562 sizeof(struct llog_unlink_rec),
1572 /* set attr and LOV EA at once, return updated attr */
/* Main setattr entry point (moo_attr_set). Fixes up the requested
 * attributes, reads striping data when ownership changes on a regular
 * file (for the chown llog), runs the whole update in one transaction,
 * handles HSM/SOM LMA updates and quota adjustment, and finally emits a
 * changelog record. */
1573 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1574 const struct md_attr *ma)
1576 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1577 struct mdd_device *mdd = mdo2mdd(obj);
1578 struct thandle *handle;
1579 struct lov_mds_md *lmm = NULL;
1580 struct llog_cookie *logcookies = NULL;
1581 int rc, lmm_size = 0, cookie_size = 0;
1582 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1583 struct obd_device *obd = mdd->mdd_obd_dev;
1584 struct mds_obd *mds = &obd->u.mds;
1585 #ifdef HAVE_QUOTA_SUPPORT
1586 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1587 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1588 int quota_opc = 0, block_count = 0;
1589 int inode_pending[MAXQUOTAS] = { 0, 0 };
1590 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a copy so the caller's md_attr stays untouched. */
1594 *la_copy = ma->ma_attr;
1595 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1599 /* setattr on "close" only change atime, or do nothing */
1600 if (ma->ma_valid == MA_INODE &&
1601 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp of a regular file: fetch striping so OST objects can be
 * updated (and journaled) as well. */
1604 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1605 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1606 lmm_size = mdd_lov_mdsize(env, mdd);
1607 lmm = mdd_max_lmm_get(env, mdd);
1611 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1618 handle = mdd_trans_create(env, mdd);
1620 RETURN(PTR_ERR(handle));
1622 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1623 lmm_size > 0 ? lmm : NULL, handle);
1627 /* permission changes may require sync operation */
1628 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1629 handle->th_sync = !!mdd->mdd_sync_permission;
1631 rc = mdd_trans_start(env, mdd, handle);
1635 /* permission changes may require sync operation */
1636 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1637 handle->th_sync |= mdd->mdd_sync_permission;
1639 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1640 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1641 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Ownership change: pre-acquire quota for the new owner and remember the
 * old owner so usage can be transferred after the operation. */
1643 #ifdef HAVE_QUOTA_SUPPORT
1644 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1645 struct obd_export *exp = md_quota(env)->mq_exp;
1646 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1648 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1650 quota_opc = FSFILT_OP_SETATTR;
1651 mdd_quota_wrapper(la_copy, qnids);
1652 mdd_quota_wrapper(la_tmp, qoids);
1653 /* get file quota for new owner */
1654 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1655 qnids, inode_pending, 1, NULL, 0,
/* la_blocks is in 512-byte sectors; >> 3 converts to 4KB blocks. */
1657 block_count = (la_tmp->la_blocks + 7) >> 3;
1660 mdd_data_get(env, mdd_obj, &data);
1661 /* get block quota for new owner */
1662 lquota_chkquota(mds_quota_interface_ref, obd,
1663 exp, qnids, block_pending,
1665 LQUOTA_FLAGS_BLK, data, 1);
/* Flag changes are applied separately and cached on the mdd object. */
1671 if (la_copy->la_valid & LA_FLAGS) {
1672 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1675 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1676 } else if (la_copy->la_valid) { /* setattr */
1677 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1679 /* journal chown/chgrp in llog, just like unlink */
1680 if (rc == 0 && lmm_size){
1681 cookie_size = mdd_lov_cookiesize(env, mdd);
1682 logcookies = mdd_max_cookie_get(env, mdd);
1683 if (logcookies == NULL)
1684 GOTO(cleanup, rc = -ENOMEM);
1686 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1687 logcookies, cookie_size) <= 0)
/* New striping supplied: validate and install the LOV EA. */
1692 if (rc == 0 && ma->ma_valid & MA_LOV) {
1695 mode = mdd_object_type(mdd_obj);
1696 if (S_ISREG(mode) || S_ISDIR(mode)) {
1697 rc = mdd_lsm_sanity_check(env, mdd_obj);
1701 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1702 ma->ma_lmm_size, handle, 1);
/* HSM/SOM state changed: rewrite the LMA xattr under lock. */
1706 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1709 mode = mdd_object_type(mdd_obj);
1711 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1716 rc = mdd_attr_set_changelog(env, obj, handle,
1717 ma->ma_attr.la_valid);
1719 mdd_trans_stop(env, mdd, rc, handle);
/* Propagate ownership change to the OST objects asynchronously. */
1720 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1721 /*set obd attr, if needed*/
1722 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1725 #ifdef HAVE_QUOTA_SUPPORT
1727 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1729 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1731 /* Trigger dqrel/dqacq for original owner and new owner.
1732 * If failed, the next call for lquota_chkquota will
1734 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set xattr \a name on \a obj under the object write lock, inside the
 * caller's already-started transaction \a handle. */
1741 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1742 const struct lu_buf *buf, const char *name, int fl,
1743 struct thandle *handle)
1748 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1749 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1750 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: the object must not be
 * immutable or append-only, and the caller must be the owner or hold
 * CFS_CAP_FOWNER. */
1755 static int mdd_xattr_sanity_check(const struct lu_env *env,
1756 struct mdd_object *obj)
1758 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1759 struct md_ucred *uc = md_ucred(env);
1763 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1766 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1770 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1771 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare an xattr set, plus a changelog record when the xattr is in the
 * "user." namespace. */
1777 static int mdd_declare_xattr_set(const struct lu_env *env,
1778 struct mdd_device *mdd,
1779 struct mdd_object *obj,
1780 const struct lu_buf *buf,
1782 struct thandle *handle)
1787 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1791 /* Only record user xattr changes */
/* NOTE(review): the execution path (mdd_xattr_set) also stores changelog
 * records for POSIX ACL xattrs, which are not declared here -- confirm
 * against upstream whether that is intentional. */
1792 if ((strncmp("user.", name, 5) == 0))
1793 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1799 * The caller should guarantee to update the object ctime
1800 * after xattr_set if needed.
/* Set an extended attribute (moo_xattr_set): permission check, own
 * transaction, optional synchronous commit for ACL changes, and a
 * CL_XATTR changelog record for user/ACL xattrs. */
1802 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1803 const struct lu_buf *buf, const char *name,
1806 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1807 struct mdd_device *mdd = mdo2mdd(obj);
1808 struct thandle *handle;
1812 rc = mdd_xattr_sanity_check(env, mdd_obj);
1816 handle = mdd_trans_create(env, mdd);
1818 RETURN(PTR_ERR(handle));
1820 /* security-related changes may require sync */
1821 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1822 mdd->mdd_sync_permission == 1)
1823 handle->th_sync = 1;
1825 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1829 rc = mdd_trans_start(env, mdd, handle);
1833 /* security-related changes may require sync */
1834 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1835 handle->th_sync |= mdd->mdd_sync_permission;
1837 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1839 /* Only record system & user xattr changes */
1840 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1841 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1842 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1843 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1844 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1845 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1846 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1850 mdd_trans_stop(env, mdd, rc, handle);
/* Declare an xattr delete, plus a changelog record when the xattr is in
 * the "user." namespace. */
1855 static int mdd_declare_xattr_del(const struct lu_env *env,
1856 struct mdd_device *mdd,
1857 struct mdd_object *obj,
1859 struct thandle *handle)
1863 rc = mdo_declare_xattr_del(env, obj, name, handle);
1867 /* Only record user xattr changes */
/* NOTE(review): the execution path (mdd_xattr_del) also logs POSIX ACL
 * xattr deletions, which are not declared here -- confirm upstream. */
1868 if ((strncmp("user.", name, 5) == 0))
1869 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1875 * The caller should guarantee to update the object ctime
1876 * after xattr_set if needed.
/* Delete an extended attribute (moo_xattr_del): permission check, own
 * transaction, delete under write lock, CL_XATTR changelog record for
 * user/ACL xattrs. */
1878 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1881 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1882 struct mdd_device *mdd = mdo2mdd(obj);
1883 struct thandle *handle;
1887 rc = mdd_xattr_sanity_check(env, mdd_obj);
1891 handle = mdd_trans_create(env, mdd);
1893 RETURN(PTR_ERR(handle));
1895 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1899 rc = mdd_trans_start(env, mdd, handle);
1903 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1904 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1905 mdd_object_capa(env, mdd_obj));
1906 mdd_write_unlock(env, mdd_obj);
1908 /* Only record system & user xattr changes */
1909 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1910 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1911 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1912 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1913 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1914 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1915 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1919 mdd_trans_stop(env, mdd, rc, handle);
1924 /* partial unlink */
/* Drop one link on \a obj (moo_ref_del): decrement nlink (twice for a
 * directory, for the "." entry), update ctime, finish the unlink, and
 * release quota when the last link and last opener are gone. */
1925 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1928 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1929 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1930 struct mdd_device *mdd = mdo2mdd(obj);
1931 struct thandle *handle;
1932 #ifdef HAVE_QUOTA_SUPPORT
1933 struct obd_device *obd = mdd->mdd_obd_dev;
1934 struct mds_obd *mds = &obd->u.mds;
1935 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1941 /* XXX: this code won't be used ever:
1942 * DNE uses slightly different approach */
1946 * Check -ENOENT early here because we need to get object type
1947 * to calculate credits before transaction start
1949 if (!mdd_object_exists(mdd_obj))
1952 LASSERT(mdd_object_exists(mdd_obj) > 0);
1954 handle = mdd_trans_create(env, mdd);
1958 rc = mdd_trans_start(env, mdd, handle);
1960 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1962 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1966 mdo_ref_del(env, mdd_obj, handle);
/* A directory loses a second link for its own "." entry. */
1968 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1970 mdo_ref_del(env, mdd_obj, handle);
1973 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1974 la_copy->la_ctime = ma->ma_attr.la_ctime;
1976 la_copy->la_valid = LA_CTIME;
1977 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1981 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
/* Last link gone and not held open: release the child's quota. */
1982 #ifdef HAVE_QUOTA_SUPPORT
1983 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1984 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1985 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1986 mdd_quota_wrapper(&ma->ma_attr, qids);
1993 mdd_write_unlock(env, mdd_obj);
1994 mdd_trans_stop(env, mdd, rc, handle);
1995 #ifdef HAVE_QUOTA_SUPPORT
1997 /* Trigger dqrel on the owner of child. If failed,
1998 * the next call for lquota_chkquota will process it */
1999 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2005 /* partial operation */
/* Validate the requested file type (la_mode & S_IFMT) before a partial
 * object create. */
2006 static int mdd_oc_sanity_check(const struct lu_env *env,
2007 struct mdd_object *obj,
2013 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial object create (moo_object_create): acquire quota for the new
 * child, create the object in its own transaction, install the LMV slave
 * EA and/or remote ACL when requested by \a spec, initialize it, and
 * return the resulting attributes in \a ma. */
2030 static int mdd_object_create(const struct lu_env *env,
2031 struct md_object *obj,
2032 const struct md_op_spec *spec,
2036 struct mdd_device *mdd = mdo2mdd(obj);
2037 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2038 const struct lu_fid *pfid = spec->u.sp_pfid;
2039 struct thandle *handle;
2040 #ifdef HAVE_QUOTA_SUPPORT
2041 struct obd_device *obd = mdd->mdd_obd_dev;
2042 struct obd_export *exp = md_quota(env)->mq_exp;
2043 struct mds_obd *mds = &obd->u.mds;
2044 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2045 int quota_opc = 0, block_count = 0;
2046 int inode_pending[MAXQUOTAS] = { 0, 0 };
2047 int block_pending[MAXQUOTAS] = { 0, 0 };
2052 /* XXX: this code won't be used ever:
2053 * DNE uses slightly different approach */
/* Pre-acquire inode (and, per file type, block) quota for the child
 * before starting the transaction. */
2056 #ifdef HAVE_QUOTA_SUPPORT
2057 if (mds->mds_quota) {
2058 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2059 mdd_quota_wrapper(&ma->ma_attr, qids);
2060 /* get file quota for child */
2061 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2062 qids, inode_pending, 1, NULL, 0,
2064 switch (ma->ma_attr.la_mode & S_IFMT) {
2073 /* get block quota for child */
2075 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2076 qids, block_pending, block_count,
2077 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2081 handle = mdd_trans_create(env, mdd);
2083 GOTO(out_pending, rc = PTR_ERR(handle));
2085 rc = mdd_trans_start(env, mdd, handle);
2087 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2088 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2092 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2096 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2097 /* If creating the slave object, set slave EA here. */
2098 int lmv_size = spec->u.sp_ea.eadatalen;
2099 struct lmv_stripe_md *lmv;
2101 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2102 LASSERT(lmv != NULL && lmv_size > 0);
2104 rc = __mdd_xattr_set(env, mdd_obj,
2105 mdd_buf_get_const(env, lmv, lmv_size),
2106 XATTR_NAME_LMV, 0, handle);
2110 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
/* Remote-ACL create: initialize the ACL from the caller-supplied EA,
 * which may also adjust la_mode. */
2113 #ifdef CONFIG_FS_POSIX_ACL
2114 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2115 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2117 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2118 buf->lb_len = spec->u.sp_ea.eadatalen;
2119 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2120 rc = __mdd_acl_init(env, mdd_obj, buf,
2121 &ma->ma_attr.la_mode,
2126 ma->ma_attr.la_valid |= LA_MODE;
2129 pfid = spec->u.sp_ea.fid;
2132 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the freshly-created object's attributes to the caller. */
2138 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2139 mdd_write_unlock(env, mdd_obj);
2141 mdd_trans_stop(env, mdd, rc, handle);
2143 #ifdef HAVE_QUOTA_SUPPORT
2145 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2147 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2149 /* Trigger dqacq on the owner of child. If failed,
2150 * the next call for lquota_chkquota will process it. */
2151 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Add one link on \a obj (moo_ref_add): sanity check, bump nlink under
 * write lock, then update ctime. */
2159 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2160 const struct md_attr *ma)
2162 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2163 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2164 struct mdd_device *mdd = mdo2mdd(obj);
2165 struct thandle *handle;
2169 /* XXX: this code won't be used ever:
2170 * DNE uses slightly different approach */
2173 handle = mdd_trans_create(env, mdd);
2177 rc = mdd_trans_start(env, mdd, handle);
2179 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2180 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2182 mdo_ref_add(env, mdd_obj, handle);
2183 mdd_write_unlock(env, mdd_obj);
2185 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2186 la_copy->la_ctime = ma->ma_attr.la_ctime;
2188 la_copy->la_valid = LA_CTIME;
2189 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction result is passed as 0 here rather than
 * rc -- looks deliberate (ctime-update failure does not fail the link),
 * but confirm against upstream. */
2192 mdd_trans_stop(env, mdd, 0, handle);
2198 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open \a flags into a MAY_* access mask for the object whose
 * attributes are in \a la. */
2200 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2204 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2205 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2206 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2207 * owner can write to a file even if it is marked readonly to hide
2208 * its brokenness. (bug 5781) */
2209 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2210 struct md_ucred *uc = md_ucred(env);
/* Owner (or uninitialized credentials) bypasses the access check. */
2212 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2213 (la->la_uid == uc->mu_fsuid))
2217 if (flags & FMODE_READ)
/* Truncate and append both imply write access. */
2219 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2221 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's type, mode, append-only
 * flag, and the caller's credentials. */
2226 static int mdd_open_sanity_check(const struct lu_env *env,
2227 struct mdd_object *obj, int flag)
2229 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2234 if (mdd_is_dead_obj(obj))
2237 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are followed by the client; opening one directly is invalid. */
2241 if (S_ISLNK(tmp_la->la_mode))
2244 mode = accmode(env, tmp_la, flag);
/* Directories may not be opened for write. */
2246 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Permission was already checked at create time for new files. */
2249 if (!(flag & MDS_OPEN_CREATED)) {
2250 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless on FIFOs, sockets and device nodes. */
2255 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2256 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2257 flag &= ~MDS_OPEN_TRUNC;
2259 /* For writing append-only file must open it with append mode. */
2260 if (mdd_is_append(obj)) {
2261 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2263 if (flag & MDS_OPEN_TRUNC)
2269 * Now, flag -- O_NOATIME does not be packed by client.
2271 if (flag & O_NOATIME) {
2272 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME is restricted to the owner or CAP_FOWNER holders. */
2274 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2275 (uc->mu_valid == UCRED_NEW)) &&
2276 (uc->mu_fsuid != tmp_la->la_uid) &&
2277 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open the object (moo_open): sanity-check the flags and, on success,
 * bump the open count under the write lock. */
2285 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2288 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2291 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2293 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2295 mdd_obj->mod_count++;
2297 mdd_write_unlock(env, mdd_obj);
/* Declare credits for destroying \a obj: the unlink llog record plus the
 * object destroy itself. */
2301 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2302 struct md_attr *ma, struct thandle *handle)
2306 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2310 return mdo_declare_destroy(env, obj, handle);
2313 /* return md_attr back,
2314 * if it is last unlink then return lov ea + llog cookie*/
/* Destroy \a obj within the caller's transaction. For regular files the
 * LOV EA is fetched first so unlink llog records (with cookies) can be
 * written for the OST objects. */
2315 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2316 struct md_attr *ma, struct thandle *handle)
2321 if (S_ISREG(mdd_object_type(obj))) {
2322 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2323 * Caller must be ready for that. */
2325 rc = __mdd_lmm_get(env, obj, ma);
2326 if ((ma->ma_valid & MA_LOV))
2327 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2332 rc = mdo_destroy(env, obj, handle);
/* Declare credits for a last-close of an orphan: orphan index removal
 * plus the object kill. */
2337 static int mdd_declare_close(const struct lu_env *env,
2338 struct mdd_object *obj,
2340 struct thandle *handle)
2344 rc = orph_declare_index_delete(env, obj, handle);
2348 return mdd_declare_object_kill(env, obj, ma, handle);
2352 * No permission check is needed.
/* Close the object (moo_close): drop the open count; on the last close of
 * an orphaned/deleted object, remove it from the orphan index, destroy it
 * (and its OST objects) and release quota; finally emit a CL_CLOSE
 * changelog record for writable opens. */
2354 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2355 struct md_attr *ma, int mode)
2357 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2358 struct mdd_device *mdd = mdo2mdd(obj);
2359 struct thandle *handle = NULL;
2361 int is_orphan = 0, reset = 1;
2363 #ifdef HAVE_QUOTA_SUPPORT
2364 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2365 struct mds_obd *mds = &obd->u.mds;
2366 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: drop the open count but leave the object on the
 * orphan list (e.g. HSM restore in progress). */
2371 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2372 mdd_obj->mod_count--;
2374 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2375 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2376 "list\n", PFID(mdd_object_fid(mdd_obj)));
2380 /* check without any lock */
/* Likely last close of an orphan: pre-build the transaction before
 * taking the write lock (declarations may block). */
2381 if (mdd_obj->mod_count == 1 &&
2382 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2384 handle = mdd_trans_create(env, mdo2mdd(obj));
2386 RETURN(PTR_ERR(handle));
2388 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2392 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2396 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2401 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Re-check under lock: the object may have become an orphan between the
 * unlocked check and here, in which case restart with a transaction. */
2402 if (handle == NULL && mdd_obj->mod_count == 1 &&
2403 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2404 mdd_write_unlock(env, mdd_obj);
2408 /* release open count */
2409 mdd_obj->mod_count --;
2411 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2412 /* remove link to object from orphan index */
2413 LASSERT(handle != NULL);
2414 rc = __mdd_orphan_del(env, mdd_obj, handle);
2416 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2417 "list, OSS objects to be destroyed.\n",
2418 PFID(mdd_object_fid(mdd_obj)));
2421 CERROR("Object "DFID" can not be deleted from orphan "
2422 "list, maybe cause OST objects can not be "
2423 "destroyed (err: %d).\n",
2424 PFID(mdd_object_fid(mdd_obj)), rc);
2425 /* If object was not deleted from orphan list, do not
2426 * destroy OSS objects, which will be done when next
2432 rc = mdd_iattr_get(env, mdd_obj, ma);
2433 /* Object maybe not in orphan list originally, it is rare case for
2434 * mdd_finish_unlink() failure. */
2435 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2436 #ifdef HAVE_QUOTA_SUPPORT
2437 if (mds->mds_quota) {
2438 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2439 mdd_quota_wrapper(&ma->ma_attr, qids);
2442 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2443 if (ma->ma_valid & MA_FLAGS &&
2444 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2445 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
/* No transaction yet (object became deletable late): build one now. */
2447 if (handle == NULL) {
2448 handle = mdd_trans_create(env, mdo2mdd(obj));
2450 GOTO(out, rc = PTR_ERR(handle));
2452 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2457 rc = mdd_declare_changelog_store(env, mdd,
2462 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2467 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2473 CERROR("Error when prepare to delete Object "DFID" , "
2474 "which will cause OST objects can not be "
2475 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* Nothing destroyed: don't hand back LOV EA / cookies to the caller. */
2481 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2483 mdd_write_unlock(env, mdd_obj);
/* Log CL_CLOSE for opens that could have modified data, unless this is
 * a recovery replay open. */
2486 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2487 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2488 if (handle == NULL) {
2489 handle = mdd_trans_create(env, mdo2mdd(obj));
2491 GOTO(stop, rc = IS_ERR(handle));
2493 rc = mdd_declare_changelog_store(env, mdd, NULL,
2498 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2503 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2509 mdd_trans_stop(env, mdd, rc, handle);
2510 #ifdef HAVE_QUOTA_SUPPORT
2512 /* Trigger dqrel on the owner of child. If failed,
2513 * the next call for lquota_chkquota will process it */
2514 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2521 * Permission check is done when open,
2522 * no need check again.
/* readpage is only valid on a directory whose backing dt object supports
 * index operations. */
2524 static int mdd_readpage_sanity_check(const struct lu_env *env,
2525 struct mdd_object *obj)
2527 struct dt_object *next = mdd_object_child(obj);
2531 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage of at most \a nob bytes with directory entries read
 * through the iterator \a it; records start/end hashes and marks the last
 * entry with lde_reclen = 0. */
2539 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2540 struct lu_dirpage *dp, int nob,
2541 const struct dt_it_ops *iops, struct dt_it *it,
2547 struct lu_dirent *ent;
2548 struct lu_dirent *last = NULL;
/* The page starts with a zeroed lu_dirpage header; entries follow it. */
2551 memset(area, 0, sizeof (*dp));
2552 area += sizeof (*dp);
2553 nob -= sizeof (*dp);
2560 len = iops->key_size(env, it);
2562 /* IAM iterator can return record with zero len. */
2566 hash = iops->store(env, it);
/* First entry's hash becomes the page's starting hash. */
2567 if (unlikely(first)) {
2569 dp->ldp_hash_start = cpu_to_le64(hash);
2572 /* calculate max space required for lu_dirent */
2573 recsize = lu_dirent_calc_size(len, attr);
2575 if (nob >= recsize) {
2576 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2577 if (result == -ESTALE)
2582 /* osd might not able to pack all attributes,
2583 * so recheck rec length */
2584 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: stop here; an empty page with no entry is an error. */
2586 result = (last != NULL) ? 0 :-EINVAL;
2590 ent = (void *)ent + recsize;
2594 result = iops->next(env, it);
2595 if (result == -ESTALE)
2597 } while (result == 0);
2600 dp->ldp_hash_end = cpu_to_le64(hash);
/* Entries sharing the end hash may continue on the next page. */
2602 if (last->lde_hash == dp->ldp_hash_end)
2603 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2604 last->lde_reclen = 0; /* end mark */
/* Iterate the directory's index from rdpg->rp_hash and fill the pages in
 * \a rdpg with lu_dirpage-formatted entries; returns the number of bytes
 * produced or a negative errno. */
2609 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2610 const struct lu_rdpg *rdpg)
2613 struct dt_object *next = mdd_object_child(obj);
2614 const struct dt_it_ops *iops;
2616 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2622 LASSERT(rdpg->rp_pages != NULL);
2623 LASSERT(next->do_index_ops != NULL);
2625 if (rdpg->rp_count <= 0)
2629 * iterate through directory and fill pages from @rdpg
2631 iops = &next->do_index_ops->dio_it;
2632 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2636 rc = iops->load(env, it, rdpg->rp_hash);
2640 * Iterator didn't find record with exactly the key requested.
2642 * It is currently either
2644 * - positioned above record with key less than
2645 * requested---skip it.
2647 * - or not positioned at all (is in IAM_IT_SKEWED
2648 * state)---position it on the next item.
2650 rc = iops->next(env, it);
2655 * At this point and across for-loop:
2657 * rc == 0 -> ok, proceed.
2658 * rc > 0 -> end of directory.
2661 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2662 i++, nob -= CFS_PAGE_SIZE) {
2663 struct lu_dirpage *dp;
2665 LASSERT(i < rdpg->rp_npages);
2666 pg = rdpg->rp_pages[i];
/* When CFS pages are larger than LU pages, each CFS page holds several
 * LU dirpages, built one at a time. */
2668 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2671 rc = mdd_dir_page_build(env, mdd, dp,
2672 min_t(int, nob, LU_PAGE_SIZE),
2673 iops, it, rdpg->rp_attrs);
/* End of directory: stamp the final page with the end-of-dir hash. */
2678 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2680 } else if (rc < 0) {
2681 CWARN("build page failed: %d!\n", rc);
2684 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2685 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2686 if ((unsigned long)dp & ~CFS_PAGE_MASK)
/* Directory empty past rp_hash: return a single EMPTY dirpage. */
2693 struct lu_dirpage *dp;
2695 dp = cfs_kmap(rdpg->rp_pages[0]);
2696 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2699 * No pages were processed, mark this for first page
2702 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2705 cfs_kunmap(rdpg->rp_pages[0]);
2707 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2710 iops->fini(env, it);
/* Read directory pages (moo_readpage). A dead (unlinked-but-open)
 * directory returns a single empty page per POSIX; otherwise delegate to
 * __mdd_readpage() under the read lock. */
2715 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2716 const struct lu_rdpg *rdpg)
2718 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2722 LASSERT(mdd_object_exists(mdd_obj));
2724 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2725 rc = mdd_readpage_sanity_check(env, mdd_obj);
2727 GOTO(out_unlock, rc);
2729 if (mdd_is_dead_obj(mdd_obj)) {
2731 struct lu_dirpage *dp;
2734 * According to POSIX, please do not return any entry to client:
2735 * even dot and dotdot should not be returned.
2737 CWARN("readdir from dead object: "DFID"\n",
2738 PFID(mdd_object_fid(mdd_obj)));
2740 if (rdpg->rp_count <= 0)
2741 GOTO(out_unlock, rc = -EFAULT);
2742 LASSERT(rdpg->rp_pages != NULL);
/* Hand back one empty dirpage spanning [rp_hash, END]. */
2744 pg = rdpg->rp_pages[0];
2745 dp = (struct lu_dirpage*)cfs_kmap(pg);
2746 memset(dp, 0 , sizeof(struct lu_dirpage));
2747 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2748 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2749 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2751 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2754 rc = __mdd_readpage(env, mdd_obj, rdpg);
2758 mdd_read_unlock(env, mdd_obj);
/* Sync the object (moo_object_sync): delegate to the underlying dt
 * object's do_object_sync. */
2762 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2764 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2765 struct dt_object *next;
2767 LASSERT(mdd_object_exists(mdd_obj));
2768 next = mdd_object_child(mdd_obj);
2769 return next->do_ops->do_object_sync(env, next);
2772 const struct md_object_operations mdd_obj_ops = {
2773 .moo_permission = mdd_permission,
2774 .moo_attr_get = mdd_attr_get,
2775 .moo_attr_set = mdd_attr_set,
2776 .moo_xattr_get = mdd_xattr_get,
2777 .moo_xattr_set = mdd_xattr_set,
2778 .moo_xattr_list = mdd_xattr_list,
2779 .moo_xattr_del = mdd_xattr_del,
2780 .moo_object_create = mdd_object_create,
2781 .moo_ref_add = mdd_ref_add,
2782 .moo_ref_del = mdd_ref_del,
2783 .moo_open = mdd_open,
2784 .moo_close = mdd_close,
2785 .moo_readpage = mdd_readpage,
2786 .moo_readlink = mdd_readlink,
2787 .moo_changelog = mdd_changelog,
2788 .moo_capa_get = mdd_capa_get,
2789 .moo_object_sync = mdd_object_sync,
2790 .moo_path = mdd_path,
2791 .moo_file_lock = mdd_file_lock,
2792 .moo_file_unlock = mdd_file_unlock,