1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/mdd/mdd_object.c
40 * Lustre Metadata Server (mdd) routines
42 * Author: Wang Di <wangdi@clusterfs.com>
46 # define EXPORT_SYMTAB
48 #define DEBUG_SUBSYSTEM S_MDS
50 #include <linux/module.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55 /* fid_be_cpu(), fid_cpu_to_be(). */
56 #include <lustre_fid.h>
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
63 #include "mdd_internal.h"
65 static const struct lu_object_operations mdd_lu_obj_ops;
67 static int mdd_xattr_get(const struct lu_env *env,
68 struct md_object *obj, struct lu_buf *buf,
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
74 if (mdd_object_exists(obj) == 0) {
75 CERROR("%s: object "DFID" not found: rc = -2\n",
76 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
79 mdo_data_get(env, obj, data);
83 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
84 struct lu_attr *la, struct lustre_capa *capa)
86 if (mdd_object_exists(obj) == 0) {
87 CERROR("%s: object "DFID" not found: rc = -2\n",
88 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
91 return mdo_attr_get(env, obj, la, capa);
94 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98 if (flags & LUSTRE_APPEND_FL)
99 obj->mod_flags |= APPEND_OBJ;
101 if (flags & LUSTRE_IMMUTABLE_FL)
102 obj->mod_flags |= IMMUTE_OBJ;
105 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 struct mdd_thread_info *info;
109 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
110 LASSERT(info != NULL);
114 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
118 buf = &mdd_env_info(env)->mti_buf;
124 void mdd_buf_put(struct lu_buf *buf)
126 if (buf == NULL || buf->lb_buf == NULL)
128 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
133 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
134 const void *area, ssize_t len)
138 buf = &mdd_env_info(env)->mti_buf;
139 buf->lb_buf = (void *)area;
144 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
149 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
152 if (buf->lb_buf == NULL) {
154 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
155 if (buf->lb_buf == NULL)
161 /** Increase the size of the \a mti_big_buf.
162 * preserves old data in buffer
163 * old buffer remains unchanged on error
164 * \retval 0 or -ENOMEM
166 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
171 LASSERT(len >= oldbuf->lb_len);
172 OBD_ALLOC_LARGE(buf.lb_buf, len);
174 if (buf.lb_buf == NULL)
178 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182 memcpy(oldbuf, &buf, sizeof(buf));
187 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
188 struct mdd_device *mdd)
190 struct mdd_thread_info *mti = mdd_env_info(env);
193 max_cookie_size = mdd_lov_cookiesize(env, mdd);
194 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
195 if (mti->mti_max_cookie)
196 OBD_FREE_LARGE(mti->mti_max_cookie,
197 mti->mti_max_cookie_size);
198 mti->mti_max_cookie = NULL;
199 mti->mti_max_cookie_size = 0;
201 if (unlikely(mti->mti_max_cookie == NULL)) {
202 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
203 if (likely(mti->mti_max_cookie != NULL))
204 mti->mti_max_cookie_size = max_cookie_size;
206 if (likely(mti->mti_max_cookie != NULL))
207 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208 return mti->mti_max_cookie;
211 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
212 struct mdd_device *mdd)
214 struct mdd_thread_info *mti = mdd_env_info(env);
217 max_lmm_size = mdd_lov_mdsize(env, mdd);
218 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
219 if (mti->mti_max_lmm)
220 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
221 mti->mti_max_lmm = NULL;
222 mti->mti_max_lmm_size = 0;
224 if (unlikely(mti->mti_max_lmm == NULL)) {
225 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
226 if (likely(mti->mti_max_lmm != NULL))
227 mti->mti_max_lmm_size = max_lmm_size;
229 return mti->mti_max_lmm;
232 struct lu_object *mdd_object_alloc(const struct lu_env *env,
233 const struct lu_object_header *hdr,
236 struct mdd_object *mdd_obj;
238 OBD_ALLOC_PTR(mdd_obj);
239 if (mdd_obj != NULL) {
242 o = mdd2lu_obj(mdd_obj);
243 lu_object_init(o, NULL, d);
244 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
245 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
246 mdd_obj->mod_count = 0;
247 o->lo_ops = &mdd_lu_obj_ops;
254 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
255 const struct lu_object_conf *unused)
257 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
258 struct mdd_object *mdd_obj = lu2mdd_obj(o);
259 struct lu_object *below;
260 struct lu_device *under;
263 mdd_obj->mod_cltime = 0;
264 under = &d->mdd_child->dd_lu_dev;
265 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
266 mdd_pdlock_init(mdd_obj);
270 lu_object_add(o, below);
275 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 if (lu_object_exists(o))
278 return mdd_get_flags(env, lu2mdd_obj(o));
283 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 struct mdd_object *mdd = lu2mdd_obj(o);
291 static int mdd_object_print(const struct lu_env *env, void *cookie,
292 lu_printer_t p, const struct lu_object *o)
294 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
295 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
296 "valid=%x, cltime="LPU64", flags=%lx)",
297 mdd, mdd->mod_count, mdd->mod_valid,
298 mdd->mod_cltime, mdd->mod_flags);
301 static const struct lu_object_operations mdd_lu_obj_ops = {
302 .loo_object_init = mdd_object_init,
303 .loo_object_start = mdd_object_start,
304 .loo_object_free = mdd_object_free,
305 .loo_object_print = mdd_object_print,
308 struct mdd_object *mdd_object_find(const struct lu_env *env,
309 struct mdd_device *d,
310 const struct lu_fid *f)
312 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
315 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
316 const char *path, struct lu_fid *fid)
319 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
320 struct mdd_object *obj;
321 struct lu_name *lname = &mdd_env_info(env)->mti_name;
326 /* temp buffer for path element */
327 buf = mdd_buf_alloc(env, PATH_MAX);
328 if (buf->lb_buf == NULL)
331 lname->ln_name = name = buf->lb_buf;
332 lname->ln_namelen = 0;
333 *f = mdd->mdd_root_fid;
340 while (*path != '/' && *path != '\0') {
348 /* find obj corresponding to fid */
349 obj = mdd_object_find(env, mdd, f);
351 GOTO(out, rc = -EREMOTE);
353 GOTO(out, rc = PTR_ERR(obj));
354 /* get child fid from parent and name */
355 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
356 mdd_object_put(env, obj);
361 lname->ln_namelen = 0;
370 /** The maximum depth that fid2path() will search.
371 * This is limited only because we want to store the fids for
372 * historical path lookup purposes.
374 #define MAX_PATH_DEPTH 100
376 /** mdd_path() lookup structure. */
377 struct path_lookup_info {
378 __u64 pli_recno; /**< history point */
379 __u64 pli_currec; /**< current record */
380 struct lu_fid pli_fid;
381 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
382 struct mdd_object *pli_mdd_obj;
383 char *pli_path; /**< full path */
385 int pli_linkno; /**< which hardlink to follow */
386 int pli_fidcount; /**< number of \a pli_fids */
389 static int mdd_path_current(const struct lu_env *env,
390 struct path_lookup_info *pli)
392 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
393 struct mdd_object *mdd_obj;
394 struct lu_buf *buf = NULL;
395 struct link_ea_header *leh;
396 struct link_ea_entry *lee;
397 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
398 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
404 ptr = pli->pli_path + pli->pli_pathlen - 1;
407 pli->pli_fidcount = 0;
408 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
411 mdd_obj = mdd_object_find(env, mdd,
412 &pli->pli_fids[pli->pli_fidcount]);
414 GOTO(out, rc = -EREMOTE);
416 GOTO(out, rc = PTR_ERR(mdd_obj));
417 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419 mdd_object_put(env, mdd_obj);
423 /* Do I need to error out here? */
428 /* Get parent fid and object name */
429 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
430 buf = mdd_links_get(env, mdd_obj);
431 mdd_read_unlock(env, mdd_obj);
432 mdd_object_put(env, mdd_obj);
434 GOTO(out, rc = PTR_ERR(buf));
437 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
438 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440 /* If set, use link #linkno for path lookup, otherwise use
441 link #0. Only do this for the final path element. */
442 if ((pli->pli_fidcount == 0) &&
443 (pli->pli_linkno < leh->leh_reccount)) {
445 for (count = 0; count < pli->pli_linkno; count++) {
446 lee = (struct link_ea_entry *)
447 ((char *)lee + reclen);
448 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450 if (pli->pli_linkno < leh->leh_reccount - 1)
451 /* indicate to user there are more links */
455 /* Pack the name in the end of the buffer */
456 ptr -= tmpname->ln_namelen;
457 if (ptr - 1 <= pli->pli_path)
458 GOTO(out, rc = -EOVERFLOW);
459 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
462 /* Store the parent fid for historic lookup */
463 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
464 GOTO(out, rc = -EOVERFLOW);
465 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
468 /* Verify that our path hasn't changed since we started the lookup.
469 Record the current index, and verify the path resolves to the
470 same fid. If it does, then the path is correct as of this index. */
471 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
472 pli->pli_currec = mdd->mdd_cl.mc_index;
473 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
474 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
477 GOTO (out, rc = -EAGAIN);
479 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
480 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
481 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
482 PFID(&pli->pli_fid));
483 GOTO(out, rc = -EAGAIN);
485 ptr++; /* skip leading / */
486 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
490 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
491 /* if we vmalloced a large buffer drop it */
497 static int mdd_path_historic(const struct lu_env *env,
498 struct path_lookup_info *pli)
503 /* Returns the full path to this fid, as of changelog record recno. */
504 static int mdd_path(const struct lu_env *env, struct md_object *obj,
505 char *path, int pathlen, __u64 *recno, int *linkno)
507 struct path_lookup_info *pli;
515 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
524 pli->pli_mdd_obj = md2mdd_obj(obj);
525 pli->pli_recno = *recno;
526 pli->pli_path = path;
527 pli->pli_pathlen = pathlen;
528 pli->pli_linkno = *linkno;
530 /* Retry multiple times in case file is being moved */
531 while (tries-- && rc == -EAGAIN)
532 rc = mdd_path_current(env, pli);
534 /* For historical path lookup, the current links may not have existed
535 * at "recno" time. We must switch over to earlier links/parents
536 * by using the changelog records. If the earlier parent doesn't
537 * exist, we must search back through the changelog to reconstruct
538 * its parents, then check if it exists, etc.
539 * We may ignore this problem for the initial implementation and
540 * state that an "original" hardlink must still exist for us to find
541 * historic path name. */
542 if (pli->pli_recno != -1) {
543 rc = mdd_path_historic(env, pli);
545 *recno = pli->pli_currec;
546 /* Return next link index to caller */
547 *linkno = pli->pli_linkno;
555 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 struct lu_attr *la = &mdd_env_info(env)->mti_la;
561 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563 mdd_flags_xlate(obj, la->la_flags);
568 /* get only inode attributes */
569 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
575 if (ma->ma_valid & MA_INODE)
578 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
579 mdd_object_capa(env, mdd_obj));
581 ma->ma_valid |= MA_INODE;
585 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
587 struct lov_desc *ldesc;
588 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
589 struct lov_user_md *lum = (struct lov_user_md*)lmm;
595 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
596 LASSERT(ldesc != NULL);
598 lum->lmm_magic = LOV_MAGIC_V1;
599 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
600 lum->lmm_pattern = ldesc->ld_pattern;
601 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
602 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
603 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
605 RETURN(sizeof(*lum));
608 static int is_rootdir(struct mdd_object *mdd_obj)
610 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
611 const struct lu_fid *fid = mdo2fid(mdd_obj);
613 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
616 /* get lov EA only */
617 static int __mdd_lmm_get(const struct lu_env *env,
618 struct mdd_object *mdd_obj, struct md_attr *ma)
623 if (ma->ma_valid & MA_LOV)
626 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
628 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
629 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
631 ma->ma_lmm_size = rc;
632 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
633 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
639 /* get the first parent fid from link EA */
640 static int mdd_pfid_get(const struct lu_env *env,
641 struct mdd_object *mdd_obj, struct md_attr *ma)
644 struct link_ea_header *leh;
645 struct link_ea_entry *lee;
646 struct lu_fid *pfid = &ma->ma_pfid;
649 if (ma->ma_valid & MA_PFID)
652 buf = mdd_links_get(env, mdd_obj);
654 RETURN(PTR_ERR(buf));
657 lee = (struct link_ea_entry *)(leh + 1);
658 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
659 fid_be_to_cpu(pfid, pfid);
660 ma->ma_valid |= MA_PFID;
661 if (buf->lb_len > OBD_ALLOC_BIG)
662 /* if we vmalloced a large buffer drop it */
667 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
673 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
674 rc = __mdd_lmm_get(env, mdd_obj, ma);
675 mdd_read_unlock(env, mdd_obj);
680 static int __mdd_lmv_get(const struct lu_env *env,
681 struct mdd_object *mdd_obj, struct md_attr *ma)
686 if (ma->ma_valid & MA_LMV)
689 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
692 ma->ma_valid |= MA_LMV;
698 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
701 struct mdd_thread_info *info = mdd_env_info(env);
702 struct lustre_mdt_attrs *lma =
703 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
708 /* If all needed data are already valid, nothing to do */
709 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
710 (ma->ma_need & (MA_HSM | MA_SOM)))
713 /* Read LMA from disk EA */
714 lma_size = sizeof(info->mti_xattr_buf);
715 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
719 /* Useless to check LMA incompatibility because this is already done in
720 * osd_ea_fid_get(), and this will fail long before this code is
722 * So, if we are here, LMA is compatible.
725 lustre_lma_swab(lma);
727 /* Swab and copy LMA */
728 if (ma->ma_need & MA_HSM) {
729 if (lma->lma_compat & LMAC_HSM)
730 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
732 ma->ma_hsm.mh_flags = 0;
733 ma->ma_valid |= MA_HSM;
737 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
738 LASSERT(ma->ma_som != NULL);
739 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
740 ma->ma_som->msd_size = lma->lma_som_size;
741 ma->ma_som->msd_blocks = lma->lma_som_blocks;
742 ma->ma_som->msd_mountid = lma->lma_som_mountid;
743 ma->ma_valid |= MA_SOM;
749 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
755 if (ma->ma_need & MA_INODE)
756 rc = mdd_iattr_get(env, mdd_obj, ma);
758 if (rc == 0 && ma->ma_need & MA_LOV) {
759 if (S_ISREG(mdd_object_type(mdd_obj)) ||
760 S_ISDIR(mdd_object_type(mdd_obj)))
761 rc = __mdd_lmm_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
764 if (S_ISREG(mdd_object_type(mdd_obj)))
765 rc = mdd_pfid_get(env, mdd_obj, ma);
767 if (rc == 0 && ma->ma_need & MA_LMV) {
768 if (S_ISDIR(mdd_object_type(mdd_obj)))
769 rc = __mdd_lmv_get(env, mdd_obj, ma);
771 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
772 if (S_ISREG(mdd_object_type(mdd_obj)))
773 rc = __mdd_lma_get(env, mdd_obj, ma);
775 #ifdef CONFIG_FS_POSIX_ACL
776 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
777 if (S_ISDIR(mdd_object_type(mdd_obj)))
778 rc = mdd_def_acl_get(env, mdd_obj, ma);
781 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
782 rc, ma->ma_valid, ma->ma_lmm);
786 int mdd_attr_get_internal_locked(const struct lu_env *env,
787 struct mdd_object *mdd_obj, struct md_attr *ma)
790 int needlock = ma->ma_need &
791 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
794 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
795 rc = mdd_attr_get_internal(env, mdd_obj, ma);
797 mdd_read_unlock(env, mdd_obj);
802 * No permission check is needed.
804 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
807 struct mdd_object *mdd_obj = md2mdd_obj(obj);
811 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
816 * No permission check is needed.
818 static int mdd_xattr_get(const struct lu_env *env,
819 struct md_object *obj, struct lu_buf *buf,
822 struct mdd_object *mdd_obj = md2mdd_obj(obj);
827 if (mdd_object_exists(mdd_obj) == 0) {
828 CERROR("%s: object "DFID" not found: rc = -2\n",
829 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
833 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834 rc = mdo_xattr_get(env, mdd_obj, buf, name,
835 mdd_object_capa(env, mdd_obj));
836 mdd_read_unlock(env, mdd_obj);
842 * Permission check is done when open,
843 * no need check again.
845 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
848 struct mdd_object *mdd_obj = md2mdd_obj(obj);
849 struct dt_object *next;
854 if (mdd_object_exists(mdd_obj) == 0) {
855 CERROR("%s: object "DFID" not found: rc = -2\n",
856 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
860 next = mdd_object_child(mdd_obj);
861 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
862 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
863 mdd_object_capa(env, mdd_obj));
864 mdd_read_unlock(env, mdd_obj);
869 * No permission check is needed.
871 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
874 struct mdd_object *mdd_obj = md2mdd_obj(obj);
879 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
880 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
881 mdd_read_unlock(env, mdd_obj);
886 int mdd_declare_object_create_internal(const struct lu_env *env,
887 struct mdd_object *p,
888 struct mdd_object *c,
890 struct thandle *handle,
891 const struct md_op_spec *spec)
893 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
894 const struct dt_index_features *feat = spec->sp_feat;
898 if (feat != &dt_directory_features && feat != NULL)
899 dof->dof_type = DFT_INDEX;
901 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
903 dof->u.dof_idx.di_feat = feat;
905 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
910 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
911 struct mdd_object *c, struct md_attr *ma,
912 struct thandle *handle,
913 const struct md_op_spec *spec)
915 struct lu_attr *attr = &ma->ma_attr;
916 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
917 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
918 const struct dt_index_features *feat = spec->sp_feat;
922 if (!mdd_object_exists(c)) {
923 struct dt_object *next = mdd_object_child(c);
926 if (feat != &dt_directory_features && feat != NULL)
927 dof->dof_type = DFT_INDEX;
929 dof->dof_type = dt_mode_to_dft(attr->la_mode);
931 dof->u.dof_idx.di_feat = feat;
933 /* @hint will be initialized by underlying device. */
934 next->do_ops->do_ah_init(env, hint,
935 p ? mdd_object_child(p) : NULL,
936 attr->la_mode & S_IFMT);
938 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
939 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
947 * Make sure the ctime is increased only.
949 static inline int mdd_attr_check(const struct lu_env *env,
950 struct mdd_object *obj,
951 struct lu_attr *attr)
953 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
957 if (attr->la_valid & LA_CTIME) {
958 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
962 if (attr->la_ctime < tmp_la->la_ctime)
963 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
964 else if (attr->la_valid == LA_CTIME &&
965 attr->la_ctime == tmp_la->la_ctime)
966 attr->la_valid &= ~LA_CTIME;
971 int mdd_attr_set_internal(const struct lu_env *env,
972 struct mdd_object *obj,
973 struct lu_attr *attr,
974 struct thandle *handle,
980 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
981 #ifdef CONFIG_FS_POSIX_ACL
982 if (!rc && (attr->la_valid & LA_MODE) && needacl)
983 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
988 int mdd_attr_check_set_internal(const struct lu_env *env,
989 struct mdd_object *obj,
990 struct lu_attr *attr,
991 struct thandle *handle,
997 rc = mdd_attr_check(env, obj, attr);
1002 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1006 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1007 struct mdd_object *obj,
1008 struct lu_attr *attr,
1009 struct thandle *handle,
1015 needacl = needacl && (attr->la_valid & LA_MODE);
1017 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1018 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1020 mdd_write_unlock(env, obj);
1024 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1025 struct mdd_object *obj,
1026 struct lu_attr *attr,
1027 struct thandle *handle,
1033 needacl = needacl && (attr->la_valid & LA_MODE);
1035 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1036 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1038 mdd_write_unlock(env, obj);
1042 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1043 const struct lu_buf *buf, const char *name,
1044 int fl, struct thandle *handle)
1046 struct lustre_capa *capa = mdd_object_capa(env, obj);
1050 if (buf->lb_buf && buf->lb_len > 0)
1051 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1052 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1053 rc = mdo_xattr_del(env, obj, name, handle, capa);
1059 * This gives the same functionality as the code between
1060 * sys_chmod and inode_setattr
1061 * chown_common and inode_setattr
1062 * utimes and inode_setattr
1063 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1065 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1066 struct lu_attr *la, const struct md_attr *ma)
1068 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1069 struct md_ucred *uc;
1076 /* Do not permit change file type */
1077 if (la->la_valid & LA_TYPE)
1080 /* They should not be processed by setattr */
1081 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1084 /* export destroy does not have ->le_ses, but we may want
1085 * to drop LUSTRE_SOM_FL. */
1091 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1095 if (la->la_valid == LA_CTIME) {
1096 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1097 /* This is only for set ctime when rename's source is
1099 rc = mdd_may_delete(env, NULL, obj,
1100 (struct md_attr *)ma, 1, 0);
1101 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1102 la->la_valid &= ~LA_CTIME;
1106 if (la->la_valid == LA_ATIME) {
1107 /* This is atime only set for read atime update on close. */
1108 if (la->la_atime >= tmp_la->la_atime &&
1109 la->la_atime < (tmp_la->la_atime +
1110 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1111 la->la_valid &= ~LA_ATIME;
1115 /* Check if flags change. */
1116 if (la->la_valid & LA_FLAGS) {
1117 unsigned int oldflags = 0;
1118 unsigned int newflags = la->la_flags &
1119 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1121 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1122 !mdd_capable(uc, CFS_CAP_FOWNER))
1125 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1126 * only be changed by the relevant capability. */
1127 if (mdd_is_immutable(obj))
1128 oldflags |= LUSTRE_IMMUTABLE_FL;
1129 if (mdd_is_append(obj))
1130 oldflags |= LUSTRE_APPEND_FL;
1131 if ((oldflags ^ newflags) &&
1132 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1135 if (!S_ISDIR(tmp_la->la_mode))
1136 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1139 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1140 (la->la_valid & ~LA_FLAGS) &&
1141 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1144 /* Check for setting the obj time. */
1145 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1146 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1147 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1148 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1149 rc = mdd_permission_internal_locked(env, obj, tmp_la,
1157 if (la->la_valid & LA_KILL_SUID) {
1158 la->la_valid &= ~LA_KILL_SUID;
1159 if ((tmp_la->la_mode & S_ISUID) &&
1160 !(la->la_valid & LA_MODE)) {
1161 la->la_mode = tmp_la->la_mode;
1162 la->la_valid |= LA_MODE;
1164 la->la_mode &= ~S_ISUID;
1167 if (la->la_valid & LA_KILL_SGID) {
1168 la->la_valid &= ~LA_KILL_SGID;
1169 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1170 (S_ISGID | S_IXGRP)) &&
1171 !(la->la_valid & LA_MODE)) {
1172 la->la_mode = tmp_la->la_mode;
1173 la->la_valid |= LA_MODE;
1175 la->la_mode &= ~S_ISGID;
1178 /* Make sure a caller can chmod. */
1179 if (la->la_valid & LA_MODE) {
1180 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1181 (uc->mu_fsuid != tmp_la->la_uid) &&
1182 !mdd_capable(uc, CFS_CAP_FOWNER))
1185 if (la->la_mode == (cfs_umode_t) -1)
1186 la->la_mode = tmp_la->la_mode;
1188 la->la_mode = (la->la_mode & S_IALLUGO) |
1189 (tmp_la->la_mode & ~S_IALLUGO);
1191 /* Also check the setgid bit! */
1192 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1193 la->la_gid : tmp_la->la_gid) &&
1194 !mdd_capable(uc, CFS_CAP_FSETID))
1195 la->la_mode &= ~S_ISGID;
1197 la->la_mode = tmp_la->la_mode;
1200 /* Make sure a caller can chown. */
1201 if (la->la_valid & LA_UID) {
1202 if (la->la_uid == (uid_t) -1)
1203 la->la_uid = tmp_la->la_uid;
1204 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1205 (la->la_uid != tmp_la->la_uid)) &&
1206 !mdd_capable(uc, CFS_CAP_CHOWN))
1209 /* If the user or group of a non-directory has been
1210 * changed by a non-root user, remove the setuid bit.
1211 * 19981026 David C Niemi <niemi@tux.org>
1213 * Changed this to apply to all users, including root,
1214 * to avoid some races. This is the behavior we had in
1215 * 2.0. The check for non-root was definitely wrong
1216 * for 2.2 anyway, as it should have been using
1217 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1218 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1219 !S_ISDIR(tmp_la->la_mode)) {
1220 la->la_mode &= ~S_ISUID;
1221 la->la_valid |= LA_MODE;
1225 /* Make sure caller can chgrp. */
1226 if (la->la_valid & LA_GID) {
1227 if (la->la_gid == (gid_t) -1)
1228 la->la_gid = tmp_la->la_gid;
1229 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1230 ((la->la_gid != tmp_la->la_gid) &&
1231 !lustre_in_group_p(uc, la->la_gid))) &&
1232 !mdd_capable(uc, CFS_CAP_CHOWN))
1235 /* Likewise, if the user or group of a non-directory
1236 * has been changed by a non-root user, remove the
1237 * setgid bit UNLESS there is no group execute bit
1238 * (this would be a file marked for mandatory
1239 * locking). 19981026 David C Niemi <niemi@tux.org>
1241 * Removed the fsuid check (see the comment above) --
1243 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1244 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1245 la->la_mode &= ~S_ISGID;
1246 la->la_valid |= LA_MODE;
1250 /* For both Size-on-MDS case and truncate case,
1251 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1252 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1253 * For SOM case, it is true, the MAY_WRITE perm has been checked
1254 * when open, no need check again. For truncate case, it is false,
1255 * the MAY_WRITE perm should be checked here. */
1256 if (ma->ma_attr_flags & MDS_SOM) {
1257 /* For the "Size-on-MDS" setattr update, merge coming
1258 * attributes with the set in the inode. BUG 10641 */
1259 if ((la->la_valid & LA_ATIME) &&
1260 (la->la_atime <= tmp_la->la_atime))
1261 la->la_valid &= ~LA_ATIME;
1263 /* OST attributes do not have a priority over MDS attributes,
1264 * so drop times if ctime is equal. */
1265 if ((la->la_valid & LA_CTIME) &&
1266 (la->la_ctime <= tmp_la->la_ctime))
1267 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1269 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1270 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1271 (uc->mu_fsuid == tmp_la->la_uid)) &&
1272 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1273 rc = mdd_permission_internal_locked(env, obj,
1280 if (la->la_valid & LA_CTIME) {
1281 /* The pure setattr, it has the priority over what is
1282 * already set, do not drop it if ctime is equal. */
1283 if (la->la_ctime < tmp_la->la_ctime)
1284 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1292 /** Store a data change changelog record
1293 * If this fails, we must fail the whole transaction; we don't
1294 * want the change to commit without the log entry.
1295 * \param mdd_obj - mdd_object of change
1296 * \param handle - transaction handle
1298 static int mdd_changelog_data_store(const struct lu_env *env,
1299 struct mdd_device *mdd,
1300 enum changelog_rec_type type,
1302 struct mdd_object *mdd_obj,
1303 struct thandle *handle)
1305 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1306 struct llog_changelog_rec *rec;
1307 struct thandle *th = NULL;
/* Nothing to record when changelogs are globally disabled... */
1313 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
/* ...or when this particular record type is masked out. */
1315 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1318 LASSERT(mdd_obj != NULL);
1319 LASSERT(handle != NULL);
/* Suppress redundant time-only records (CL_MTIME..CL_ATIME) if this
 * object was already logged since the changelog was started. */
1321 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1322 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1323 /* Don't need multiple updates in this log */
1324 /* Don't check under lock - no big deal if we get an extra
1329 reclen = llog_data_len(sizeof(*rec));
1330 buf = mdd_buf_alloc(env, reclen);
1331 if (buf->lb_buf == NULL)
1333 rec = (struct llog_changelog_rec *)buf->lb_buf;
/* Data records carry no name: only flags, type and the target FID. */
1335 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1336 rec->cr.cr_type = (__u32)type;
1337 rec->cr.cr_tfid = *tfid;
1338 rec->cr.cr_namelen = 0;
/* Remember when this object was last logged, for the skip test above. */
1339 mdd_obj->mod_cltime = cfs_time_current_64();
/* GCC "?:" extension: prefer the caller's handle, fall back to th. */
1341 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1344 mdd_trans_stop(env, mdd, rc, th);
1347 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1348 rc, type, PFID(tfid));
/* Emit a standalone changelog record of \a type for \a obj in its own
 * transaction: declare, start, store the record, then stop. */
1355 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1356 int flags, struct md_object *obj)
1358 struct thandle *handle;
1359 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1360 struct mdd_device *mdd = mdo2mdd(obj);
1364 handle = mdd_trans_create(env, mdd);
1366 return(PTR_ERR(handle));
1368 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1372 rc = mdd_trans_start(env, mdd, handle);
1376 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1380 mdd_trans_stop(env, mdd, rc, handle);
1386 * Should be called with write lock held.
1388 * \see mdd_lma_set_locked().
1390 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1391 const struct md_attr *ma, struct thandle *handle)
1393 struct mdd_thread_info *info = mdd_env_info(env);
/* Build the LMA image in the per-thread xattr buffer. */
1395 struct lustre_mdt_attrs *lma =
1396 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1397 int lmasize = sizeof(struct lustre_mdt_attrs);
1402 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): '!' binds tighter than '&', so this evaluates
 * (!ma->ma_valid) & (MA_HSM | MA_SOM) -- a 0-or-1 value ANDed with the
 * mask -- not the presumably intended
 * !(ma->ma_valid & (MA_HSM | MA_SOM)).  Confirm intent and fix. */
1403 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1404 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* Byte-swap the on-disk LMA into CPU order before updating it. */
1408 lustre_lma_swab(lma);
1410 memset(lma, 0, lmasize);
/* Merge in the HSM flags when the caller supplied them. */
1414 if (ma->ma_valid & MA_HSM) {
1415 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1416 lma->lma_compat |= LMAC_HSM;
/* Merge in Size-on-MDS data; an invalid IO epoch cancels it. */
1420 if (ma->ma_valid & MA_SOM) {
1421 LASSERT(ma->ma_som != NULL);
1422 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1423 lma->lma_compat &= ~LMAC_SOM;
1425 lma->lma_compat |= LMAC_SOM;
1426 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1427 lma->lma_som_size = ma->ma_som->msd_size;
1428 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1429 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1434 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
/* Swap back to on-disk order and write the EA under \a handle. */
1436 lustre_lma_swab(lma);
1437 buf = mdd_buf_get(env, lma, lmasize);
1438 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1444 * Save LMA extended attributes with data from \a ma.
1446 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1447 * not, LMA EA will be first read from disk, modified and written back.
1450 static int mdd_lma_set_locked(const struct lu_env *env,
1451 struct mdd_object *mdd_obj,
1452 const struct md_attr *ma, struct thandle *handle)
/* Serialize against other EA updates with the object write lock. */
1456 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1457 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1458 mdd_write_unlock(env, mdd_obj);
1462 /* Precedence for choosing record type when multiple
1463 * attributes change: setattr > mtime > ctime > atime
1464 * (ctime changes when mtime does, plus chmod/chown.
1465 * atime and ctime are independent.) */
1466 static int mdd_attr_set_changelog(const struct lu_env *env,
1467 struct md_object *obj, struct thandle *handle,
1470 struct mdd_device *mdd = mdo2mdd(obj);
/* Any bit other than the three timestamps counts as a full SETATTR. */
1473 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1474 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1475 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1476 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
/* Discard record types masked off for this changelog instance. */
1477 bits = bits & mdd->mdd_cl.mc_mask;
1481 /* The record type is the lowest non-masked set bit */
1482 while (bits && ((bits & 1) == 0)) {
1487 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1488 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1489 md2mdd_obj(obj), handle);
/* Reserve transaction credits for everything mdd_attr_set() may do:
 * the attribute update itself, a changelog record, LOV and LMA EA
 * updates, a possible ACL rewrite on mode change, and per-stripe
 * llog records (same record as unlink) for chown/chgrp on striped
 * files when \a lmm is supplied. */
1492 static int mdd_declare_attr_set(const struct lu_env *env,
1493 struct mdd_device *mdd,
1494 struct mdd_object *obj,
1495 const struct md_attr *ma,
1496 struct lov_mds_md *lmm,
1497 struct thandle *handle)
1499 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1502 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1506 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1510 if (ma->ma_valid & MA_LOV) {
1512 buf->lb_len = ma->ma_lmm_size;
1513 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1519 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1521 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1522 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1528 #ifdef CONFIG_FS_POSIX_ACL
/* chmod may need to rewrite the access ACL; probe for it first. */
1529 if (ma->ma_attr.la_valid & LA_MODE) {
1530 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1531 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1533 mdd_read_unlock(env, obj);
/* No ACL support / no ACL present: nothing extra to declare. */
1534 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1542 rc = mdo_declare_xattr_set(env, obj, buf,
1543 XATTR_NAME_ACL_ACCESS, 0,
1551 /* basically the log is the same as in unlink case */
/* Sanity-check the caller-provided LOV EA before using its stripe
 * count to size the llog reservation. */
1555 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1556 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1557 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1558 mdd->mdd_obd_dev->obd_name,
1559 le32_to_cpu(lmm->lmm_magic),
1560 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1564 stripe = le16_to_cpu(lmm->lmm_stripe_count);
/* "All stripes" means every OST target in the LOV descriptor. */
1565 if (stripe == LOV_ALL_STRIPES) {
1566 struct lov_desc *ldesc;
1568 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1569 LASSERT(ldesc != NULL);
1570 stripe = ldesc->ld_tgt_count;
/* One unlink-log record per stripe. */
1573 for (i = 0; i < stripe; i++) {
1574 rc = mdd_declare_llog_record(env, mdd,
1575 sizeof(struct llog_unlink_rec),
1585 /* set attr and LOV EA at once, return updated attr */
/* Main entry for moo_attr_set: normalizes the request, runs it in one
 * transaction (attrs, LOV EA, LMA EA, changelog), handles the quota
 * bookkeeping for ownership changes, and journals chown/chgrp in the
 * llog like unlink so OST objects can be fixed up asynchronously. */
1586 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1587 const struct md_attr *ma)
1589 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590 struct mdd_device *mdd = mdo2mdd(obj);
1591 struct thandle *handle;
1592 struct lov_mds_md *lmm = NULL;
1593 struct llog_cookie *logcookies = NULL;
1594 int rc, lmm_size = 0, cookie_size = 0;
1595 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1596 struct obd_device *obd = mdd->mdd_obd_dev;
1597 struct mds_obd *mds = &obd->u.mds;
1598 #ifdef HAVE_QUOTA_SUPPORT
1599 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1600 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1601 int quota_opc = 0, block_count = 0;
1602 int inode_pending[MAXQUOTAS] = { 0, 0 };
1603 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a copy so the caller's ma_attr is never modified. */
1607 *la_copy = ma->ma_attr;
1608 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1612 /* setattr on "close" only change atime, or do nothing */
1613 if (ma->ma_valid == MA_INODE &&
1614 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp on a regular file: fetch its LOV EA up front, it is
 * needed for the setattr llog record below. */
1617 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1618 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1619 lmm_size = mdd_lov_mdsize(env, mdd);
1620 lmm = mdd_max_lmm_get(env, mdd);
1624 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1631 handle = mdd_trans_create(env, mdd);
1633 RETURN(PTR_ERR(handle));
1635 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1636 lmm_size > 0 ? lmm : NULL, handle);
1640 /* permission changes may require sync operation */
1641 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1642 handle->th_sync = !!mdd->mdd_sync_permission;
1644 rc = mdd_trans_start(env, mdd, handle);
1648 /* permission changes may require sync operation */
1649 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1650 handle->th_sync |= mdd->mdd_sync_permission;
1652 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1653 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1654 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1656 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-check inode and block quota for the new
 * owner (qnids) and remember the old owner (qoids) for adjustment. */
1657 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1658 struct obd_export *exp = md_quota(env)->mq_exp;
1659 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1661 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1663 quota_opc = FSFILT_OP_SETATTR;
1664 mdd_quota_wrapper(la_copy, qnids);
1665 mdd_quota_wrapper(la_tmp, qoids);
1666 /* get file quota for new owner */
1667 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1668 qnids, inode_pending, 1, NULL, 0,
1670 block_count = (la_tmp->la_blocks + 7) >> 3;
1673 mdd_data_get(env, mdd_obj, &data);
1674 /* get block quota for new owner */
1675 lquota_chkquota(mds_quota_interface_ref, obd,
1676 exp, qnids, block_pending,
1678 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS is applied on its own and mirrored into mod_flags. */
1684 if (la_copy->la_valid & LA_FLAGS) {
1685 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1688 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1689 } else if (la_copy->la_valid) { /* setattr */
1690 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1692 /* journal chown/chgrp in llog, just like unlink */
1693 if (rc == 0 && lmm_size){
1694 cookie_size = mdd_lov_cookiesize(env, mdd);
1695 logcookies = mdd_max_cookie_get(env, mdd);
1696 if (logcookies == NULL)
1697 GOTO(cleanup, rc = -ENOMEM);
1699 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1700 logcookies, cookie_size) <= 0)
/* Replace the striping EA when the caller supplied one. */
1705 if (rc == 0 && ma->ma_valid & MA_LOV) {
1708 mode = mdd_object_type(mdd_obj);
1709 if (S_ISREG(mode) || S_ISDIR(mode)) {
1710 rc = mdd_lsm_sanity_check(env, mdd_obj);
1714 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1715 ma->ma_lmm_size, handle, 1);
/* Update the LMA EA with new HSM / Size-on-MDS state. */
1719 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1722 mode = mdd_object_type(mdd_obj);
1724 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1729 rc = mdd_attr_set_changelog(env, obj, handle,
1730 ma->ma_attr.la_valid);
1732 mdd_trans_stop(env, mdd, rc, handle);
/* Push the new ownership to the OSTs after the local commit. */
1733 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1734 /*set obd attr, if needed*/
1735 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1738 #ifdef HAVE_QUOTA_SUPPORT
1740 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1742 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1744 /* Trigger dqrel/dqacq for original owner and new owner.
1745 * If failed, the next call for lquota_chkquota will
1747 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set xattr \a name under an already-started transaction, taking the
 * object write lock around the raw __mdd_xattr_set(). */
1754 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1755 const struct lu_buf *buf, const char *name, int fl,
1756 struct thandle *handle)
1761 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1762 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1763 mdd_write_unlock(env, obj);
/* Permission check for xattr set/del: reject immutable/append-only
 * objects, and require ownership (or CAP_FOWNER) of the object. */
1768 static int mdd_xattr_sanity_check(const struct lu_env *env,
1769 struct mdd_object *obj)
1771 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1772 struct md_ucred *uc = md_ucred(env);
1776 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1779 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Only the owner or a CAP_FOWNER-capable caller may modify xattrs. */
1783 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1784 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Reserve credits for an xattr set plus, for "user." xattrs only,
 * the changelog record mdd_xattr_set() will emit. */
1790 static int mdd_declare_xattr_set(const struct lu_env *env,
1791 struct mdd_device *mdd,
1792 struct mdd_object *obj,
1793 const struct lu_buf *buf,
1795 struct thandle *handle)
1800 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1804 /* Only record user xattr changes */
1805 if ((strncmp("user.", name, 5) == 0))
1806 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1812 * The caller should guarantee to update the object ctime
1813 * after xattr_set if needed.
1815 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1816 const struct lu_buf *buf, const char *name,
1819 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1820 struct mdd_device *mdd = mdo2mdd(obj);
1821 struct thandle *handle;
1825 rc = mdd_xattr_sanity_check(env, mdd_obj);
1829 handle = mdd_trans_create(env, mdd);
1831 RETURN(PTR_ERR(handle));
1833 /* security-related changes may require sync */
1834 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1835 mdd->mdd_sync_permission == 1)
1836 handle->th_sync = 1;
1838 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1842 rc = mdd_trans_start(env, mdd, handle);
1846 /* security-related changes may require sync */
1847 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1848 handle->th_sync |= mdd->mdd_sync_permission;
1850 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
/* Changelog CL_XATTR records cover "user." xattrs and both POSIX
 * ACL names; other namespaces are not logged. */
1852 /* Only record system & user xattr changes */
1853 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1854 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1855 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1856 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1857 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1858 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1859 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1863 mdd_trans_stop(env, mdd, rc, handle);
/* Reserve credits for an xattr delete plus, for "user." xattrs only,
 * the changelog record mdd_xattr_del() will emit. */
1868 static int mdd_declare_xattr_del(const struct lu_env *env,
1869 struct mdd_device *mdd,
1870 struct mdd_object *obj,
1872 struct thandle *handle)
1876 rc = mdo_declare_xattr_del(env, obj, name, handle);
1880 /* Only record user xattr changes */
1881 if ((strncmp("user.", name, 5) == 0))
1882 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1888 * The caller should guarantee to update the object ctime
1889 * after xattr_set if needed.
1891 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1894 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1895 struct mdd_device *mdd = mdo2mdd(obj);
1896 struct thandle *handle;
1900 rc = mdd_xattr_sanity_check(env, mdd_obj);
1904 handle = mdd_trans_create(env, mdd);
1906 RETURN(PTR_ERR(handle));
1908 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1912 rc = mdd_trans_start(env, mdd, handle);
/* Delete under the object write lock, then drop it before logging. */
1916 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1917 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1918 mdd_object_capa(env, mdd_obj));
1919 mdd_write_unlock(env, mdd_obj);
/* Same changelog policy as mdd_xattr_set(): "user." and ACL names. */
1921 /* Only record system & user xattr changes */
1922 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1923 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1924 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1925 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1926 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1927 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1928 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1932 mdd_trans_stop(env, mdd, rc, handle);
1937 /* partial unlink */
/* Drop one link on \a obj (two for directories: entry + ".") without
 * touching the parent, update ctime, finish the unlink bookkeeping and
 * release quota for the dropped inode.  Marked dead code below: DNE
 * replaces this path. */
1938 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1941 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1942 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1943 struct mdd_device *mdd = mdo2mdd(obj);
1944 struct thandle *handle;
1945 #ifdef HAVE_QUOTA_SUPPORT
1946 struct obd_device *obd = mdd->mdd_obd_dev;
1947 struct mds_obd *mds = &obd->u.mds;
1948 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1954 /* XXX: this code won't be used ever:
1955 * DNE uses slightly different approach */
1959 * Check -ENOENT early here because we need to get object type
1960 * to calculate credits before transaction start
1962 if (mdd_object_exists(mdd_obj) == 0) {
1963 CERROR("%s: object "DFID" not found: rc = -2\n",
1964 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
1968 LASSERT(mdd_object_exists(mdd_obj) > 0);
1970 handle = mdd_trans_create(env, mdd);
1974 rc = mdd_trans_start(env, mdd, handle);
1976 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1978 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1982 mdo_ref_del(env, mdd_obj, handle);
/* Directories carry an extra nlink for "."; drop it too. */
1984 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1986 mdo_ref_del(env, mdd_obj, handle);
1989 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1990 la_copy->la_ctime = ma->ma_attr.la_ctime;
1992 la_copy->la_valid = LA_CTIME;
1993 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1997 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1998 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and not open: release the owner's inode quota. */
1999 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2000 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2001 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2002 mdd_quota_wrapper(&ma->ma_attr, qids);
2009 mdd_write_unlock(env, mdd_obj);
2010 mdd_trans_stop(env, mdd, rc, handle);
2011 #ifdef HAVE_QUOTA_SUPPORT
2013 /* Trigger dqrel on the owner of child. If failed,
2014 * the next call for lquota_chkquota will process it */
2015 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2021 /* partial operation */
/* Validate the requested file type (S_IFMT bits of la_mode) before a
 * partial object create. */
2022 static int mdd_oc_sanity_check(const struct lu_env *env,
2023 struct mdd_object *obj,
2029 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial object create (moo_object_create): allocate quota for the
 * child, create the object body, then initialize it -- slave LMV EA
 * for striped-dir slaves, remote ACL inheritance, and the standard
 * attrs/linkage.  Marked dead code below: DNE replaces this path. */
2046 static int mdd_object_create(const struct lu_env *env,
2047 struct md_object *obj,
2048 const struct md_op_spec *spec,
2052 struct mdd_device *mdd = mdo2mdd(obj);
2053 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2054 const struct lu_fid *pfid = spec->u.sp_pfid;
2055 struct thandle *handle;
2056 #ifdef HAVE_QUOTA_SUPPORT
2057 struct obd_device *obd = mdd->mdd_obd_dev;
2058 struct obd_export *exp = md_quota(env)->mq_exp;
2059 struct mds_obd *mds = &obd->u.mds;
2060 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2061 int quota_opc = 0, block_count = 0;
2062 int inode_pending[MAXQUOTAS] = { 0, 0 };
2063 int block_pending[MAXQUOTAS] = { 0, 0 };
2068 /* XXX: this code won't be used ever:
2069 * DNE uses slightly different approach */
2072 #ifdef HAVE_QUOTA_SUPPORT
/* Reserve inode (and, per file type, block) quota before creating. */
2073 if (mds->mds_quota) {
2074 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2075 mdd_quota_wrapper(&ma->ma_attr, qids);
2076 /* get file quota for child */
2077 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2078 qids, inode_pending, 1, NULL, 0,
2080 switch (ma->ma_attr.la_mode & S_IFMT) {
2089 /* get block quota for child */
2091 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2092 qids, block_pending, block_count,
2093 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2097 handle = mdd_trans_create(env, mdd);
2099 GOTO(out_pending, rc = PTR_ERR(handle));
2101 rc = mdd_trans_start(env, mdd, handle);
2103 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2104 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2108 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2112 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2113 /* If creating the slave object, set slave EA here. */
2114 int lmv_size = spec->u.sp_ea.eadatalen;
2115 struct lmv_stripe_md *lmv;
2117 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2118 LASSERT(lmv != NULL && lmv_size > 0);
2120 rc = __mdd_xattr_set(env, mdd_obj,
2121 mdd_buf_get_const(env, lmv, lmv_size),
2122 XATTR_NAME_LMV, 0, handle);
2126 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2129 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL: inherit the parent's ACL carried in the create spec;
 * it may also adjust la_mode. */
2130 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2131 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2133 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2134 buf->lb_len = spec->u.sp_ea.eadatalen;
2135 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2136 rc = __mdd_acl_init(env, mdd_obj, buf,
2137 &ma->ma_attr.la_mode,
2142 ma->ma_attr.la_valid |= LA_MODE;
2145 pfid = spec->u.sp_ea.fid;
2148 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the freshly-created attributes to the caller. */
2154 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2155 mdd_write_unlock(env, mdd_obj);
2157 mdd_trans_stop(env, mdd, rc, handle);
2159 #ifdef HAVE_QUOTA_SUPPORT
2161 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2163 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2165 /* Trigger dqacq on the owner of child. If failed,
2166 * the next call for lquota_chkquota will process it. */
2167 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: add one nlink to \a obj and bump its ctime.  Marked
 * dead code below: DNE replaces this path. */
2175 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2176 const struct md_attr *ma)
2178 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2179 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2180 struct mdd_device *mdd = mdo2mdd(obj);
2181 struct thandle *handle;
2185 /* XXX: this code won't be used ever:
2186 * DNE uses slightly different approach */
2189 handle = mdd_trans_create(env, mdd);
2193 rc = mdd_trans_start(env, mdd, handle);
2195 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2196 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2198 mdo_ref_add(env, mdd_obj, handle);
2199 mdd_write_unlock(env, mdd_obj);
2201 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2202 la_copy->la_ctime = ma->ma_attr.la_ctime;
2204 la_copy->la_valid = LA_CTIME;
2205 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with result 0 even when
 * rc != 0, unlike mdd_ref_del() which passes rc -- confirm whether
 * discarding the error here is intentional. */
2208 mdd_trans_stop(env, mdd, 0, handle);
2214 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open flags into the MAY_* access mask used for the
 * permission check; owner-override skips the check entirely. */
2216 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2220 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2221 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2222 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2223 * owner can write to a file even if it is marked readonly to hide
2224 * its brokenness. (bug 5781) */
2225 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2226 struct md_ucred *uc = md_ucred(env);
2228 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2229 (la->la_uid == uc->mu_fsuid))
/* Truncate and append both require write access. */
2233 if (flags & FMODE_READ)
2235 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2237 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's current attributes:
 * dead objects, symlinks, writable directories, permission bits,
 * append-only constraints and O_NOATIME ownership. */
2242 static int mdd_open_sanity_check(const struct lu_env *env,
2243 struct mdd_object *obj, int flag)
2245 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2250 if (mdd_is_dead_obj(obj))
2253 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are opened via their target, never directly. */
2257 if (S_ISLNK(tmp_la->la_mode))
2260 mode = accmode(env, tmp_la, flag);
/* Directories cannot be opened for write. */
2262 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Freshly-created files were already permission-checked at create. */
2265 if (!(flag & MDS_OPEN_CREATED)) {
2266 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncation is meaningless on FIFOs, sockets and device nodes. */
2271 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2272 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2273 flag &= ~MDS_OPEN_TRUNC;
2275 /* For writing append-only file must open it with append mode. */
2276 if (mdd_is_append(obj)) {
2277 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2279 if (flag & MDS_OPEN_TRUNC)
2285 * Now, flag -- O_NOATIME does not be packed by client.
/* O_NOATIME is restricted to the owner or CAP_FOWNER holders. */
2287 if (flag & O_NOATIME) {
2288 struct md_ucred *uc = md_ucred(env);
2290 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2291 (uc->mu_valid == UCRED_NEW)) &&
2292 (uc->mu_fsuid != tmp_la->la_uid) &&
2293 !mdd_capable(uc, CFS_CAP_FOWNER))
/* moo_open: sanity-check the open and, on success, count the opener
 * (mod_count) under the object write lock. */
2301 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2304 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2307 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2309 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2311 mdd_obj->mod_count++;
2313 mdd_write_unlock(env, mdd_obj);
/* Reserve credits for destroying an object: the unlink llog record
 * plus the object destroy itself. */
2317 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2318 struct md_attr *ma, struct thandle *handle)
2322 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2326 return mdo_declare_destroy(env, obj, handle);
2329 /* return md_attr back,
2330 * if it is last unlink then return lov ea + llog cookie*/
2331 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2332 struct md_attr *ma, struct thandle *handle)
/* Regular files: hand the striping EA and unlink-log cookies back to
 * the caller so the OST objects can be destroyed. */
2337 if (S_ISREG(mdd_object_type(obj))) {
2338 /* Return LOV & COOKIES unconditionally here. We clean everything up.
2339 * Caller must be ready for that. */
2341 rc = __mdd_lmm_get(env, obj, ma);
2342 if ((ma->ma_valid & MA_LOV))
2343 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2348 rc = mdo_destroy(env, obj, handle);
/* Reserve credits for a last-close of an orphan: removal from the
 * orphan index plus the object kill. */
2353 static int mdd_declare_close(const struct lu_env *env,
2354 struct mdd_object *obj,
2356 struct thandle *handle)
2360 rc = orph_declare_index_delete(env, obj, handle);
2364 return mdd_declare_object_kill(env, obj, ma, handle);
2368 * No permission check is needed.
/* moo_close: drop the open count; on the last close of an orphan or
 * zero-nlink object, remove it from the orphan index, destroy it (and
 * its OST objects when MDS_CLOSE_CLEANUP is set), release quota, and
 * emit a CL_CLOSE changelog record for writable opens. */
2370 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2371 struct md_attr *ma, int mode)
2373 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2374 struct mdd_device *mdd = mdo2mdd(obj);
2375 struct thandle *handle = NULL;
2377 int is_orphan = 0, reset = 1;
2379 #ifdef HAVE_QUOTA_SUPPORT
2380 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2381 struct mds_obd *mds = &obd->u.mds;
2382 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: just drop the count, keep the orphan around. */
2387 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2388 mdd_obj->mod_count--;
2390 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2391 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2392 "list\n", PFID(mdd_object_fid(mdd_obj)));
2396 /* check without any lock */
/* Optimistic pre-check: if this looks like a last close of a dead or
 * orphan object, prepare the destroy transaction up front. */
2397 if (mdd_obj->mod_count == 1 &&
2398 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2400 handle = mdd_trans_create(env, mdo2mdd(obj));
2402 RETURN(PTR_ERR(handle));
2404 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2408 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2412 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2417 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Re-check under lock: someone may have made it an orphan since. */
2418 if (handle == NULL && mdd_obj->mod_count == 1 &&
2419 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2420 mdd_write_unlock(env, mdd_obj);
2424 /* release open count */
2425 mdd_obj->mod_count --;
2427 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2428 /* remove link to object from orphan index */
2429 LASSERT(handle != NULL);
2430 rc = __mdd_orphan_del(env, mdd_obj, handle);
2432 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2433 "list, OSS objects to be destroyed.\n",
2434 PFID(mdd_object_fid(mdd_obj)));
2437 CERROR("Object "DFID" can not be deleted from orphan "
2438 "list, maybe cause OST objects can not be "
2439 "destroyed (err: %d).\n",
2440 PFID(mdd_object_fid(mdd_obj)), rc);
2441 /* If object was not deleted from orphan list, do not
2442 * destroy OSS objects, which will be done when next
2448 rc = mdd_iattr_get(env, mdd_obj, ma);
2449 /* Object maybe not in orphan list originally, it is rare case for
2450 * mdd_finish_unlink() failure. */
2451 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2452 #ifdef HAVE_QUOTA_SUPPORT
2453 if (mds->mds_quota) {
2454 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2455 mdd_quota_wrapper(&ma->ma_attr, qids);
2458 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2459 if (ma->ma_valid & MA_FLAGS &&
2460 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2461 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
/* Slow path: the optimistic pre-check above missed, so build the
 * destroy transaction now. */
2463 if (handle == NULL) {
2464 handle = mdd_trans_create(env, mdo2mdd(obj));
2466 GOTO(out, rc = PTR_ERR(handle));
2468 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2473 rc = mdd_declare_changelog_store(env, mdd,
2478 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2483 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2489 CERROR("Error when prepare to delete Object "DFID" , "
2490 "which will cause OST objects can not be "
2491 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2497 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2499 mdd_write_unlock(env, mdd_obj);
/* Log CL_CLOSE for writable opens unless this is a recovery replay. */
2502 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2503 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2504 if (handle == NULL) {
2505 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): rc = IS_ERR(handle) stores the boolean 1, not the
 * errno; every other call site in this file uses PTR_ERR(handle).
 * Confirm and fix to rc = PTR_ERR(handle). */
2507 GOTO(stop, rc = IS_ERR(handle));
2509 rc = mdd_declare_changelog_store(env, mdd, NULL,
2514 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2519 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2525 mdd_trans_stop(env, mdd, rc, handle);
2526 #ifdef HAVE_QUOTA_SUPPORT
2528 /* Trigger dqrel on the owner of child. If failed,
2529 * the next call for lquota_chkquota will process it */
2530 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2537 * Permission check is done when open,
2538 * no need check again.
2540 static int mdd_readpage_sanity_check(const struct lu_env *env,
2541 struct mdd_object *obj)
2543 struct dt_object *next = mdd_object_child(obj);
/* readpage is only valid on directories with index operations. */
2547 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries from iterator \a it,
 * packing as many lu_dirents as fit in \a nob bytes and recording the
 * hash range [ldp_hash_start, ldp_hash_end] the page covers. */
2555 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2556 struct lu_dirpage *dp, int nob,
2557 const struct dt_it_ops *iops, struct dt_it *it,
2563 struct lu_dirent *ent;
2564 struct lu_dirent *last = NULL;
/* Reserve the page header before packing entries. */
2567 memset(area, 0, sizeof (*dp));
2568 area += sizeof (*dp);
2569 nob -= sizeof (*dp);
2576 len = iops->key_size(env, it);
2578 /* IAM iterator can return record with zero len. */
2582 hash = iops->store(env, it);
/* The first entry's hash becomes the page's starting hash. */
2583 if (unlikely(first)) {
2585 dp->ldp_hash_start = cpu_to_le64(hash);
2588 /* calculate max space required for lu_dirent */
2589 recsize = lu_dirent_calc_size(len, attr);
2591 if (nob >= recsize) {
2592 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2593 if (result == -ESTALE)
2598 /* osd might not able to pack all attributes,
2599 * so recheck rec length */
2600 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: fail only if not even one entry was packed. */
2602 result = (last != NULL) ? 0 :-EINVAL;
2606 ent = (void *)ent + recsize;
2610 result = iops->next(env, it);
2611 if (result == -ESTALE)
2613 } while (result == 0);
2616 dp->ldp_hash_end = cpu_to_le64(hash);
/* A last entry sharing the end hash means a hash collision spans
 * the page boundary; flag it so the client can handle it. */
2618 if (last->lde_hash == dp->ldp_hash_end)
2619 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2620 last->lde_reclen = 0; /* end mark */
/* Iterate the directory index from rdpg->rp_hash and fill the pages
 * in \a rdpg with lu_dirpages (several LU_PAGE-sized dirpages per CFS
 * page when CFS_PAGE_SIZE > LU_PAGE_SIZE).  Returns the number of
 * bytes filled or a negative errno. */
2625 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2626 const struct lu_rdpg *rdpg)
2629 struct dt_object *next = mdd_object_child(obj);
2630 const struct dt_it_ops *iops;
2632 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2638 LASSERT(rdpg->rp_pages != NULL);
2639 LASSERT(next->do_index_ops != NULL);
2641 if (rdpg->rp_count <= 0)
2645 * iterate through directory and fill pages from @rdpg
2647 iops = &next->do_index_ops->dio_it;
2648 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2652 rc = iops->load(env, it, rdpg->rp_hash);
2656 * Iterator didn't find record with exactly the key requested.
2658 * It is currently either
2660 * - positioned above record with key less than
2661 * requested---skip it.
2663 * - or not positioned at all (is in IAM_IT_SKEWED
2664 * state)---position it on the next item.
2666 rc = iops->next(env, it);
2671 * At this point and across for-loop:
2673 * rc == 0 -> ok, proceed.
2674 * rc > 0 -> end of directory.
2677 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2678 i++, nob -= CFS_PAGE_SIZE) {
2679 struct lu_dirpage *dp;
2681 LASSERT(i < rdpg->rp_npages);
2682 pg = rdpg->rp_pages[i];
2684 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2687 rc = mdd_dir_page_build(env, mdd, dp,
2688 min_t(int, nob, LU_PAGE_SIZE),
2689 iops, it, rdpg->rp_attrs);
/* End of directory: mark the final dirpage with the end hash. */
2694 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2696 } else if (rc < 0) {
2697 CWARN("build page failed: %d!\n", rc);
2700 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* Advance to the next LU_PAGE-sized dirpage within this CFS page. */
2701 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2702 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2709 struct lu_dirpage *dp;
/* No entries at all: return a single empty dirpage. */
2711 dp = cfs_kmap(rdpg->rp_pages[0]);
2712 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2715 * No pages were processed, mark this for first page
2718 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2721 cfs_kunmap(rdpg->rp_pages[0]);
2723 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2726 iops->fini(env, it);
/* moo_readpage: check existence and directory-ness under the read
 * lock, return one LDF_EMPTY page for dead (removed-while-open)
 * directories per POSIX, otherwise delegate to __mdd_readpage(). */
2731 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2732 const struct lu_rdpg *rdpg)
2734 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2738 if (mdd_object_exists(mdd_obj) == 0) {
2739 CERROR("%s: object "DFID" not found: rc = -2\n",
2740 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2744 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2745 rc = mdd_readpage_sanity_check(env, mdd_obj);
2747 GOTO(out_unlock, rc);
2749 if (mdd_is_dead_obj(mdd_obj)) {
2751 struct lu_dirpage *dp;
2754 * According to POSIX, please do not return any entry to client:
2755 * even dot and dotdot should not be returned.
2757 CWARN("readdir from dead object: "DFID"\n",
2758 PFID(mdd_object_fid(mdd_obj)));
2760 if (rdpg->rp_count <= 0)
2761 GOTO(out_unlock, rc = -EFAULT);
2762 LASSERT(rdpg->rp_pages != NULL);
/* Hand back a single empty dirpage covering the requested hash. */
2764 pg = rdpg->rp_pages[0];
2765 dp = (struct lu_dirpage*)cfs_kmap(pg);
2766 memset(dp, 0 , sizeof(struct lu_dirpage));
2767 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2768 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2769 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2771 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2774 rc = __mdd_readpage(env, mdd_obj, rdpg);
2778 mdd_read_unlock(env, mdd_obj);
/* moo_object_sync: flush the object by delegating to the underlying
 * dt_object's do_object_sync method. */
2782 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2784 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2785 struct dt_object *next;
2787 if (mdd_object_exists(mdd_obj) == 0) {
2788 CERROR("%s: object "DFID" not found: rc = -2\n",
2789 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2792 next = mdd_object_child(mdd_obj);
2793 return next->do_ops->do_object_sync(env, next);
2796 const struct md_object_operations mdd_obj_ops = {
2797 .moo_permission = mdd_permission,
2798 .moo_attr_get = mdd_attr_get,
2799 .moo_attr_set = mdd_attr_set,
2800 .moo_xattr_get = mdd_xattr_get,
2801 .moo_xattr_set = mdd_xattr_set,
2802 .moo_xattr_list = mdd_xattr_list,
2803 .moo_xattr_del = mdd_xattr_del,
2804 .moo_object_create = mdd_object_create,
2805 .moo_ref_add = mdd_ref_add,
2806 .moo_ref_del = mdd_ref_del,
2807 .moo_open = mdd_open,
2808 .moo_close = mdd_close,
2809 .moo_readpage = mdd_readpage,
2810 .moo_readlink = mdd_readlink,
2811 .moo_changelog = mdd_changelog,
2812 .moo_capa_get = mdd_capa_get,
2813 .moo_object_sync = mdd_object_sync,
2814 .moo_path = mdd_path,
2815 .moo_file_lock = mdd_file_lock,
2816 .moo_file_unlock = mdd_file_unlock,