4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
58 #include "mdd_internal.h"
60 static const struct lu_object_operations mdd_lu_obj_ops;
62 static int mdd_xattr_get(const struct lu_env *env,
63 struct md_object *obj, struct lu_buf *buf,
/* Fetch object body data through the lower mdo layer.
 * NOTE(review): the listing omits the lines between the CERROR and the
 * mdo_data_get() call (presumably an early -ENOENT return) — confirm
 * against the full source. */
66 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
69 if (mdd_object_exists(obj) == 0) {
70 CERROR("%s: object "DFID" not found: rc = -2\n",
71 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
74 mdo_data_get(env, obj, data);
/* Read inode attributes (struct lu_attr) of an existing object from the
 * underlying layer; logs when the object does not exist.  NOTE(review):
 * the early-return between CERROR and mdo_attr_get() is not visible in
 * this listing — confirm against the full source. */
78 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
79 struct lu_attr *la, struct lustre_capa *capa)
81 if (mdd_object_exists(obj) == 0) {
82 CERROR("%s: object "DFID" not found: rc = -2\n",
83 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
86 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * mod_flags representation; only APPEND/IMMUTE are translated here,
 * any previous value of those two bits is cleared first. */
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93 if (flags & LUSTRE_APPEND_FL)
94 obj->mod_flags |= APPEND_OBJ;
96 if (flags & LUSTRE_IMMUTABLE_FL)
97 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area attached to this lu_env;
 * LBUG if the context key has not been registered for this env. */
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 struct mdd_thread_info *info;
104 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105 LASSERT(info != NULL);
/* Wrap a caller-supplied area/len in the per-thread mti_buf descriptor.
 * NOTE(review): the lines assigning lb_buf/lb_len and the return are not
 * visible in this listing. */
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by a lu_buf; no-op for NULL buf or empty
 * buffer (the explicit NULL checks make this safe to call on either). */
119 void mdd_buf_put(struct lu_buf *buf)
121 if (buf == NULL || buf->lb_buf == NULL)
123 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Same as mdd_buf_get() but for const data: the const qualifier is cast
 * away when storing into lb_buf, so callers must not let lower layers
 * write through it. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129 const void *area, ssize_t len)
133 buf = &mdd_env_info(env)->mti_buf;
134 buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf grown to at least \a len bytes.
 * An existing smaller buffer is freed first; contents are NOT preserved
 * (use mdd_buf_grow() for that).  On allocation failure lb_buf stays
 * NULL — callers check buf->lb_buf. */
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
147 if (buf->lb_buf == NULL) {
149 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150 if (buf->lb_buf == NULL)
156 /** Increase the size of the \a mti_big_buf.
157 * preserves old data in buffer
158 * old buffer remains unchanged on error
159 * \retval 0 or -ENOMEM
/* Grow strategy: allocate a fresh buffer, copy the old contents, free
 * the old one, then overwrite the descriptor — so on -ENOMEM the
 * original buffer is untouched, as documented above. */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
166 LASSERT(len >= oldbuf->lb_len);
167 OBD_ALLOC_LARGE(buf.lb_buf, len);
169 if (buf.lb_buf == NULL)
173 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog cookie buffer, (re)allocated lazily so it
 * can hold the current maximum LOV cookie size for this device.  The
 * buffer is zeroed before return; NULL is returned on allocation
 * failure. */
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183 struct mdd_device *mdd)
185 struct mdd_thread_info *mti = mdd_env_info(env);
188 max_cookie_size = mdd_lov_cookiesize(env, mdd);
189 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
/* current buffer too small: drop it and reallocate below */
190 if (mti->mti_max_cookie)
191 OBD_FREE_LARGE(mti->mti_max_cookie,
192 mti->mti_max_cookie_size);
193 mti->mti_max_cookie = NULL;
194 mti->mti_max_cookie_size = 0;
196 if (unlikely(mti->mti_max_cookie == NULL)) {
197 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198 if (likely(mti->mti_max_cookie != NULL))
199 mti->mti_max_cookie_size = max_cookie_size;
200 if (likely(mti->mti_max_cookie != NULL))
201 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
202 return mti->mti_max_cookie;
/* Return the per-thread LOV EA buffer, grown (rounded up to a power of
 * two) when \a size exceeds the current capacity.  Contents are not
 * preserved across a regrow; NULL on allocation failure. */
206 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 struct mdd_thread_info *mti = mdd_env_info(env);
210 if (unlikely(mti->mti_max_lmm_size < size)) {
211 int rsize = size_roundup_power2(size);
213 if (mti->mti_max_lmm_size > 0) {
214 LASSERT(mti->mti_max_lmm);
215 OBD_FREE_LARGE(mti->mti_max_lmm,
216 mti->mti_max_lmm_size);
217 mti->mti_max_lmm = NULL;
218 mti->mti_max_lmm_size = 0;
221 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
222 if (likely(mti->mti_max_lmm != NULL))
223 mti->mti_max_lmm_size = rsize;
225 return mti->mti_max_lmm;
/* Convenience wrapper: size the per-thread LOV EA buffer for the
 * device-wide maximum MDS LOV EA size. */
228 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
229 struct mdd_device *mdd)
233 max_lmm_size = mdd_lov_mdsize(env, mdd);
234 return mdd_max_lmm_buffer(env, max_lmm_size);
/* lu_device operation: allocate and minimally initialize a new
 * mdd_object, wiring up the md_object/dir operation tables and the
 * mdd-level lu_object ops.  Returns the embedded lu_object (NULL path
 * on allocation failure is not visible in this listing). */
237 struct lu_object *mdd_object_alloc(const struct lu_env *env,
238 const struct lu_object_header *hdr,
241 struct mdd_object *mdd_obj;
243 OBD_ALLOC_PTR(mdd_obj);
244 if (mdd_obj != NULL) {
247 o = mdd2lu_obj(mdd_obj);
248 lu_object_init(o, NULL, d);
249 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
250 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
251 mdd_obj->mod_count = 0;
252 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object operation: stack the mdd object on top of the child (osd)
 * device by allocating the lower-layer object and linking it below this
 * one.  Also resets the changelog timestamp and the pdir lock. */
259 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
260 const struct lu_object_conf *unused)
262 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
263 struct mdd_object *mdd_obj = lu2mdd_obj(o);
264 struct lu_object *below;
265 struct lu_device *under;
268 mdd_obj->mod_cltime = 0;
269 under = &d->mdd_child->dd_lu_dev;
270 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
271 mdd_pdlock_init(mdd_obj);
275 lu_object_add(o, below);
/* lu_object operation: once the object stack is assembled, cache the
 * flag bits (immutable/append) for objects that exist on disk. */
280 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 if (lu_object_exists(o))
283 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object operation: tear down and free the mdd slice of the object.
 * NOTE(review): the lu_object_fini/OBD_FREE lines are not visible in
 * this listing. */
288 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object operation: one-line debug dump of the mdd object state
 * (open count, valid mask, changelog time, flag bits). */
296 static int mdd_object_print(const struct lu_env *env, void *cookie,
297 lu_printer_t p, const struct lu_object *o)
299 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
300 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
301 "valid=%x, cltime="LPU64", flags=%lx)",
302 mdd, mdd->mod_count, mdd->mod_valid,
303 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; forward-declared at the
 * top of the file so mdd_object_alloc() can reference it. */
306 static const struct lu_object_operations mdd_lu_obj_ops = {
307 .loo_object_init = mdd_object_init,
308 .loo_object_start = mdd_object_start,
309 .loo_object_free = mdd_object_free,
310 .loo_object_print = mdd_object_print,
/* Look up (or create in cache) the mdd object for FID \a f on device
 * \a d; thin wrapper over the generic md-layer slice lookup. */
313 struct mdd_object *mdd_object_find(const struct lu_env *env,
314 struct mdd_device *d,
315 const struct lu_fid *f)
317 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve an absolute (root-relative) \a path to its FID by walking one
 * name component at a time from mdd_root_fid, doing an mdd_lookup() in
 * each parent directory.  Fails with -EREMOTE if a component lives on a
 * remote MDT.  NOTE(review): several loop-control lines are missing
 * from this listing. */
320 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
321 const char *path, struct lu_fid *fid)
324 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
325 struct mdd_object *obj;
326 struct lu_name *lname = &mdd_env_info(env)->mti_name;
331 /* temp buffer for path element */
332 buf = mdd_buf_alloc(env, PATH_MAX);
333 if (buf->lb_buf == NULL)
336 lname->ln_name = name = buf->lb_buf;
337 lname->ln_namelen = 0;
338 *f = mdd->mdd_root_fid;
/* copy the next '/'-delimited component into the temp buffer */
345 while (*path != '/' && *path != '\0') {
353 /* find obj corresponding to fid */
354 obj = mdd_object_find(env, mdd, f);
356 GOTO(out, rc = -EREMOTE);
358 GOTO(out, rc = PTR_ERR(obj));
359 /* get child fid from parent and name */
360 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
361 mdd_object_put(env, obj);
366 lname->ln_namelen = 0;
375 /** The maximum depth that fid2path() will search.
376 * This is limited only because we want to store the fids for
377 * historical path lookup purposes.
379 #define MAX_PATH_DEPTH 100
381 /** mdd_path() lookup structure. */
382 struct path_lookup_info {
383 __u64 pli_recno; /**< history point */
384 __u64 pli_currec; /**< current record */
385 struct lu_fid pli_fid;
386 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
387 struct mdd_object *pli_mdd_obj;
388 char *pli_path; /**< full path */
390 int pli_linkno; /**< which hardlink to follow */
391 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full path of pli->pli_mdd_obj by walking link EAs
 * upward from the object to the filesystem root, packing each name
 * from the end of pli_path backwards and recording each parent FID in
 * pli_fids[].  Afterwards the path is re-resolved forward with
 * mdd_path2fid() to detect concurrent renames; -EAGAIN asks the caller
 * to retry.  NOTE(review): some intermediate lines are missing from
 * this listing. */
394 static int mdd_path_current(const struct lu_env *env,
395 struct path_lookup_info *pli)
397 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
398 struct mdd_object *mdd_obj;
399 struct lu_buf *buf = NULL;
400 struct link_ea_header *leh;
401 struct link_ea_entry *lee;
402 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
403 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* start writing names at the very end of the output buffer */
409 ptr = pli->pli_path + pli->pli_pathlen - 1;
412 pli->pli_fidcount = 0;
413 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
416 mdd_obj = mdd_object_find(env, mdd,
417 &pli->pli_fids[pli->pli_fidcount]);
419 GOTO(out, rc = -EREMOTE);
421 GOTO(out, rc = PTR_ERR(mdd_obj));
422 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424 mdd_object_put(env, mdd_obj);
428 /* Do I need to error out here? */
433 /* Get parent fid and object name */
434 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
435 buf = mdd_links_get(env, mdd_obj);
436 mdd_read_unlock(env, mdd_obj);
437 mdd_object_put(env, mdd_obj);
439 GOTO(out, rc = PTR_ERR(buf));
442 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
443 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445 /* If set, use link #linkno for path lookup, otherwise use
446 link #0. Only do this for the final path element. */
447 if ((pli->pli_fidcount == 0) &&
448 (pli->pli_linkno < leh->leh_reccount)) {
450 for (count = 0; count < pli->pli_linkno; count++) {
451 lee = (struct link_ea_entry *)
452 ((char *)lee + reclen);
453 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455 if (pli->pli_linkno < leh->leh_reccount - 1)
456 /* indicate to user there are more links */
460 /* Pack the name in the end of the buffer */
461 ptr -= tmpname->ln_namelen;
462 if (ptr - 1 <= pli->pli_path)
463 GOTO(out, rc = -EOVERFLOW);
464 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
467 /* Store the parent fid for historic lookup */
468 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
469 GOTO(out, rc = -EOVERFLOW);
470 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
473 /* Verify that our path hasn't changed since we started the lookup.
474 Record the current index, and verify the path resolves to the
475 same fid. If it does, then the path is correct as of this index. */
476 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
477 pli->pli_currec = mdd->mdd_cl.mc_index;
478 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
479 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
482 GOTO (out, rc = -EAGAIN);
484 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
485 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
486 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
487 PFID(&pli->pli_fid));
488 GOTO(out, rc = -EAGAIN);
490 ptr++; /* skip leading / */
491 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
495 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
496 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path lookup — placeholder; body is not
 * visible in this listing (see the comment in mdd_path()). */
502 static int mdd_path_historic(const struct lu_env *env,
503 struct path_lookup_info *pli)
508 /* Returns the full path to this fid, as of changelog record recno. */
/* Entry point for fid2path: allocates a path_lookup_info, retries
 * mdd_path_current() a bounded number of times while the tree is being
 * renamed underneath us (-EAGAIN), then optionally applies the
 * historic lookup.  *recno/*linkno are in/out parameters. */
509 static int mdd_path(const struct lu_env *env, struct md_object *obj,
510 char *path, int pathlen, __u64 *recno, int *linkno)
512 struct path_lookup_info *pli;
/* the root has no name; handled specially (body lines not visible) */
520 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
529 pli->pli_mdd_obj = md2mdd_obj(obj);
530 pli->pli_recno = *recno;
531 pli->pli_path = path;
532 pli->pli_pathlen = pathlen;
533 pli->pli_linkno = *linkno;
535 /* Retry multiple times in case file is being moved */
536 while (tries-- && rc == -EAGAIN)
537 rc = mdd_path_current(env, pli);
539 /* For historical path lookup, the current links may not have existed
540 * at "recno" time. We must switch over to earlier links/parents
541 * by using the changelog records. If the earlier parent doesn't
542 * exist, we must search back through the changelog to reconstruct
543 * its parents, then check if it exists, etc.
544 * We may ignore this problem for the initial implementation and
545 * state that an "original" hardlink must still exist for us to find
546 * historic path name. */
547 if (pli->pli_recno != -1) {
548 rc = mdd_path_historic(env, pli);
550 *recno = pli->pli_currec;
551 /* Return next link index to caller */
552 *linkno = pli->pli_linkno;
/* Read la_flags from disk (capa bypassed) and cache the translated
 * APPEND/IMMUTE bits in obj->mod_flags via mdd_flags_xlate(). */
560 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 struct lu_attr *la = &mdd_env_info(env)->mti_la;
566 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568 mdd_flags_xlate(obj, la->la_flags);
573 /* get only inode attributes */
/* Fills ma->ma_attr and sets MA_INODE in ma_valid; returns early if the
 * inode attributes were already fetched. */
574 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
580 if (ma->ma_valid & MA_INODE)
583 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
584 mdd_object_capa(env, mdd_obj));
586 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV EA (lov_user_md, V1 magic) from the
 * device-wide striping defaults in the MDS lov descriptor; returns the
 * number of bytes written into \a lmm. */
590 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 struct lov_desc *ldesc;
593 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
594 struct lov_user_md *lum = (struct lov_user_md*)lmm;
600 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
601 LASSERT(ldesc != NULL);
603 lum->lmm_magic = LOV_MAGIC_V1;
604 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
605 lum->lmm_pattern = ldesc->ld_pattern;
606 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
607 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
608 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610 RETURN(sizeof(*lum));
/* Nonzero when \a mdd_obj is the filesystem root directory (FID equals
 * the device's mdd_root_fid). */
613 static int is_rootdir(struct mdd_object *mdd_obj)
615 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
616 const struct lu_fid *fid = mdo2fid(mdd_obj);
618 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
/* Fetch a LOV EA too large for the caller-supplied buffer: probe the
 * EA size with LU_BUF_NULL, grow the shared per-thread big buffer, read
 * the EA into it, and point ma->ma_lmm at that buffer (marking
 * ma_big_lmm_used so the caller knows the memory is borrowed). */
621 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
624 struct mdd_thread_info *info = mdd_env_info(env);
629 LASSERT(info != NULL);
630 LASSERT(ma->ma_lmm_size > 0);
631 LASSERT(ma->ma_big_lmm_used == 0);
/* size-only query: LU_BUF_NULL returns the EA length */
633 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
634 mdd_object_capa(env, obj));
638 /* big_lmm may need to grow */
640 mdd_max_lmm_buffer(env, size);
641 if (info->mti_max_lmm == NULL)
644 LASSERT(info->mti_max_lmm_size >= size);
645 rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
650 ma->ma_big_lmm_used = 1;
651 ma->ma_valid |= MA_LOV;
652 ma->ma_lmm = info->mti_max_lmm;
653 ma->ma_lmm_size = size;
658 /* get lov EA only */
/* Reads the LOV EA into ma->ma_lmm; falls back to the big per-thread
 * buffer when the EA doesn't fit (-ERANGE path, per mdd_big_lmm_get),
 * and to device defaults for the root directory when MA_LOV_DEF is
 * requested.  On success also publishes the layout generation. */
659 static int __mdd_lmm_get(const struct lu_env *env,
660 struct mdd_object *mdd_obj, struct md_attr *ma)
665 if (ma->ma_valid & MA_LOV)
668 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
671 rc = mdd_big_lmm_get(env, mdd_obj, ma);
672 else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
673 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
676 ma->ma_lmm_size = rc;
677 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
678 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
684 /* get the first parent fid from link EA */
/* Reads link EA entry #0 and extracts its big-endian parent FID into
 * ma->ma_pfid (converted to CPU order); sets MA_PFID on success. */
685 static int mdd_pfid_get(const struct lu_env *env,
686 struct mdd_object *mdd_obj, struct md_attr *ma)
689 struct link_ea_header *leh;
690 struct link_ea_entry *lee;
691 struct lu_fid *pfid = &ma->ma_pfid;
694 if (ma->ma_valid & MA_PFID)
697 buf = mdd_links_get(env, mdd_obj);
699 RETURN(PTR_ERR(buf));
702 lee = (struct link_ea_entry *)(leh + 1);
703 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
704 fid_be_to_cpu(pfid, pfid);
705 ma->ma_valid |= MA_PFID;
706 if (buf->lb_len > OBD_ALLOC_BIG)
707 /* if we vmalloced a large buffer drop it */
/* Same as __mdd_lmm_get() but takes the object read lock around the
 * EA fetch. */
712 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
718 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
719 rc = __mdd_lmm_get(env, mdd_obj, ma);
720 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv and mark MA_LMV;
 * no-op when already valid.  Caller must hold the object lock. */
725 static int __mdd_lmv_get(const struct lu_env *env,
726 struct mdd_object *mdd_obj, struct md_attr *ma)
731 if (ma->ma_valid & MA_LMV)
734 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
737 ma->ma_valid |= MA_LMV;
/* Read the LMA EA (lustre_mdt_attrs) once and populate the HSM and/or
 * SOM portions of \a ma that the caller asked for via ma_need; skips
 * the disk read entirely when everything needed is already valid. */
743 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
746 struct mdd_thread_info *info = mdd_env_info(env);
747 struct lustre_mdt_attrs *lma =
748 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
753 /* If all needed data are already valid, nothing to do */
754 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
755 (ma->ma_need & (MA_HSM | MA_SOM)))
758 /* Read LMA from disk EA */
759 lma_size = sizeof(info->mti_xattr_buf);
760 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
764 /* Useless to check LMA incompatibility because this is already done in
765 * osd_ea_fid_get(), and this will fail long before this code is
767 * So, if we are here, LMA is compatible.
770 lustre_lma_swab(lma);
772 /* Swab and copy LMA */
773 if (ma->ma_need & MA_HSM) {
774 if (lma->lma_compat & LMAC_HSM)
775 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
777 ma->ma_hsm.mh_flags = 0;
778 ma->ma_valid |= MA_HSM;
/* SOM data is copied only when both requested and present on disk */
782 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
783 LASSERT(ma->ma_som != NULL);
784 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
785 ma->ma_som->msd_size = lma->lma_som_size;
786 ma->ma_som->msd_blocks = lma->lma_som_blocks;
787 ma->ma_som->msd_mountid = lma->lma_som_mountid;
788 ma->ma_valid |= MA_SOM;
/* Dispatcher that fills every attribute group requested in ma->ma_need:
 * inode attrs, LOV EA (regular files/dirs), first parent FID (regular
 * files without LOV), LMV EA (dirs), HSM/SOM from LMA (regular files),
 * and default ACL (dirs, when POSIX ACLs are compiled in).  Stops at
 * the first failing fetch. */
794 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
800 if (ma->ma_need & MA_INODE)
801 rc = mdd_iattr_get(env, mdd_obj, ma);
803 if (rc == 0 && ma->ma_need & MA_LOV) {
804 if (S_ISREG(mdd_object_type(mdd_obj)) ||
805 S_ISDIR(mdd_object_type(mdd_obj)))
806 rc = __mdd_lmm_get(env, mdd_obj, ma);
808 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
809 if (S_ISREG(mdd_object_type(mdd_obj)))
810 rc = mdd_pfid_get(env, mdd_obj, ma);
812 if (rc == 0 && ma->ma_need & MA_LMV) {
813 if (S_ISDIR(mdd_object_type(mdd_obj)))
814 rc = __mdd_lmv_get(env, mdd_obj, ma);
816 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
817 if (S_ISREG(mdd_object_type(mdd_obj)))
818 rc = __mdd_lma_get(env, mdd_obj, ma);
820 #ifdef CONFIG_FS_POSIX_ACL
821 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
822 if (S_ISDIR(mdd_object_type(mdd_obj)))
823 rc = mdd_def_acl_get(env, mdd_obj, ma);
826 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
827 rc, ma->ma_valid, ma->ma_lmm);
/* Like mdd_attr_get_internal(), but takes the object read lock only
 * when EA-backed groups are requested (plain inode attrs don't need
 * the lock). */
831 int mdd_attr_get_internal_locked(const struct lu_env *env,
832 struct mdd_object *mdd_obj, struct md_attr *ma)
835 int needlock = ma->ma_need &
836 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
839 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
840 rc = mdd_attr_get_internal(env, mdd_obj, ma);
842 mdd_read_unlock(env, mdd_obj);
847 * No permission check is needed.
/* md_object operation: public getattr entry point; delegates to the
 * locked internal fetcher. */
849 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
852 struct mdd_object *mdd_obj = md2mdd_obj(obj);
856 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
861 * No permission check is needed.
/* md_object operation: read one named xattr under the object read lock;
 * complains (and presumably returns -ENOENT — the return line is not
 * visible here) for a nonexistent object. */
863 static int mdd_xattr_get(const struct lu_env *env,
864 struct md_object *obj, struct lu_buf *buf,
867 struct mdd_object *mdd_obj = md2mdd_obj(obj);
872 if (mdd_object_exists(mdd_obj) == 0) {
873 CERROR("%s: object "DFID" not found: rc = -2\n",
874 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
878 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
879 rc = mdo_xattr_get(env, mdd_obj, buf, name,
880 mdd_object_capa(env, mdd_obj));
881 mdd_read_unlock(env, mdd_obj);
887 * Permission check is done when open,
888 * no need check again.
/* md_object operation: read a symlink target by doing a body read on
 * the underlying dt object from offset 0 under the read lock. */
890 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
893 struct mdd_object *mdd_obj = md2mdd_obj(obj);
894 struct dt_object *next;
899 if (mdd_object_exists(mdd_obj) == 0) {
900 CERROR("%s: object "DFID" not found: rc = -2\n",
901 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
905 next = mdd_object_child(mdd_obj);
906 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
907 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
908 mdd_object_capa(env, mdd_obj));
909 mdd_read_unlock(env, mdd_obj);
914 * No permission check is needed.
/* md_object operation: list all xattr names into \a buf under the
 * object read lock. */
916 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
919 struct mdd_object *mdd_obj = md2mdd_obj(obj);
924 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
925 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
926 mdd_read_unlock(env, mdd_obj);
/* Transaction-declaration counterpart of mdd_object_create_internal():
 * derive the dt_object_format from the creation spec (index vs mode-
 * based type) and declare the lower-layer create in \a handle. */
931 int mdd_declare_object_create_internal(const struct lu_env *env,
932 struct mdd_object *p,
933 struct mdd_object *c,
935 struct thandle *handle,
936 const struct md_op_spec *spec)
938 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
939 const struct dt_index_features *feat = spec->sp_feat;
/* a non-directory index feature forces DFT_INDEX; otherwise the
 * format is derived from the requested file mode */
943 if (feat != &dt_directory_features && feat != NULL)
944 dof->dof_type = DFT_INDEX;
946 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
948 dof->u.dof_idx.di_feat = feat;
950 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Create the on-disk object for \a c (if it does not already exist):
 * builds the dt_object_format the same way as the declare step, lets
 * the underlying device initialize the allocation hint from the parent,
 * then performs the lower-layer create inside \a handle. */
955 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
956 struct mdd_object *c, struct md_attr *ma,
957 struct thandle *handle,
958 const struct md_op_spec *spec)
960 struct lu_attr *attr = &ma->ma_attr;
961 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
962 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
963 const struct dt_index_features *feat = spec->sp_feat;
967 if (!mdd_object_exists(c)) {
968 struct dt_object *next = mdd_object_child(c);
971 if (feat != &dt_directory_features && feat != NULL)
972 dof->dof_type = DFT_INDEX;
974 dof->dof_type = dt_mode_to_dft(attr->la_mode);
976 dof->u.dof_idx.di_feat = feat;
978 /* @hint will be initialized by underlying device. */
979 next->do_ops->do_ah_init(env, hint,
980 p ? mdd_object_child(p) : NULL,
981 attr->la_mode & S_IFMT);
983 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
984 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
992 * Make sure the ctime is increased only.
/* Drop time updates that would move ctime backwards: if the incoming
 * ctime is older than the on-disk one, MTIME/CTIME are stripped; a
 * pure equal-ctime update is also dropped as redundant. */
994 static inline int mdd_attr_check(const struct lu_env *env,
995 struct mdd_object *obj,
996 struct lu_attr *attr)
998 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1002 if (attr->la_valid & LA_CTIME) {
1003 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1007 if (attr->la_ctime < tmp_la->la_ctime)
1008 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1009 else if (attr->la_valid == LA_CTIME &&
1010 attr->la_ctime == tmp_la->la_ctime)
1011 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to the object inside transaction \a handle; when the
 * mode changes and \a needacl is set, also rewrite the POSIX ACL to
 * match the new permission bits. */
1016 int mdd_attr_set_internal(const struct lu_env *env,
1017 struct mdd_object *obj,
1018 struct lu_attr *attr,
1019 struct thandle *handle,
1025 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1026 #ifdef CONFIG_FS_POSIX_ACL
1027 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1028 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by the actual attr
 * set; the set is skipped entirely if the check empties la_valid (the
 * missing lines between 1042 and 1047 presumably handle that). */
1033 int mdd_attr_check_set_internal(const struct lu_env *env,
1034 struct mdd_object *obj,
1035 struct lu_attr *attr,
1036 struct thandle *handle,
1042 rc = mdd_attr_check(env, obj, attr);
1047 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(); ACL rewriting is
 * only relevant when the mode actually changes, so needacl is narrowed
 * first. */
1051 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1052 struct mdd_object *obj,
1053 struct lu_attr *attr,
1054 struct thandle *handle,
1060 needacl = needacl && (attr->la_valid & LA_MODE);
1062 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1063 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1065 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); mirrors
 * mdd_attr_set_internal_locked() including the needacl narrowing. */
1069 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1070 struct mdd_object *obj,
1071 struct lu_attr *attr,
1072 struct thandle *handle,
1078 needacl = needacl && (attr->la_valid & LA_MODE);
1080 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1081 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1083 mdd_write_unlock(env, obj);
/* Set or delete an xattr inside a transaction: a non-empty buffer sets
 * the value, a (NULL, 0) buffer deletes the attribute.  Note the \a fl
 * parameter is not forwarded to mdo_xattr_set() (0 is passed). */
1087 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1088 const struct lu_buf *buf, const char *name,
1089 int fl, struct thandle *handle)
1091 struct lustre_capa *capa = mdd_object_capa(env, obj);
1095 if (buf->lb_buf && buf->lb_len > 0)
1096 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1097 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1098 rc = mdo_xattr_del(env, obj, name, handle, capa);
1104 * This gives the same functionality as the code between
1105 * sys_chmod and inode_setattr
1106 * chown_common and inode_setattr
1107 * utimes and inode_setattr
1108 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize an incoming setattr request against the
 * object's current attributes and the caller's credentials: rejects
 * illegal changes (type, nlink, rdev, blksize), enforces owner/
 * capability rules for flags, times, mode, uid and gid, clears
 * setuid/setgid where POSIX requires, and strips la_valid bits that
 * turn out to be no-ops.  On return la_valid describes exactly what
 * still needs to be written. */
1110 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1111 struct lu_attr *la, const struct md_attr *ma)
1113 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1114 struct md_ucred *uc;
1121 /* Do not permit change file type */
1122 if (la->la_valid & LA_TYPE)
1125 /* They should not be processed by setattr */
1126 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1129 /* export destroy does not have ->le_ses, but we may want
1130 * to drop LUSTRE_SOM_FL. */
1136 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* ctime-only update: used by rename on the source object */
1140 if (la->la_valid == LA_CTIME) {
1141 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1142 /* This is only for set ctime when rename's source is
1144 rc = mdd_may_delete(env, NULL, obj,
1145 (struct md_attr *)ma, 1, 0);
1146 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1147 la->la_valid &= ~LA_CTIME;
1151 if (la->la_valid == LA_ATIME) {
1152 /* This is atime only set for read atime update on close. */
1153 if (la->la_atime >= tmp_la->la_atime &&
1154 la->la_atime < (tmp_la->la_atime +
1155 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1156 la->la_valid &= ~LA_ATIME;
1160 /* Check if flags change. */
1161 if (la->la_valid & LA_FLAGS) {
1162 unsigned int oldflags = 0;
1163 unsigned int newflags = la->la_flags &
1164 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1166 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1167 !mdd_capable(uc, CFS_CAP_FOWNER))
1170 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1171 * only be changed by the relevant capability. */
1172 if (mdd_is_immutable(obj))
1173 oldflags |= LUSTRE_IMMUTABLE_FL;
1174 if (mdd_is_append(obj))
1175 oldflags |= LUSTRE_APPEND_FL;
1176 if ((oldflags ^ newflags) &&
1177 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1180 if (!S_ISDIR(tmp_la->la_mode))
1181 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* immutable/append-only objects accept only flag changes
 * (unless the MDS explicitly bypasses permission checks) */
1184 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1185 (la->la_valid & ~LA_FLAGS) &&
1186 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1189 /* Check for setting the obj time. */
1190 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1191 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1192 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1193 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1194 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/SGID: synthetic bits asking us to clear the
 * set-id bits, translated into a plain LA_MODE update */
1202 if (la->la_valid & LA_KILL_SUID) {
1203 la->la_valid &= ~LA_KILL_SUID;
1204 if ((tmp_la->la_mode & S_ISUID) &&
1205 !(la->la_valid & LA_MODE)) {
1206 la->la_mode = tmp_la->la_mode;
1207 la->la_valid |= LA_MODE;
1209 la->la_mode &= ~S_ISUID;
1212 if (la->la_valid & LA_KILL_SGID) {
1213 la->la_valid &= ~LA_KILL_SGID;
1214 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1215 (S_ISGID | S_IXGRP)) &&
1216 !(la->la_valid & LA_MODE)) {
1217 la->la_mode = tmp_la->la_mode;
1218 la->la_valid |= LA_MODE;
1220 la->la_mode &= ~S_ISGID;
1223 /* Make sure a caller can chmod. */
1224 if (la->la_valid & LA_MODE) {
1225 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1226 (uc->mu_fsuid != tmp_la->la_uid) &&
1227 !mdd_capable(uc, CFS_CAP_FOWNER))
1230 if (la->la_mode == (cfs_umode_t) -1)
1231 la->la_mode = tmp_la->la_mode;
1233 la->la_mode = (la->la_mode & S_IALLUGO) |
1234 (tmp_la->la_mode & ~S_IALLUGO);
1236 /* Also check the setgid bit! */
1237 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1238 la->la_gid : tmp_la->la_gid) &&
1239 !mdd_capable(uc, CFS_CAP_FSETID))
1240 la->la_mode &= ~S_ISGID;
1242 la->la_mode = tmp_la->la_mode;
1245 /* Make sure a caller can chown. */
1246 if (la->la_valid & LA_UID) {
1247 if (la->la_uid == (uid_t) -1)
1248 la->la_uid = tmp_la->la_uid;
1249 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1250 (la->la_uid != tmp_la->la_uid)) &&
1251 !mdd_capable(uc, CFS_CAP_CHOWN))
1254 /* If the user or group of a non-directory has been
1255 * changed by a non-root user, remove the setuid bit.
1256 * 19981026 David C Niemi <niemi@tux.org>
1258 * Changed this to apply to all users, including root,
1259 * to avoid some races. This is the behavior we had in
1260 * 2.0. The check for non-root was definitely wrong
1261 * for 2.2 anyway, as it should have been using
1262 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1263 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1264 !S_ISDIR(tmp_la->la_mode)) {
1265 la->la_mode &= ~S_ISUID;
1266 la->la_valid |= LA_MODE;
1270 /* Make sure caller can chgrp. */
1271 if (la->la_valid & LA_GID) {
1272 if (la->la_gid == (gid_t) -1)
1273 la->la_gid = tmp_la->la_gid;
1274 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1275 ((la->la_gid != tmp_la->la_gid) &&
1276 !lustre_in_group_p(uc, la->la_gid))) &&
1277 !mdd_capable(uc, CFS_CAP_CHOWN))
1280 /* Likewise, if the user or group of a non-directory
1281 * has been changed by a non-root user, remove the
1282 * setgid bit UNLESS there is no group execute bit
1283 * (this would be a file marked for mandatory
1284 * locking). 19981026 David C Niemi <niemi@tux.org>
1286 * Removed the fsuid check (see the comment above) --
1288 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1289 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1290 la->la_mode &= ~S_ISGID;
1291 la->la_valid |= LA_MODE;
1295 /* For both Size-on-MDS case and truncate case,
1296 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1297 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1298 * For SOM case, it is true, the MAY_WRITE perm has been checked
1299 * when open, no need check again. For truncate case, it is false,
1300 * the MAY_WRITE perm should be checked here. */
1301 if (ma->ma_attr_flags & MDS_SOM) {
1302 /* For the "Size-on-MDS" setattr update, merge coming
1303 * attributes with the set in the inode. BUG 10641 */
1304 if ((la->la_valid & LA_ATIME) &&
1305 (la->la_atime <= tmp_la->la_atime))
1306 la->la_valid &= ~LA_ATIME;
1308 /* OST attributes do not have a priority over MDS attributes,
1309 * so drop times if ctime is equal. */
1310 if ((la->la_valid & LA_CTIME) &&
1311 (la->la_ctime <= tmp_la->la_ctime))
1312 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1314 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1315 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1316 (uc->mu_fsuid == tmp_la->la_uid)) &&
1317 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1318 rc = mdd_permission_internal_locked(env, obj,
1325 if (la->la_valid & LA_CTIME) {
1326 /* The pure setattr, it has the priority over what is
1327 * already set, do not drop it if ctime is equal. */
1328 if (la->la_ctime < tmp_la->la_ctime)
1329 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1337 /** Store a data change changelog record
1338 * If this fails, we must fail the whole transaction; we don't
1339 * want the change to commit without the log entry.
1340 * \param mdd_obj - mdd_object of change
1341 * \param handle - transacion handle
/* Skips recording entirely when changelogs are off, the record type is
 * masked out, or a time-update record for this object was already
 * written since mc_starttime (cheap dedup for MTIME..ATIME records). */
1343 static int mdd_changelog_data_store(const struct lu_env *env,
1344 struct mdd_device *mdd,
1345 enum changelog_rec_type type,
1347 struct mdd_object *mdd_obj,
1348 struct thandle *handle)
1350 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1351 struct llog_changelog_rec *rec;
1352 struct thandle *th = NULL;
1358 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1360 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1363 LASSERT(mdd_obj != NULL);
1364 LASSERT(handle != NULL);
1366 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1367 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1368 /* Don't need multiple updates in this log */
1369 /* Don't check under lock - no big deal if we get an extra
1374 reclen = llog_data_len(sizeof(*rec));
1375 buf = mdd_buf_alloc(env, reclen);
1376 if (buf->lb_buf == NULL)
1378 rec = (struct llog_changelog_rec *)buf->lb_buf;
1380 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1381 rec->cr.cr_type = (__u32)type;
1382 rec->cr.cr_tfid = *tfid;
1383 rec->cr.cr_namelen = 0;
/* remember when this object last logged so the dedup above works */
1384 mdd_obj->mod_cltime = cfs_time_current_64();
1386 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1389 mdd_trans_stop(env, mdd, rc, th);
1392 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1393 rc, type, PFID(tfid));
/*
 * Emit a standalone changelog record of \a type for \a obj.
 * Creates its own transaction: declare the changelog store, start,
 * write the record via mdd_changelog_data_store(), then stop.
 */
1400 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1401 int flags, struct md_object *obj)
1403 struct thandle *handle;
1404 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1405 struct mdd_device *mdd = mdo2mdd(obj);
1409 handle = mdd_trans_create(env, mdd);
1411 return(PTR_ERR(handle));
1413 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1417 rc = mdd_trans_start(env, mdd, handle);
1421 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1425 mdd_trans_stop(env, mdd, rc, handle);
1431 * Should be called with write lock held.
1433 * \see mdd_lma_set_locked().
/* Update the LMA (lustre_mdt_attrs) EA of \a mdd_obj with the HSM
 * and/or SOM state carried in \a ma, preserving other on-disk LMA
 * fields by reading the EA first when needed. */
1435 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1436 const struct md_attr *ma, struct thandle *handle)
1438 struct mdd_thread_info *info = mdd_env_info(env);
1440 struct lustre_mdt_attrs *lma =
1441 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1442 int lmasize = sizeof(struct lustre_mdt_attrs);
1447 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" applies logical
 * NOT before the mask, so the condition depends only on whether
 * MA_HSM|MA_SOM includes bit 0.  Per the comment above it almost
 * certainly means "!(ma->ma_valid & (MA_HSM | MA_SOM))" (or a check
 * that both bits are set) -- confirm against upstream. */
1448 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1449 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* On-disk LMA is stored in a fixed byte order; swab to CPU order. */
1453 lustre_lma_swab(lma);
/* No LMA EA on disk yet: start from a zeroed structure. */
1455 memset(lma, 0, lmasize);
/* Fold in the HSM flags if the caller provided them. */
1459 if (ma->ma_valid & MA_HSM) {
1460 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1461 lma->lma_compat |= LMAC_HSM;
/* Fold in (or invalidate, on IOEPOCH_INVAL) the Size-on-MDS data. */
1465 if (ma->ma_valid & MA_SOM) {
1466 LASSERT(ma->ma_som != NULL);
1467 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1468 lma->lma_compat &= ~LMAC_SOM;
1470 lma->lma_compat |= LMAC_SOM;
1471 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1472 lma->lma_som_size = ma->ma_som->msd_size;
1473 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1474 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always refresh the self FID, then swab back to disk order and
 * write the EA under the caller's transaction handle. */
1479 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1481 lustre_lma_swab(lma);
1482 buf = mdd_buf_get(env, lma, lmasize);
1483 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1489 * Save LMA extended attributes with data from \a ma.
1491 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1492 * not, LMA EA will be first read from disk, modified and written back.
/* Write-locked wrapper around __mdd_lma_set(). */
1495 static int mdd_lma_set_locked(const struct lu_env *env,
1496 struct mdd_object *mdd_obj,
1497 const struct md_attr *ma, struct thandle *handle)
1501 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1502 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1503 mdd_write_unlock(env, mdd_obj);
1507 /* Precedence for choosing record type when multiple
1508 * attributes change: setattr > mtime > ctime > atime
1509 * (ctime changes when mtime does, plus chmod/chown.
1510 * atime and ctime are independent.) */
1511 static int mdd_attr_set_changelog(const struct lu_env *env,
1512 struct md_object *obj, struct thandle *handle,
1515 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a bitmask of candidate record types from the changed attrs;
 * any non-time attribute change counts as a full CL_SETATTR. */
1518 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1519 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1520 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1521 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
/* Drop record types the device's changelog mask filters out. */
1522 bits = bits & mdd->mdd_cl.mc_mask;
1526 /* The record type is the lowest non-masked set bit */
1527 while (bits && ((bits & 1) == 0)) {
1532 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1533 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1534 md2mdd_obj(obj), handle);
/*
 * Declare (reserve transaction credits for) everything mdd_attr_set()
 * may do: the attribute update, a changelog record, LOV/LMA EA updates,
 * an ACL rewrite on mode change, and -- for chown/chgrp on striped
 * files -- per-stripe unlink-style llog records.
 */
1537 static int mdd_declare_attr_set(const struct lu_env *env,
1538 struct mdd_device *mdd,
1539 struct mdd_object *obj,
1540 const struct md_attr *ma,
1541 struct lov_mds_md *lmm,
1542 struct thandle *handle)
1544 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1547 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1551 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
/* The setattr may rewrite the LOV EA. */
1555 if (ma->ma_valid & MA_LOV) {
1557 buf->lb_len = ma->ma_lmm_size;
1558 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
/* HSM/SOM state is stored in the LMA EA. */
1564 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1566 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1567 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1573 #ifdef CONFIG_FS_POSIX_ACL
/* chmod may need to rewrite the access ACL; probe for it first.
 * -EOPNOTSUPP / -ENODATA mean there is no ACL to update. */
1574 if (ma->ma_attr.la_valid & LA_MODE) {
1575 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1576 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1578 mdd_read_unlock(env, obj);
1579 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1587 rc = mdo_declare_xattr_set(env, obj, buf,
1588 XATTR_NAME_ACL_ACCESS, 0,
1596 /* basically the log is the same as in unlink case */
/* Validate the LOV magic before trusting its stripe count. */
1600 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1601 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1602 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1603 mdd->mdd_obd_dev->obd_name,
1604 le32_to_cpu(lmm->lmm_magic),
1605 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1609 stripe = le16_to_cpu(lmm->lmm_stripe_count);
/* LOV_ALL_STRIPES means "stripe over every OST"; substitute the LOV
 * descriptor's actual target count. */
1610 if (stripe == LOV_ALL_STRIPES) {
1611 struct lov_desc *ldesc;
1613 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1614 LASSERT(ldesc != NULL);
1615 stripe = ldesc->ld_tgt_count;
/* One unlink-style llog record per stripe. */
1618 for (i = 0; i < stripe; i++) {
1619 rc = mdd_declare_llog_record(env, mdd,
1620 sizeof(struct llog_unlink_rec),
1630 /* set attr and LOV EA at once, return updated attr */
1631 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1632 const struct md_attr *ma)
1634 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1635 struct mdd_device *mdd = mdo2mdd(obj);
1636 struct thandle *handle;
1637 struct lov_mds_md *lmm = NULL;
1638 struct llog_cookie *logcookies = NULL;
1639 int rc, lmm_size = 0, cookie_size = 0;
1640 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1641 #ifdef HAVE_QUOTA_SUPPORT
1642 struct obd_device *obd = mdd->mdd_obd_dev;
1643 struct mds_obd *mds = &obd->u.mds;
/* qnids: new owner's quota ids; qoids: original owner's quota ids. */
1644 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1645 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1646 int quota_opc = 0, block_count = 0;
1647 int inode_pending[MAXQUOTAS] = { 0, 0 };
1648 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Normalize the caller's attributes (suid/sgid clearing, time
 * precedence, permission checks) into a private copy. */
1652 *la_copy = ma->ma_attr;
1653 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1657 /* setattr on "close" only change atime, or do nothing */
1658 if (ma->ma_valid == MA_INODE &&
1659 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp on a regular file: fetch the LOV EA so ownership can be
 * journalled (and later pushed to the OSTs). */
1662 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1663 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1664 lmm_size = mdd_lov_mdsize(env, mdd);
1665 lmm = mdd_max_lmm_get(env, mdd);
1669 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1676 handle = mdd_trans_create(env, mdd);
1678 RETURN(PTR_ERR(handle));
1680 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1681 lmm_size > 0 ? lmm : NULL, handle);
1685 rc = mdd_trans_start(env, mdd, handle);
1689 /* permission changes may require sync operation */
1690 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1691 handle->th_sync |= !!mdd->mdd_sync_permission;
1693 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1694 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1695 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1697 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-acquire quota for the new owner (files and
 * blocks) before applying the attributes. */
1698 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1699 struct obd_export *exp = md_quota(env)->mq_exp;
1700 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1702 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1704 quota_opc = FSFILT_OP_SETATTR;
1705 mdd_quota_wrapper(la_copy, qnids);
1706 mdd_quota_wrapper(la_tmp, qoids);
1707 /* get file quota for new owner */
1708 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1709 qnids, inode_pending, 1, NULL, 0,
/* la_blocks is in 512-byte sectors; >>3 converts to 4K blocks. */
1711 block_count = (la_tmp->la_blocks + 7) >> 3;
1714 mdd_data_get(env, mdd_obj, &data);
1715 /* get block quota for new owner */
1716 lquota_chkquota(mds_quota_interface_ref, obd,
1717 exp, qnids, block_pending,
1719 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS-only change vs. general setattr are handled separately. */
1725 if (la_copy->la_valid & LA_FLAGS) {
1726 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1729 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1730 } else if (la_copy->la_valid) { /* setattr */
1731 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1733 /* journal chown/chgrp in llog, just like unlink */
1734 if (rc == 0 && lmm_size){
1735 cookie_size = mdd_lov_cookiesize(env, mdd);
1736 logcookies = mdd_max_cookie_get(env, mdd);
1737 if (logcookies == NULL)
1738 GOTO(cleanup, rc = -ENOMEM);
1740 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1741 logcookies, cookie_size) <= 0)
/* Caller supplied a new LOV EA: validate and store it. */
1746 if (rc == 0 && ma->ma_valid & MA_LOV) {
1749 mode = mdd_object_type(mdd_obj);
1750 if (S_ISREG(mode) || S_ISDIR(mode)) {
1751 rc = mdd_lsm_sanity_check(env, mdd_obj);
1755 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1756 ma->ma_lmm_size, handle, 1);
/* HSM/SOM state goes into the LMA EA. */
1760 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1763 mode = mdd_object_type(mdd_obj);
1765 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1770 rc = mdd_attr_set_changelog(env, obj, handle,
1771 ma->ma_attr.la_valid);
1773 mdd_trans_stop(env, mdd, rc, handle);
/* Push the ownership change to the OSTs asynchronously, outside the
 * (already stopped) MDS transaction. */
1774 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1775 /*set obd attr, if needed*/
1776 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1779 #ifdef HAVE_QUOTA_SUPPORT
/* Release the quota pre-acquired above for the new owner. */
1781 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1783 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1785 /* Trigger dqrel/dqacq for original owner and new owner.
1786 * If failed, the next call for lquota_chkquota will
1788 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Apply an xattr change under the object write lock, inside the
 * caller's already-started transaction. */
1795 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1796 const struct lu_buf *buf, const char *name, int fl,
1797 struct thandle *handle)
1802 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1803 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1804 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: refuse immutable/append
 * objects, then require ownership (fsuid == uid) or CFS_CAP_FOWNER. */
1809 static int mdd_xattr_sanity_check(const struct lu_env *env,
1810 struct mdd_object *obj)
1812 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1813 struct md_ucred *uc = md_ucred(env);
1817 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1820 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1824 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1825 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare the xattr set plus, for "user." xattrs, the changelog record
 * that mdd_xattr_set() may emit. */
1831 static int mdd_declare_xattr_set(const struct lu_env *env,
1832 struct mdd_device *mdd,
1833 struct mdd_object *obj,
1834 const struct lu_buf *buf,
1836 struct thandle *handle)
1841 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1845 /* Only record user xattr changes */
/* NOTE(review): only "user." xattrs are declared here, while
 * mdd_xattr_set() also stores changelog records for POSIX ACL
 * xattrs -- verify the reserved credits cover that path. */
1846 if ((strncmp("user.", name, 5) == 0))
1847 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1853 * The caller should guarantee to update the object ctime
1854 * after xattr_set if needed.
/* Set an extended attribute on \a obj in its own transaction, with
 * permission checks and an optional CL_XATTR changelog record. */
1856 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1857 const struct lu_buf *buf, const char *name,
1860 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1861 struct mdd_device *mdd = mdo2mdd(obj);
1862 struct thandle *handle;
1866 rc = mdd_xattr_sanity_check(env, mdd_obj);
1870 handle = mdd_trans_create(env, mdd);
1872 RETURN(PTR_ERR(handle));
1874 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1878 rc = mdd_trans_start(env, mdd, handle);
1882 /* security-related changes may require sync */
1883 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1884 handle->th_sync |= !!mdd->mdd_sync_permission;
1886 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1888 /* Only record system & user xattr changes */
1889 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1890 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1891 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1892 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1893 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1894 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1895 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1899 mdd_trans_stop(env, mdd, rc, handle);
/* Declare the xattr removal plus, for "user." xattrs, the changelog
 * record that mdd_xattr_del() may emit. */
1904 static int mdd_declare_xattr_del(const struct lu_env *env,
1905 struct mdd_device *mdd,
1906 struct mdd_object *obj,
1908 struct thandle *handle)
1912 rc = mdo_declare_xattr_del(env, obj, name, handle);
1916 /* Only record user xattr changes */
/* NOTE(review): as with mdd_declare_xattr_set(), only "user." xattrs
 * are declared although mdd_xattr_del() also logs ACL removals --
 * verify the credits cover that path. */
1917 if ((strncmp("user.", name, 5) == 0))
1918 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1924 * The caller should guarantee to update the object ctime
1925 * after xattr_set if needed.
/* Remove an extended attribute from \a obj in its own transaction,
 * with permission checks and an optional CL_XATTR changelog record. */
1927 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1930 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1931 struct mdd_device *mdd = mdo2mdd(obj);
1932 struct thandle *handle;
1936 rc = mdd_xattr_sanity_check(env, mdd_obj);
1940 handle = mdd_trans_create(env, mdd);
1942 RETURN(PTR_ERR(handle));
1944 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1948 rc = mdd_trans_start(env, mdd, handle);
/* The actual deletion happens under the object write lock. */
1952 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1953 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1954 mdd_object_capa(env, mdd_obj));
1955 mdd_write_unlock(env, mdd_obj);
1957 /* Only record system & user xattr changes */
1958 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1959 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1960 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1961 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1962 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1963 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1964 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1968 mdd_trans_stop(env, mdd, rc, handle);
1973 /* partial unlink */
/* Drop one link (two for directories) on \a obj and update its ctime;
 * releases quota for the child when its last link and opener are gone. */
1974 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1977 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1978 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1979 struct mdd_device *mdd = mdo2mdd(obj);
1980 struct thandle *handle;
1981 #ifdef HAVE_QUOTA_SUPPORT
1982 struct obd_device *obd = mdd->mdd_obd_dev;
1983 struct mds_obd *mds = &obd->u.mds;
1984 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1990 /* XXX: this code won't be used ever:
1991 * DNE uses slightly different approach */
1995 * Check -ENOENT early here because we need to get object type
1996 * to calculate credits before transaction start
1998 if (mdd_object_exists(mdd_obj) == 0) {
1999 CERROR("%s: object "DFID" not found: rc = -2\n",
2000 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2004 LASSERT(mdd_object_exists(mdd_obj) > 0);
2006 handle = mdd_trans_create(env, mdd);
2010 rc = mdd_trans_start(env, mdd, handle);
2012 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2014 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2018 mdo_ref_del(env, mdd_obj, handle);
/* A directory carries an extra link for its "." entry: drop it too. */
2020 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2022 mdo_ref_del(env, mdd_obj, handle);
/* Propagate the caller-supplied ctime to the object. */
2025 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2026 la_copy->la_ctime = ma->ma_attr.la_ctime;
2028 la_copy->la_valid = LA_CTIME;
2029 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2033 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2034 #ifdef HAVE_QUOTA_SUPPORT
/* Last link and no remaining openers: release the child's quota. */
2035 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2036 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2037 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2038 mdd_quota_wrapper(&ma->ma_attr, qids);
2045 mdd_write_unlock(env, mdd_obj);
2046 mdd_trans_stop(env, mdd, rc, handle);
2047 #ifdef HAVE_QUOTA_SUPPORT
2049 /* Trigger dqrel on the owner of child. If failed,
2050 * the next call for lquota_chkquota will process it */
2051 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2057 /* partial operation */
/* Validate the attributes of a to-be-created object, dispatching on
 * the file type encoded in ma->ma_attr.la_mode. */
2058 static int mdd_oc_sanity_check(const struct lu_env *env,
2059 struct mdd_object *obj,
2065 switch (ma->ma_attr.la_mode & S_IFMT) {
/*
 * Partial create: allocate and initialize the object itself (no name
 * insertion into a parent -- that is a separate operation).  Handles
 * quota acquisition for the child, slave-LMV EA for striped dirs,
 * and remote ACL initialization.
 */
2082 static int mdd_object_create(const struct lu_env *env,
2083 struct md_object *obj,
2084 const struct md_op_spec *spec,
2088 struct mdd_device *mdd = mdo2mdd(obj);
2089 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2090 const struct lu_fid *pfid = spec->u.sp_pfid;
2091 struct thandle *handle;
2092 #ifdef HAVE_QUOTA_SUPPORT
2093 struct obd_device *obd = mdd->mdd_obd_dev;
2094 struct obd_export *exp = md_quota(env)->mq_exp;
2095 struct mds_obd *mds = &obd->u.mds;
2096 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2097 int quota_opc = 0, block_count = 0;
2098 int inode_pending[MAXQUOTAS] = { 0, 0 };
2099 int block_pending[MAXQUOTAS] = { 0, 0 };
2104 /* XXX: this code won't be used ever:
2105 * DNE uses slightly different approach */
2108 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire file (and, per type, block) quota for the new child. */
2109 if (mds->mds_quota) {
2110 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2111 mdd_quota_wrapper(&ma->ma_attr, qids);
2112 /* get file quota for child */
2113 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2114 qids, inode_pending, 1, NULL, 0,
2116 switch (ma->ma_attr.la_mode & S_IFMT) {
2125 /* get block quota for child */
2127 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2128 qids, block_pending, block_count,
2129 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2133 handle = mdd_trans_create(env, mdd);
2135 GOTO(out_pending, rc = PTR_ERR(handle));
2137 rc = mdd_trans_start(env, mdd, handle);
2139 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2140 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2144 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2148 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2149 /* If creating the slave object, set slave EA here. */
2150 int lmv_size = spec->u.sp_ea.eadatalen;
2151 struct lmv_stripe_md *lmv;
2153 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2154 LASSERT(lmv != NULL && lmv_size > 0);
2156 rc = __mdd_xattr_set(env, mdd_obj,
2157 mdd_buf_get_const(env, lmv, lmv_size),
2158 XATTR_NAME_LMV, 0, handle);
2162 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2165 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL: initialize the child's ACL from the caller-provided EA
 * and fold the resulting mode back into ma. */
2166 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2167 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2169 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2170 buf->lb_len = spec->u.sp_ea.eadatalen;
2171 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2172 rc = __mdd_acl_init(env, mdd_obj, buf,
2173 &ma->ma_attr.la_mode,
2178 ma->ma_attr.la_valid |= LA_MODE;
2181 pfid = spec->u.sp_ea.fid;
2184 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the freshly created object's attributes to the caller. */
2190 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2191 mdd_write_unlock(env, mdd_obj);
2193 mdd_trans_stop(env, mdd, rc, handle);
2195 #ifdef HAVE_QUOTA_SUPPORT
/* Commit the quota pre-acquired above. */
2197 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2199 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2201 /* Trigger dqacq on the owner of child. If failed,
2202 * the next call for lquota_chkquota will process it. */
2203 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: add one nlink to \a obj and update its ctime from the
 * caller-supplied attributes. */
2211 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2212 const struct md_attr *ma)
2214 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2215 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2216 struct mdd_device *mdd = mdo2mdd(obj);
2217 struct thandle *handle;
2221 /* XXX: this code won't be used ever:
2222 * DNE uses slightly different approach */
2225 handle = mdd_trans_create(env, mdd);
2229 rc = mdd_trans_start(env, mdd, handle);
2231 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2232 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2234 mdo_ref_add(env, mdd_obj, handle);
2235 mdd_write_unlock(env, mdd_obj);
2237 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2238 la_copy->la_ctime = ma->ma_attr.la_ctime;
2240 la_copy->la_valid = LA_CTIME;
2241 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with rc=0 even if the
 * sanity check or ctime update above failed -- confirm this is
 * intentional rather than a dropped error code. */
2244 mdd_trans_stop(env, mdd, 0, handle);
2250 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate MDS open flags into a MAY_* access mask for permission
 * checking. */
2252 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2256 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2257 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2258 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2259 * owner can write to a file even if it is marked readonly to hide
2260 * its brokenness. (bug 5781) */
2261 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2262 struct md_ucred *uc = md_ucred(env);
/* Owner (or missing/uninitialized credentials) bypasses the mask. */
2264 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2265 (la->la_uid == uc->mu_fsuid))
2269 if (flags & FMODE_READ)
2271 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2273 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's type, flags and the
 * caller's credentials. */
2278 static int mdd_open_sanity_check(const struct lu_env *env,
2279 struct mdd_object *obj, int flag)
2281 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2286 if (mdd_is_dead_obj(obj))
2289 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are never opened directly. */
2293 if (S_ISLNK(tmp_la->la_mode))
2296 mode = accmode(env, tmp_la, flag);
/* Directories cannot be opened for write. */
2298 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Permission was already verified at create time for new files. */
2301 if (!(flag & MDS_OPEN_CREATED)) {
2302 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless on special files; silently drop it. */
2307 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2308 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2309 flag &= ~MDS_OPEN_TRUNC;
2311 /* For writing append-only file must open it with append mode. */
2312 if (mdd_is_append(obj)) {
2313 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2315 if (flag & MDS_OPEN_TRUNC)
2321 * Now, flag -- O_NOATIME does not be packed by client.
/* O_NOATIME requires ownership or CFS_CAP_FOWNER. */
2323 if (flag & O_NOATIME) {
2324 struct md_ucred *uc = md_ucred(env);
2326 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2327 (uc->mu_valid == UCRED_NEW)) &&
2328 (uc->mu_fsuid != tmp_la->la_uid) &&
2329 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open \a obj: run the sanity checks and, on success, bump the open
 * count under the object write lock. */
2337 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2340 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2343 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2345 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2347 mdd_obj->mod_count++;
2349 mdd_write_unlock(env, mdd_obj);
/* Declare credits for destroying \a obj: the unlink llog record plus
 * the object destroy itself. */
2353 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2354 struct md_attr *ma, struct thandle *handle)
2358 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2362 return mdo_declare_destroy(env, obj, handle);
2365 /* return md_attr back,
2366 * if it is last unlink then return lov ea + llog cookie*/
2367 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2368 struct md_attr *ma, struct thandle *handle)
/* For regular files, fetch the LOV EA and write the unlink llog
 * record so the OST objects can be cleaned up. */
2373 if (S_ISREG(mdd_object_type(obj))) {
2374 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2375 * Caller must be ready for that. */
2376 rc = __mdd_lmm_get(env, obj, ma);
2377 if ((ma->ma_valid & MA_LOV))
2378 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/* Finally destroy the MDS object itself. */
2383 rc = mdo_destroy(env, obj, handle);
/* Declare credits for a last-close of an orphan: removal from the
 * orphan index plus the object kill. */
2388 static int mdd_declare_close(const struct lu_env *env,
2389 struct mdd_object *obj,
2391 struct thandle *handle)
2395 rc = orph_declare_index_delete(env, obj, handle);
2399 return mdd_declare_object_kill(env, obj, ma, handle);
2403 * No permission check is needed.
/* Close \a obj: drop the open count and, when this was the last close
 * of an unlinked/orphan object, remove it from the orphan index and
 * destroy it.  Also emits a CL_CLOSE changelog record for writable
 * opens. */
2405 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2406 struct md_attr *ma, int mode)
2408 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2409 struct mdd_device *mdd = mdo2mdd(obj);
2410 struct thandle *handle = NULL;
2412 int is_orphan = 0, reset = 1;
2414 #ifdef HAVE_QUOTA_SUPPORT
2415 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2416 struct mds_obd *mds = &obd->u.mds;
2417 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN (e.g. HSM restore in progress): just drop the open
 * count, keep the orphan around. */
2422 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2423 mdd_obj->mod_count--;
2425 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2426 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2427 "list\n", PFID(mdd_object_fid(mdd_obj)));
2431 /* check without any lock */
/* Probably the last close of a deleted object: pre-create the
 * transaction before taking the write lock. */
2432 if (mdd_obj->mod_count == 1 &&
2433 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2435 handle = mdd_trans_create(env, mdo2mdd(obj));
2437 RETURN(PTR_ERR(handle));
2439 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2443 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2447 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2452 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Raced with another closer/unlinker: the lockless check above missed
 * the orphan state; retry with a transaction. */
2453 if (handle == NULL && mdd_obj->mod_count == 1 &&
2454 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2455 mdd_write_unlock(env, mdd_obj);
2459 /* release open count */
2460 mdd_obj->mod_count --;
2462 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2463 /* remove link to object from orphan index */
2464 LASSERT(handle != NULL);
2465 rc = __mdd_orphan_del(env, mdd_obj, handle);
2467 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2468 "list, OSS objects to be destroyed.\n",
2469 PFID(mdd_object_fid(mdd_obj)));
2472 CERROR("Object "DFID" can not be deleted from orphan "
2473 "list, maybe cause OST objects can not be "
2474 "destroyed (err: %d).\n",
2475 PFID(mdd_object_fid(mdd_obj)), rc);
2476 /* If object was not deleted from orphan list, do not
2477 * destroy OSS objects, which will be done when next
2483 rc = mdd_iattr_get(env, mdd_obj, ma);
2484 /* Object maybe not in orphan list originally, it is rare case for
2485 * mdd_finish_unlink() failure. */
2486 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2487 #ifdef HAVE_QUOTA_SUPPORT
2488 if (mds->mds_quota) {
2489 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2490 mdd_quota_wrapper(&ma->ma_attr, qids);
2493 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2494 if (ma->ma_valid & MA_FLAGS &&
2495 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2496 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2498 if (handle == NULL) {
2499 handle = mdd_trans_create(env, mdo2mdd(obj));
2501 GOTO(out, rc = PTR_ERR(handle));
2503 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2508 rc = mdd_declare_changelog_store(env, mdd,
2513 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2518 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2524 CERROR("Error when prepare to delete Object "DFID" , "
2525 "which will cause OST objects can not be "
2526 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2532 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2534 mdd_write_unlock(env, mdd_obj);
/* Log a CL_CLOSE record for writable opens (skipped for recovery
 * replays via MDS_RECOV_OPEN). */
2537 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2538 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2539 if (handle == NULL) {
2540 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): "rc = IS_ERR(handle)" stores the boolean truth
 * value (1), not the error code -- almost certainly should be
 * PTR_ERR(handle).  Confirm against upstream. */
2542 GOTO(stop, rc = IS_ERR(handle));
2544 rc = mdd_declare_changelog_store(env, mdd, NULL,
2549 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2554 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2560 mdd_trans_stop(env, mdd, rc, handle);
2561 #ifdef HAVE_QUOTA_SUPPORT
2563 /* Trigger dqrel on the owner of child. If failed,
2564 * the next call for lquota_chkquota will process it */
2565 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2572 * Permission check is done when open,
2573 * no need check again.
/* Readpage is only valid on a directory whose underlying dt object
 * supports the index (directory iteration) API. */
2575 static int mdd_readpage_sanity_check(const struct lu_env *env,
2576 struct mdd_object *obj)
2578 struct dt_object *next = mdd_object_child(obj);
2582 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/*
 * Fill one lu_dirpage (\a dp, \a nob bytes) with directory entries
 * pulled from iterator \a it.  Records the hash range covered by the
 * page and terminates the entry list with a zero lde_reclen.
 */
2590 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2591 struct lu_dirpage *dp, int nob,
2592 const struct dt_it_ops *iops, struct dt_it *it,
2598 struct lu_dirent *ent;
2599 struct lu_dirent *last = NULL;
/* The page header (lu_dirpage) precedes the packed entries. */
2602 memset(area, 0, sizeof (*dp));
2603 area += sizeof (*dp);
2604 nob -= sizeof (*dp);
2611 len = iops->key_size(env, it);
2613 /* IAM iterator can return record with zero len. */
2617 hash = iops->store(env, it);
/* The first record's hash becomes the page's start hash. */
2618 if (unlikely(first)) {
2620 dp->ldp_hash_start = cpu_to_le64(hash);
2623 /* calculate max space required for lu_dirent */
2624 recsize = lu_dirent_calc_size(len, attr);
2626 if (nob >= recsize) {
2627 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2628 if (result == -ESTALE)
2633 /* osd might not able to pack all attributes,
2634 * so recheck rec length */
2635 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: fine if at least one entry fit, error otherwise. */
2637 result = (last != NULL) ? 0 :-EINVAL;
2641 ent = (void *)ent + recsize;
2645 result = iops->next(env, it);
2646 if (result == -ESTALE)
2648 } while (result == 0);
2651 dp->ldp_hash_end = cpu_to_le64(hash);
/* Entries sharing the end hash may continue on the next page. */
2653 if (last->lde_hash == dp->ldp_hash_end)
2654 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2655 last->lde_reclen = 0; /* end mark */
/*
 * Walk the directory iterator from rdpg->rp_hash and fill the pages
 * in \a rdpg with lu_dirpage-formatted entries.  Returns the number
 * of bytes produced, or a negative errno.
 */
2660 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2661 const struct lu_rdpg *rdpg)
2664 struct dt_object *next = mdd_object_child(obj);
2665 const struct dt_it_ops *iops;
2667 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2673 LASSERT(rdpg->rp_pages != NULL);
2674 LASSERT(next->do_index_ops != NULL);
2676 if (rdpg->rp_count <= 0)
2680 * iterate through directory and fill pages from @rdpg
2682 iops = &next->do_index_ops->dio_it;
2683 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2687 rc = iops->load(env, it, rdpg->rp_hash);
2691 * Iterator didn't find record with exactly the key requested.
2693 * It is currently either
2695 * - positioned above record with key less than
2696 * requested---skip it.
2698 * - or not positioned at all (is in IAM_IT_SKEWED
2699 * state)---position it on the next item.
2701 rc = iops->next(env, it);
2706 * At this point and across for-loop:
2708 * rc == 0 -> ok, proceed.
2709 * rc > 0 -> end of directory.
/* One CFS page per iteration; each may hold several LU pages. */
2712 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2713 i++, nob -= CFS_PAGE_SIZE) {
2714 struct lu_dirpage *dp;
2716 LASSERT(i < rdpg->rp_npages);
2717 pg = rdpg->rp_pages[i];
2719 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2722 rc = mdd_dir_page_build(env, mdd, dp,
2723 min_t(int, nob, LU_PAGE_SIZE),
2724 iops, it, rdpg->rp_attrs);
/* End of directory: stamp the sentinel end-hash. */
2729 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2731 } else if (rc < 0) {
2732 CWARN("build page failed: %d!\n", rc);
2735 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* Advance to the next LU page within the same CFS page. */
2736 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2737 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2744 struct lu_dirpage *dp;
/* No entries at all: return a single empty page marked LDF_EMPTY. */
2746 dp = cfs_kmap(rdpg->rp_pages[0]);
2747 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2750 * No pages were processed, mark this for first page
2753 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2756 cfs_kunmap(rdpg->rp_pages[0]);
/* Never report more than the caller asked for. */
2758 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2761 iops->fini(env, it);
/*
 * md_object readpage entry point: validate the object, then either
 * return an empty page for a dead (unlinked-while-open) directory or
 * delegate to __mdd_readpage() under the read lock.
 */
2766 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2767 const struct lu_rdpg *rdpg)
2769 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2773 if (mdd_object_exists(mdd_obj) == 0) {
2774 CERROR("%s: object "DFID" not found: rc = -2\n",
2775 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2779 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2780 rc = mdd_readpage_sanity_check(env, mdd_obj);
2782 GOTO(out_unlock, rc);
2784 if (mdd_is_dead_obj(mdd_obj)) {
2786 struct lu_dirpage *dp;
2789 * According to POSIX, please do not return any entry to client:
2790 * even dot and dotdot should not be returned.
2792 CWARN("readdir from dead object: "DFID"\n",
2793 PFID(mdd_object_fid(mdd_obj)));
2795 if (rdpg->rp_count <= 0)
2796 GOTO(out_unlock, rc = -EFAULT);
2797 LASSERT(rdpg->rp_pages != NULL);
/* Hand back a single page marked empty and ending at EOF. */
2799 pg = rdpg->rp_pages[0];
2800 dp = (struct lu_dirpage*)cfs_kmap(pg);
2801 memset(dp, 0 , sizeof(struct lu_dirpage));
2802 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2803 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2804 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2806 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2809 rc = __mdd_readpage(env, mdd_obj, rdpg);
2813 mdd_read_unlock(env, mdd_obj);
/* Sync \a obj's underlying dt object to stable storage; fails with an
 * error for a non-existent object. */
2817 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2819 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2821 if (mdd_object_exists(mdd_obj) == 0) {
2822 CERROR("%s: object "DFID" not found: rc = -2\n",
2823 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2826 return dt_object_sync(env, mdd_object_child(mdd_obj));
2829 const struct md_object_operations mdd_obj_ops = {
2830 .moo_permission = mdd_permission,
2831 .moo_attr_get = mdd_attr_get,
2832 .moo_attr_set = mdd_attr_set,
2833 .moo_xattr_get = mdd_xattr_get,
2834 .moo_xattr_set = mdd_xattr_set,
2835 .moo_xattr_list = mdd_xattr_list,
2836 .moo_xattr_del = mdd_xattr_del,
2837 .moo_object_create = mdd_object_create,
2838 .moo_ref_add = mdd_ref_add,
2839 .moo_ref_del = mdd_ref_del,
2840 .moo_open = mdd_open,
2841 .moo_close = mdd_close,
2842 .moo_readpage = mdd_readpage,
2843 .moo_readlink = mdd_readlink,
2844 .moo_changelog = mdd_changelog,
2845 .moo_capa_get = mdd_capa_get,
2846 .moo_object_sync = mdd_object_sync,
2847 .moo_path = mdd_path,
2848 .moo_file_lock = mdd_file_lock,
2849 .moo_file_unlock = mdd_file_unlock,