4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
50 #include <obd_class.h>
51 #include <obd_support.h>
52 #include <lprocfs_status.h>
53 /* fid_be_cpu(), fid_cpu_to_be(). */
54 #include <lustre_fid.h>
57 #include <lustre_param.h>
58 #include <lustre_mds.h>
59 #include <lustre/lustre_idl.h>
61 #include "mdd_internal.h"
63 static const struct lu_object_operations mdd_lu_obj_ops;
65 static int mdd_xattr_get(const struct lu_env *env,
66 struct md_object *obj, struct lu_buf *buf,
/* Fetch the layer-private data of @obj into @data via mdo_data_get().
 * Logs an error when the object no longer exists on disk.
 * NOTE(review): the error-return path and the final return are elided
 * in this view — presumably returns -ENOENT / 0; confirm in full file. */
69 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
72 if (mdd_object_exists(obj) == 0) {
73 CERROR("%s: object "DFID" not found: rc = -2\n",
74 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
77 mdo_data_get(env, obj, data);
/* Read the lu_attr attributes of @obj into @la (capability-checked via
 * @capa).  Fails with an error message when the object does not exist;
 * otherwise delegates to the lower layer through mdo_attr_get(). */
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82 struct lu_attr *la, struct lustre_capa *capa)
84 if (mdd_object_exists(obj) == 0) {
85 CERROR("%s: object "DFID" not found: rc = -2\n",
86 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
89 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * mod_flags representation (APPEND_OBJ / IMMUTE_OBJ).  Both target
 * bits are cleared first so stale state cannot survive. */
92 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
94 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
96 if (flags & LUSTRE_APPEND_FL)
97 obj->mod_flags |= APPEND_OBJ;
99 if (flags & LUSTRE_IMMUTABLE_FL)
100 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info stashed in @env's context.
 * The key must have been registered, hence the LASSERT. */
103 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
105 struct mdd_thread_info *info;
107 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
108 LASSERT(info != NULL);
/* Wrap caller-provided storage (@area, @len) in the thread-local
 * mti_buf and return it.  NOTE(review): the assignments of lb_buf /
 * lb_len are elided in this view — confirm against the full file. */
112 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 buf = &mdd_env_info(env)->mti_buf;
/* Release the storage owned by @buf.  Safe to call with NULL or an
 * already-empty buffer (no-op in that case). */
122 void mdd_buf_put(struct lu_buf *buf)
124 if (buf == NULL || buf->lb_buf == NULL)
126 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const variant of mdd_buf_get(): wrap read-only storage in the
 * thread-local mti_buf.  The const qualifier is deliberately cast
 * away on @area; callers must not write through the result. */
131 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
132 const void *area, ssize_t len)
136 buf = &mdd_env_info(env)->mti_buf;
137 buf->lb_buf = (void *)area;
/* Return the thread-local mti_big_buf sized to hold at least @len
 * bytes, (re)allocating as needed.  An existing smaller buffer is
 * freed before the new allocation; contents are NOT preserved
 * (use mdd_buf_grow() for that). */
142 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
144 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
146 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
147 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
150 if (buf->lb_buf == NULL) {
152 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
153 if (buf->lb_buf == NULL)
159 /** Increase the size of the \a mti_big_buf.
160  * preserves old data in buffer
161  * old buffer remains unchanged on error
162  * \retval 0 or -ENOMEM
164 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
166 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Growing only: shrink requests are a caller bug. */
169 LASSERT(len >= oldbuf->lb_len);
170 OBD_ALLOC_LARGE(buf.lb_buf, len);
172 if (buf.lb_buf == NULL)
/* Copy old contents, free the old storage, then install the new
 * descriptor over the thread-local buffer in one struct copy. */
176 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
178 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
180 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the thread-local llog cookie buffer, sized for the current
 * maximum LOV cookie size of @mdd.  Reallocates when the cached buffer
 * is too small; returns NULL on allocation failure.  The buffer is
 * zeroed before being handed back. */
185 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
186 struct mdd_device *mdd)
188 struct mdd_thread_info *mti = mdd_env_info(env);
191 max_cookie_size = mdd_lov_cookiesize(env, mdd);
192 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
193 if (mti->mti_max_cookie)
194 OBD_FREE_LARGE(mti->mti_max_cookie,
195 mti->mti_max_cookie_size);
196 mti->mti_max_cookie = NULL;
197 mti->mti_max_cookie_size = 0;
199 if (unlikely(mti->mti_max_cookie == NULL)) {
200 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
201 if (likely(mti->mti_max_cookie != NULL))
202 mti->mti_max_cookie_size = max_cookie_size;
204 if (likely(mti->mti_max_cookie != NULL))
205 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
206 return mti->mti_max_cookie;
/* Return the thread-local LOV EA buffer, grown (rounded up to a power
 * of two) to hold at least @size bytes.  Contents are NOT preserved
 * across a regrow; returns NULL on allocation failure. */
209 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
211 struct mdd_thread_info *mti = mdd_env_info(env);
213 if (unlikely(mti->mti_max_lmm_size < size)) {
214 int rsize = size_roundup_power2(size);
216 if (mti->mti_max_lmm_size > 0) {
217 LASSERT(mti->mti_max_lmm);
218 OBD_FREE_LARGE(mti->mti_max_lmm,
219 mti->mti_max_lmm_size);
220 mti->mti_max_lmm = NULL;
221 mti->mti_max_lmm_size = 0;
224 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
225 if (likely(mti->mti_max_lmm != NULL))
226 mti->mti_max_lmm_size = rsize;
228 return mti->mti_max_lmm;
/* Convenience wrapper: return the thread-local LOV EA buffer sized for
 * @mdd's current maximum MDS-side LOV EA size. */
231 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
232 struct mdd_device *mdd)
236 max_lmm_size = mdd_lov_mdsize(env, mdd);
237 return mdd_max_lmm_buffer(env, max_lmm_size);
/* lu_device operation: allocate and initialize a new mdd_object and
 * return its embedded lu_object (NULL presumably returned on OOM —
 * the failure path is elided in this view). */
240 struct lu_object *mdd_object_alloc(const struct lu_env *env,
241 const struct lu_object_header *hdr,
244 struct mdd_object *mdd_obj;
246 OBD_ALLOC_PTR(mdd_obj);
247 if (mdd_obj != NULL) {
250 o = mdd2lu_obj(mdd_obj);
251 lu_object_init(o, NULL, d);
/* Wire up md_object / md_dir operations and our lu ops. */
252 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
253 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
254 mdd_obj->mod_count = 0;
255 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object operation: initialize @o by allocating the corresponding
 * object of the underlying (child) device and stacking it below this
 * layer with lu_object_add(). */
262 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
263 const struct lu_object_conf *unused)
265 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
266 struct mdd_object *mdd_obj = lu2mdd_obj(o);
267 struct lu_object *below;
268 struct lu_device *under;
/* No changelog activity recorded against this object yet. */
271 mdd_obj->mod_cltime = 0;
272 under = &d->mdd_child->dd_lu_dev;
273 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
274 mdd_pdlock_init(mdd_obj);
278 lu_object_add(o, below);
/* lu_object operation: once the object is known to exist on disk,
 * load its flag bits into the in-memory representation. */
283 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
285 if (lu_object_exists(o))
286 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object operation: free the mdd_object (teardown calls are elided
 * in this view). */
291 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
293 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object operation: pretty-print the object state (open count,
 * valid mask, changelog time, flags) through printer @p. */
299 static int mdd_object_print(const struct lu_env *env, void *cookie,
300 lu_printer_t p, const struct lu_object *o)
302 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
303 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
304 "valid=%x, cltime="LPU64", flags=%lx)",
305 mdd, mdd->mod_count, mdd->mod_valid,
306 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operations vector for mdd objects; referenced from
 * mdd_object_alloc() via the forward declaration at the top of file. */
309 static const struct lu_object_operations mdd_lu_obj_ops = {
310 .loo_object_init = mdd_object_init,
311 .loo_object_start = mdd_object_start,
312 .loo_object_free = mdd_object_free,
313 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for fid @f on device @d.
 * Returns the result of md_object_find_slice() converted to an
 * mdd_object — callers must check IS_ERR(). */
316 struct mdd_object *mdd_object_find(const struct lu_env *env,
317 struct mdd_device *d,
318 const struct lu_fid *f)
320 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve an fs-relative @path to its fid by walking it component by
 * component from the filesystem root, doing an mdd_lookup() in each
 * parent.  Remote parents yield -EREMOTE.  Several interior lines
 * (component copy, loop advance, exit) are elided in this view. */
323 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
324 const char *path, struct lu_fid *fid)
327 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
328 struct mdd_object *obj;
329 struct lu_name *lname = &mdd_env_info(env)->mti_name;
334 /* temp buffer for path element */
335 buf = mdd_buf_alloc(env, PATH_MAX);
336 if (buf->lb_buf == NULL)
339 lname->ln_name = name = buf->lb_buf;
340 lname->ln_namelen = 0;
/* Start the walk at the root fid. */
341 *f = mdd->mdd_root_fid;
348 while (*path != '/' && *path != '\0') {
356 /* find obj corresponding to fid */
357 obj = mdd_object_find(env, mdd, f);
359 GOTO(out, rc = -EREMOTE);
361 GOTO(out, rc = PTR_ERR(obj));
362 /* get child fid from parent and name */
363 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
364 mdd_object_put(env, obj);
369 lname->ln_namelen = 0;
378 /** The maximum depth that fid2path() will search.
379  * This is limited only because we want to store the fids for
380  * historical path lookup purposes.
382 #define MAX_PATH_DEPTH 100
384 /** mdd_path() lookup structure. */
385 struct path_lookup_info {
386 __u64 pli_recno; /**< history point */
387 __u64 pli_currec; /**< current record */
388 struct lu_fid pli_fid;
389 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
390 struct mdd_object *pli_mdd_obj;
391 char *pli_path; /**< full path */
392 int pli_linkno; /**< which hardlink to follow */
394 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current (non-historic) path for pli->pli_mdd_obj by
 * walking parent links from the object up to the filesystem root via
 * each object's link EA, packing names right-to-left into
 * pli->pli_path.  The result is then validated by resolving it back
 * to a fid; a mismatch (concurrent rename) returns -EAGAIN so the
 * caller can retry.  Several interior lines are elided in this view. */
397 static int mdd_path_current(const struct lu_env *env,
398 struct path_lookup_info *pli)
400 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
401 struct mdd_object *mdd_obj;
402 struct lu_buf *buf = NULL;
403 struct link_ea_header *leh;
404 struct link_ea_entry *lee;
405 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
406 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the end of the path buffer; names are packed
 * backwards from here. */
412 ptr = pli->pli_path + pli->pli_pathlen - 1;
415 pli->pli_fidcount = 0;
416 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
418 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
419 mdd_obj = mdd_object_find(env, mdd,
420 &pli->pli_fids[pli->pli_fidcount]);
422 GOTO(out, rc = -EREMOTE);
424 GOTO(out, rc = PTR_ERR(mdd_obj));
425 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
427 mdd_object_put(env, mdd_obj);
431 /* Do I need to error out here? */
436 /* Get parent fid and object name */
437 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
438 buf = mdd_links_get(env, mdd_obj);
439 mdd_read_unlock(env, mdd_obj);
440 mdd_object_put(env, mdd_obj);
442 GOTO(out, rc = PTR_ERR(buf));
445 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
446 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
448 /* If set, use link #linkno for path lookup, otherwise use
449 link #0. Only do this for the final path element. */
450 if ((pli->pli_fidcount == 0) &&
451 (pli->pli_linkno < leh->leh_reccount)) {
453 for (count = 0; count < pli->pli_linkno; count++) {
454 lee = (struct link_ea_entry *)
455 ((char *)lee + reclen);
456 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
458 if (pli->pli_linkno < leh->leh_reccount - 1)
459 /* indicate to user there are more links */
463 /* Pack the name in the end of the buffer */
464 ptr -= tmpname->ln_namelen;
465 if (ptr - 1 <= pli->pli_path)
466 GOTO(out, rc = -EOVERFLOW);
467 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
470 /* Store the parent fid for historic lookup */
471 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
472 GOTO(out, rc = -EOVERFLOW);
473 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
476 /* Verify that our path hasn't changed since we started the lookup.
477 Record the current index, and verify the path resolves to the
478 same fid. If it does, then the path is correct as of this index. */
479 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
480 pli->pli_currec = mdd->mdd_cl.mc_index;
481 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
482 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
484 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
485 GOTO (out, rc = -EAGAIN);
487 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
488 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
489 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
490 PFID(&pli->pli_fid));
491 GOTO(out, rc = -EAGAIN);
493 ptr++; /* skip leading / */
494 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
498 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
499 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup.  Body is elided in this view —
 * per the comment in mdd_path() it is presumably a stub in this
 * implementation; confirm against the full file. */
505 static int mdd_path_historic(const struct lu_env *env,
506 struct path_lookup_info *pli)
511 /* Returns the full path to this fid, as of changelog record recno. */
512 static int mdd_path(const struct lu_env *env, struct md_object *obj,
513 char *path, int pathlen, __u64 *recno, int *linkno)
515 struct path_lookup_info *pli;
/* The root has an empty path — handled specially (body elided). */
523 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
532 pli->pli_mdd_obj = md2mdd_obj(obj);
533 pli->pli_recno = *recno;
534 pli->pli_path = path;
535 pli->pli_pathlen = pathlen;
536 pli->pli_linkno = *linkno;
538 /* Retry multiple times in case file is being moved */
539 while (tries-- && rc == -EAGAIN)
540 rc = mdd_path_current(env, pli);
542 /* For historical path lookup, the current links may not have existed
543 * at "recno" time. We must switch over to earlier links/parents
544 * by using the changelog records. If the earlier parent doesn't
545 * exist, we must search back through the changelog to reconstruct
546 * its parents, then check if it exists, etc.
547 * We may ignore this problem for the initial implementation and
548 * state that an "original" hardlink must still exist for us to find
549 * historic path name. */
550 if (pli->pli_recno != -1) {
551 rc = mdd_path_historic(env, pli);
/* Report back the record index the path is valid at and the
 * next hardlink index for iterative callers. */
553 *recno = pli->pli_currec;
554 /* Return next link index to caller */
555 *linkno = pli->pli_linkno;
/* Read @obj's attributes (bypassing capability checks) and, on
 * success, translate the on-disk flag bits into mod_flags. */
563 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
565 struct lu_attr *la = &mdd_env_info(env)->mti_la;
569 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
571 mdd_flags_xlate(obj, la->la_flags);
576 /* get only inode attributes */
577 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the disk read if MA_INODE is already cached in ma_valid. */
583 if (ma->ma_valid & MA_INODE)
586 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
587 mdd_object_capa(env, mdd_obj));
589 ma->ma_valid |= MA_INODE;
/* Fill @lmm with the filesystem-default striping (from the MDS LOV
 * descriptor) as a LOV_MAGIC_V1 lov_user_md.  Returns the number of
 * bytes written (sizeof(struct lov_user_md)). */
593 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
595 struct lov_desc *ldesc;
596 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
597 struct lov_user_md *lum = (struct lov_user_md*)lmm;
603 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
604 LASSERT(ldesc != NULL);
606 lum->lmm_magic = LOV_MAGIC_V1;
/* FID_SEQ_LOV_DEFAULT marks this EA as "default striping". */
607 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
608 lum->lmm_pattern = ldesc->ld_pattern;
609 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
610 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
611 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
613 RETURN(sizeof(*lum));
/* Return non-zero iff @mdd_obj is the filesystem root directory
 * (its fid equals the device's mdd_root_fid). */
616 static int is_rootdir(struct mdd_object *mdd_obj)
618 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
619 const struct lu_fid *fid = mdo2fid(mdd_obj);
621 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
/* Read a LOV EA too large for the caller-supplied ma_lmm into the
 * thread-local "big" LMM buffer.  Probes the EA size first with
 * LU_BUF_NULL, grows the buffer as needed, then reads the EA and
 * points ma->ma_lmm at the shared buffer (ma_big_lmm_used set so the
 * caller knows the buffer is borrowed, not owned). */
624 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
627 struct mdd_thread_info *info = mdd_env_info(env);
632 LASSERT(info != NULL);
633 LASSERT(ma->ma_lmm_size > 0);
634 LASSERT(ma->ma_big_lmm_used == 0);
/* Size probe: xattr_get with a NULL buffer returns the EA size. */
636 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
637 mdd_object_capa(env, obj));
641 /* big_lmm may need to grow */
643 mdd_max_lmm_buffer(env, size);
644 if (info->mti_max_lmm == NULL)
647 LASSERT(info->mti_max_lmm_size >= size);
648 rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
653 ma->ma_big_lmm_used = 1;
654 ma->ma_valid |= MA_LOV;
655 ma->ma_lmm = info->mti_max_lmm;
656 ma->ma_lmm_size = size;
661 /* get lov EA only */
662 static int __mdd_lmm_get(const struct lu_env *env,
663 struct mdd_object *mdd_obj, struct md_attr *ma)
/* Already cached — nothing to do. */
668 if (ma->ma_valid & MA_LOV)
671 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* Caller buffer too small: fall back to the big thread-local one. */
674 rc = mdd_big_lmm_get(env, mdd_obj, ma);
675 else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
676 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* Positive rc is the EA size; record it plus the layout generation. */
679 ma->ma_lmm_size = rc;
680 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
681 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
687 /* get the first parent fid from link EA */
688 static int mdd_pfid_get(const struct lu_env *env,
689 struct mdd_object *mdd_obj, struct md_attr *ma)
692 struct link_ea_header *leh;
693 struct link_ea_entry *lee;
694 struct lu_fid *pfid = &ma->ma_pfid;
697 if (ma->ma_valid & MA_PFID)
700 buf = mdd_links_get(env, mdd_obj);
702 RETURN(PTR_ERR(buf));
/* Take link #0's parent fid; it is stored big-endian in the EA. */
705 lee = (struct link_ea_entry *)(leh + 1);
706 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
707 fid_be_to_cpu(pfid, pfid);
708 ma->ma_valid |= MA_PFID;
709 if (buf->lb_len > OBD_ALLOC_BIG)
710 /* if we vmalloced a large buffer drop it */
/* Read-locked wrapper around __mdd_lmm_get(). */
715 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
721 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
722 rc = __mdd_lmm_get(env, mdd_obj, ma);
723 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv unless it is
 * already cached in ma_valid. */
728 static int __mdd_lmv_get(const struct lu_env *env,
729 struct mdd_object *mdd_obj, struct md_attr *ma)
734 if (ma->ma_valid & MA_LMV)
737 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
740 ma->ma_valid |= MA_LMV
/* Read the LMA xattr and extract the HSM flags and/or SOM (size on
 * MDS) attributes requested in ma->ma_need into @ma. */
746 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
749 struct mdd_thread_info *info = mdd_env_info(env);
750 struct lustre_mdt_attrs *lma =
751 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
756 /* If all needed data are already valid, nothing to do */
757 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
758 (ma->ma_need & (MA_HSM | MA_SOM)))
761 /* Read LMA from disk EA */
762 lma_size = sizeof(info->mti_xattr_buf);
763 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
767 /* Useless to check LMA incompatibility because this is already done in
768 * osd_ea_fid_get(), and this will fail long before this code is
770 * So, if we are here, LMA is compatible.
773 lustre_lma_swab(lma);
775 /* Swab and copy LMA */
776 if (ma->ma_need & MA_HSM) {
777 if (lma->lma_compat & LMAC_HSM)
778 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
780 ma->ma_hsm.mh_flags = 0;
781 ma->ma_valid |= MA_HSM;
/* SOM data is only meaningful when the LMAC_SOM compat bit is set. */
785 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
786 LASSERT(ma->ma_som != NULL);
787 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
788 ma->ma_som->msd_size = lma->lma_som_size;
789 ma->ma_som->msd_blocks = lma->lma_som_blocks;
790 ma->ma_som->msd_mountid = lma->lma_som_mountid;
791 ma->ma_valid |= MA_SOM;
/* Gather all attribute groups requested in ma->ma_need (inode, LOV,
 * parent fid, LMV, HSM/SOM, default ACL), each guarded by the object
 * type it applies to.  Stops at the first failing group. */
797 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
803 if (ma->ma_need & MA_INODE)
804 rc = mdd_iattr_get(env, mdd_obj, ma);
806 if (rc == 0 && ma->ma_need & MA_LOV) {
807 if (S_ISREG(mdd_object_type(mdd_obj)) ||
808 S_ISDIR(mdd_object_type(mdd_obj)))
809 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* PFID is only fetched when the LOV EA did not already supply it. */
811 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
812 if (S_ISREG(mdd_object_type(mdd_obj)))
813 rc = mdd_pfid_get(env, mdd_obj, ma);
815 if (rc == 0 && ma->ma_need & MA_LMV) {
816 if (S_ISDIR(mdd_object_type(mdd_obj)))
817 rc = __mdd_lmv_get(env, mdd_obj, ma);
819 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
820 if (S_ISREG(mdd_object_type(mdd_obj)))
821 rc = __mdd_lma_get(env, mdd_obj, ma);
823 #ifdef CONFIG_FS_POSIX_ACL
824 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
825 if (S_ISDIR(mdd_object_type(mdd_obj)))
826 rc = mdd_def_acl_get(env, mdd_obj, ma);
829 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
830 rc, ma->ma_valid, ma->ma_lmm);
/* As mdd_attr_get_internal(), but takes the object read lock when any
 * EA-backed group (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain
 * inode attributes need no lock. */
834 int mdd_attr_get_internal_locked(const struct lu_env *env,
835 struct mdd_object *mdd_obj, struct md_attr *ma)
838 int needlock = ma->ma_need &
839 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
842 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
843 rc = mdd_attr_get_internal(env, mdd_obj, ma);
845 mdd_read_unlock(env, mdd_obj);
850  * No permission check is needed.
/* md_object operation: public getattr entry point. */
852 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
855 struct mdd_object *mdd_obj = md2mdd_obj(obj);
859 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
864  * No permission check is needed.
/* md_object operation: read xattr @name of @obj into @buf under the
 * object read lock.  Errors out if the object no longer exists. */
866 static int mdd_xattr_get(const struct lu_env *env,
867 struct md_object *obj, struct lu_buf *buf,
870 struct mdd_object *mdd_obj = md2mdd_obj(obj);
875 if (mdd_object_exists(mdd_obj) == 0) {
876 CERROR("%s: object "DFID" not found: rc = -2\n",
877 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
881 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
882 rc = mdo_xattr_get(env, mdd_obj, buf, name,
883 mdd_object_capa(env, mdd_obj));
884 mdd_read_unlock(env, mdd_obj);
890  * Permission check is done when open,
891  * no need check again.
/* md_object operation: read a symlink target by reading the body of
 * the underlying dt_object from offset 0 under the read lock. */
893 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
896 struct mdd_object *mdd_obj = md2mdd_obj(obj);
897 struct dt_object *next;
902 if (mdd_object_exists(mdd_obj) == 0) {
903 CERROR("%s: object "DFID" not found: rc = -2\n",
904 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
908 next = mdd_object_child(mdd_obj);
909 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
910 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
911 mdd_object_capa(env, mdd_obj));
912 mdd_read_unlock(env, mdd_obj);
917  * No permission check is needed.
/* md_object operation: list all xattr names of @obj into @buf. */
919 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
922 struct mdd_object *mdd_obj = md2mdd_obj(obj);
927 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
928 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
929 mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) creation of child @c under
 * parent @p.  Chooses the dt object format from the index features in
 * @spec, falling back to the file mode. */
934 int mdd_declare_object_create_internal(const struct lu_env *env,
935 struct mdd_object *p,
936 struct mdd_object *c,
938 struct thandle *handle,
939 const struct md_op_spec *spec)
941 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
942 const struct dt_index_features *feat = spec->sp_feat;
/* Non-directory index features force DFT_INDEX; otherwise derive the
 * format from the mode bits. */
946 if (feat != &dt_directory_features && feat != NULL)
947 dof->dof_type = DFT_INDEX;
949 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
951 dof->u.dof_idx.di_feat = feat;
953 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Execute the creation declared by mdd_declare_object_create_internal():
 * build the dt_object_format, ask the lower layer for an allocation
 * hint, and create the object.  No-op if @c already exists. */
958 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
959 struct mdd_object *c, struct md_attr *ma,
960 struct thandle *handle,
961 const struct md_op_spec *spec)
963 struct lu_attr *attr = &ma->ma_attr;
964 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
965 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
966 const struct dt_index_features *feat = spec->sp_feat;
970 if (!mdd_object_exists(c)) {
971 struct dt_object *next = mdd_object_child(c);
974 if (feat != &dt_directory_features && feat != NULL)
975 dof->dof_type = DFT_INDEX;
977 dof->dof_type = dt_mode_to_dft(attr->la_mode);
979 dof->u.dof_idx.di_feat = feat;
981 /* @hint will be initialized by underlying device. */
982 next->do_ops->do_ah_init(env, hint,
983 p ? mdd_object_child(p) : NULL,
984 attr->la_mode & S_IFMT);
986 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
987 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
995  * Make sure the ctime is increased only.
/* Drop MTIME/CTIME from @attr when the incoming ctime would move the
 * object's ctime backwards; drop a pure equal-ctime update entirely. */
997 static inline int mdd_attr_check(const struct lu_env *env,
998 struct mdd_object *obj,
999 struct lu_attr *attr)
1001 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1005 if (attr->la_valid & LA_CTIME) {
1006 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1010 if (attr->la_ctime < tmp_la->la_ctime)
1011 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1012 else if (attr->la_valid == LA_CTIME &&
1013 attr->la_ctime == tmp_la->la_ctime)
1014 attr->la_valid &= ~LA_CTIME;
/* Apply @attr to @obj inside transaction @handle; when the mode
 * changes and @needacl is set, also rewrite the POSIX ACL to match. */
1019 int mdd_attr_set_internal(const struct lu_env *env,
1020 struct mdd_object *obj,
1021 struct lu_attr *attr,
1022 struct thandle *handle,
1028 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1029 #ifdef CONFIG_FS_POSIX_ACL
1030 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1031 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Validate the ctime ordering with mdd_attr_check(), then apply the
 * (possibly trimmed) attributes.  The elided lines presumably bail
 * out early when nothing remains valid — confirm in full file. */
1036 int mdd_attr_check_set_internal(const struct lu_env *env,
1037 struct mdd_object *obj,
1038 struct lu_attr *attr,
1039 struct thandle *handle,
1045 rc = mdd_attr_check(env, obj, attr);
1050 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper around mdd_attr_set_internal(); the ACL rewrite
 * is only needed when the mode actually changes. */
1054 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1055 struct mdd_object *obj,
1056 struct lu_attr *attr,
1057 struct thandle *handle,
1063 needacl = needacl && (attr->la_valid & LA_MODE);
1065 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1066 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1068 mdd_write_unlock(env, obj);
/* Write-locked wrapper around mdd_attr_check_set_internal(); mirrors
 * mdd_attr_set_internal_locked(). */
1072 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1073 struct mdd_object *obj,
1074 struct lu_attr *attr,
1075 struct thandle *handle,
1081 needacl = needacl && (attr->la_valid & LA_MODE);
1083 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1084 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1086 mdd_write_unlock(env, obj);
/* Set or delete an xattr inside transaction @handle: a non-empty @buf
 * sets @name, a NULL/zero-length @buf deletes it. */
1090 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1091 const struct lu_buf *buf, const char *name,
1092 int fl, struct thandle *handle)
1094 struct lustre_capa *capa = mdd_object_capa(env, obj);
1098 if (buf->lb_buf && buf->lb_len > 0)
1099 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa)
1100 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1101 rc = mdo_xattr_del(env, obj, name, handle, capa);
1107  * This gives the same functionality as the code between
1108  * sys_chmod and inode_setattr
1109  * chown_common and inode_setattr
1110  * utimes and inode_setattr
1111  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Normalize and permission-check an incoming setattr request against
 * the object's current attributes and the caller's credentials,
 * dropping or rewriting la_valid bits as required (ctime ordering,
 * immutable/append flags, SUID/SGID stripping, chown/chgrp/chmod
 * capability checks, Size-on-MDS merging). */
1113 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1114 struct lu_attr *la, const struct md_attr *ma)
1116 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1117 struct md_ucred *uc;
1124 /* Do not permit change file type */
1125 if (la->la_valid & LA_TYPE)
1128 /* They should not be processed by setattr */
1129 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1132 /* export destroy does not have ->le_ses, but we may want
1133 * to drop LUSTRE_SOM_FL. */
/* Snapshot the current on-disk attributes for all checks below. */
1139 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1143 if (la->la_valid == LA_CTIME) {
1144 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1145 /* This is only for set ctime when rename's source is
1147 rc = mdd_may_delete(env, NULL, obj,
1148 (struct md_attr *)ma, 1, 0);
1149 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1150 la->la_valid &= ~LA_CTIME;
1154 if (la->la_valid == LA_ATIME) {
1155 /* This is atime only set for read atime update on close. */
1156 if (la->la_atime >= tmp_la->la_atime &&
1157 la->la_atime < (tmp_la->la_atime +
1158 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1159 la->la_valid &= ~LA_ATIME;
1163 /* Check if flags change. */
1164 if (la->la_valid & LA_FLAGS) {
1165 unsigned int oldflags = 0;
1166 unsigned int newflags = la->la_flags &
1167 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1169 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1170 !mdd_capable(uc, CFS_CAP_FOWNER))
1173 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1174 * only be changed by the relevant capability. */
1175 if (mdd_is_immutable(obj))
1176 oldflags |= LUSTRE_IMMUTABLE_FL;
1177 if (mdd_is_append(obj))
1178 oldflags |= LUSTRE_APPEND_FL;
1179 if ((oldflags ^ newflags) &&
1180 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningless on non-directories. */
1183 if (!S_ISDIR(tmp_la->la_mode))
1184 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept flag changes only. */
1187 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1188 (la->la_valid & ~LA_FLAGS) &&
1189 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1192 /* Check for setting the obj time. */
1193 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1194 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1195 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1196 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1197 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* KILL_SUID/KILL_SGID: strip setuid/setgid bits (e.g. after a write)
 * unless an explicit mode is already being set. */
1205 if (la->la_valid & LA_KILL_SUID) {
1206 la->la_valid &= ~LA_KILL_SUID;
1207 if ((tmp_la->la_mode & S_ISUID) &&
1208 !(la->la_valid & LA_MODE)) {
1209 la->la_mode = tmp_la->la_mode;
1210 la->la_valid |= LA_MODE;
1212 la->la_mode &= ~S_ISUID;
1215 if (la->la_valid & LA_KILL_SGID) {
1216 la->la_valid &= ~LA_KILL_SGID;
1217 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1218 (S_ISGID | S_IXGRP)) &&
1219 !(la->la_valid & LA_MODE)) {
1220 la->la_mode = tmp_la->la_mode;
1221 la->la_valid |= LA_MODE;
1223 la->la_mode &= ~S_ISGID;
1226 /* Make sure a caller can chmod. */
1227 if (la->la_valid & LA_MODE) {
1228 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1229 (uc->mu_fsuid != tmp_la->la_uid) &&
1230 !mdd_capable(uc, CFS_CAP_FOWNER))
1233 if (la->la_mode == (cfs_umode_t) -1)
1234 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits; only permission bits change. */
1236 la->la_mode = (la->la_mode & S_IALLUGO) |
1237 (tmp_la->la_mode & ~S_IALLUGO);
1239 /* Also check the setgid bit! */
1240 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1241 la->la_gid : tmp_la->la_gid) &&
1242 !mdd_capable(uc, CFS_CAP_FSETID))
1243 la->la_mode &= ~S_ISGID;
1245 la->la_mode = tmp_la->la_mode;
1248 /* Make sure a caller can chown. */
1249 if (la->la_valid & LA_UID) {
1250 if (la->la_uid == (uid_t) -1)
1251 la->la_uid = tmp_la->la_uid;
1252 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1253 (la->la_uid != tmp_la->la_uid)) &&
1254 !mdd_capable(uc, CFS_CAP_CHOWN))
1257 /* If the user or group of a non-directory has been
1258 * changed by a non-root user, remove the setuid bit.
1259 * 19981026 David C Niemi <niemi@tux.org>
1261 * Changed this to apply to all users, including root,
1262 * to avoid some races. This is the behavior we had in
1263 * 2.0. The check for non-root was definitely wrong
1264 * for 2.2 anyway, as it should have been using
1265 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1266 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1267 !S_ISDIR(tmp_la->la_mode)) {
1268 la->la_mode &= ~S_ISUID;
1269 la->la_valid |= LA_MODE;
1273 /* Make sure caller can chgrp. */
1274 if (la->la_valid & LA_GID) {
1275 if (la->la_gid == (gid_t) -1)
1276 la->la_gid = tmp_la->la_gid;
1277 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1278 ((la->la_gid != tmp_la->la_gid) &&
1279 !lustre_in_group_p(uc, la->la_gid))) &&
1280 !mdd_capable(uc, CFS_CAP_CHOWN))
1283 /* Likewise, if the user or group of a non-directory
1284 * has been changed by a non-root user, remove the
1285 * setgid bit UNLESS there is no group execute bit
1286 * (this would be a file marked for mandatory
1287 * locking). 19981026 David C Niemi <niemi@tux.org>
1289 * Removed the fsuid check (see the comment above) --
1291 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1292 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1293 la->la_mode &= ~S_ISGID;
1294 la->la_valid |= LA_MODE;
1298 /* For both Size-on-MDS case and truncate case,
1299 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1300 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1301 * For SOM case, it is true, the MAY_WRITE perm has been checked
1302 * when open, no need check again. For truncate case, it is false,
1303 * the MAY_WRITE perm should be checked here. */
1304 if (ma->ma_attr_flags & MDS_SOM) {
1305 /* For the "Size-on-MDS" setattr update, merge coming
1306 * attributes with the set in the inode. BUG 10641 */
1307 if ((la->la_valid & LA_ATIME) &&
1308 (la->la_atime <= tmp_la->la_atime))
1309 la->la_valid &= ~LA_ATIME;
1311 /* OST attributes do not have a priority over MDS attributes,
1312 * so drop times if ctime is equal. */
1313 if ((la->la_valid & LA_CTIME) &&
1314 (la->la_ctime <= tmp_la->la_ctime))
1315 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1317 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1318 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1319 (uc->mu_fsuid == tmp_la->la_uid)) &&
1320 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1321 rc = mdd_permission_internal_locked(env, obj,
1328 if (la->la_valid & LA_CTIME) {
1329 /* The pure setattr, it has the priority over what is
1330 * already set, do not drop it if ctime is equal. */
1331 if (la->la_ctime < tmp_la->la_ctime)
1332 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1340 /** Store a data change changelog record
1341  * If this fails, we must fail the whole transaction; we don't
1342  * want the change to commit without the log entry.
1343  * \param mdd_obj - mdd_object of change
1344  * \param handle - transacion handle
1346 static int mdd_changelog_data_store(const struct lu_env *env,
1347 struct mdd_device *mdd,
1348 enum changelog_rec_type type,
1350 struct mdd_object *mdd_obj,
1351 struct thandle *handle)
1353 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1354 struct llog_changelog_rec *rec;
1355 struct thandle *th = NULL;
/* Skip when the changelog is off or this record type is masked out. */
1361 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1363 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1366 LASSERT(mdd_obj != NULL);
1367 LASSERT(handle != NULL);
/* Time-update records: suppress duplicates for an object already
 * logged since the changelog was (re)started. */
1369 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1370 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1371 /* Don't need multiple updates in this log */
1372 /* Don't check under lock - no big deal if we get an extra
1377 reclen = llog_data_len(sizeof(*rec));
1378 buf = mdd_buf_alloc(env, reclen);
1379 if (buf->lb_buf == NULL)
1381 rec = (struct llog_changelog_rec *)buf->lb_buf;
1383 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1384 rec->cr.cr_type = (__u32)type;
1385 rec->cr.cr_tfid = *tfid;
1386 rec->cr.cr_namelen = 0;
1387 mdd_obj->mod_cltime = cfs_time_current_64();
1389 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1392 mdd_trans_stop(env, mdd, rc, th);
1395 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1396 rc, type, PFID(tfid));
/* Emit a changelog record of \a type with \a flags for \a obj in its
 * own transaction: create, declare, start, store, stop. */
1403 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1404 int flags, struct md_object *obj)
1406 struct thandle *handle;
1407 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1408 struct mdd_device *mdd = mdo2mdd(obj);
1412 handle = mdd_trans_create(env, mdd);
1414 return(PTR_ERR(handle));
1416 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1420 rc = mdd_trans_start(env, mdd, handle);
1424 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
/* Transaction is stopped with the final rc on all paths. */
1428 mdd_trans_stop(env, mdd, rc, handle);
1434 * Should be called with write lock held.
1436 * \see mdd_lma_set_locked().
1438 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1439 const struct md_attr *ma, struct thandle *handle)
1441 struct mdd_thread_info *info = mdd_env_info(env);
1443 struct lustre_mdt_attrs *lma =
1444 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1445 int lmasize = sizeof(struct lustre_mdt_attrs);
1450 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" evaluates
 * "0-or-1 & mask", which does not match the comment above; it was
 * almost certainly meant to be "!(ma->ma_valid & (MA_HSM | MA_SOM))".
 * Too much of this function is missing from this chunk to fix safely
 * here — confirm and fix against the full source. */
1451 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1452 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
/* Existing on-disk LMA found: convert to CPU byte order before merge. */
1456 lustre_lma_swab(lma);
/* No LMA on disk yet: start from a zeroed structure. */
1458 memset(lma, 0, lmasize);
/* Merge HSM flags from \a ma, limited to the defined flag mask. */
1462 if (ma->ma_valid & MA_HSM) {
1463 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1464 lma->lma_compat |= LMAC_HSM;
/* Merge Size-on-MDS data; IOEPOCH_INVAL invalidates cached SOM. */
1468 if (ma->ma_valid & MA_SOM) {
1469 LASSERT(ma->ma_som != NULL);
1470 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1471 lma->lma_compat &= ~LMAC_SOM;
1473 lma->lma_compat |= LMAC_SOM;
1474 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1475 lma->lma_som_size = ma->ma_som->msd_size;
1476 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1477 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always refresh the self-FID, swab back to disk order, and store the
 * whole LMA xattr within the caller's transaction. */
1482 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1484 lustre_lma_swab(lma);
1485 buf = mdd_buf_get(env, lma, lmasize);
1486 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1492 * Save LMA extended attributes with data from \a ma.
1494 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1495 * not, LMA EA will be first read from disk, modified and write back.
/* Write-locked wrapper around __mdd_lma_set(). */
1498 static int mdd_lma_set_locked(const struct lu_env *env,
1499 struct mdd_object *mdd_obj,
1500 const struct md_attr *ma, struct thandle *handle)
1504 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1505 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1506 mdd_write_unlock(env, mdd_obj);
1510 /* Precedence for choosing record type when multiple
1511  * attributes change: setattr > mtime > ctime > atime
1512  * (ctime changes when mtime does, plus chmod/chown.
1513  * atime and ctime are independent.) */
1514 static int mdd_attr_set_changelog(const struct lu_env *env,
1515 struct md_object *obj, struct thandle *handle,
1518 struct mdd_device *mdd = mdo2mdd(obj);
/* Build candidate record types: any non-time attribute maps to
 * CL_SETATTR, plus one bit per changed time field, then apply the
 * configured changelog mask. */
1521 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1522 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1523 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1524 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1525 bits = bits & mdd->mdd_cl.mc_mask;
1529 /* The record type is the lowest non-masked set bit */
1530 while (bits && ((bits & 1) == 0)) {
1535 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1536 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1537 md2mdd_obj(obj), handle);
/* Declare all updates an attr_set may perform: the attribute change
 * itself, a changelog record, possible LOV/LMA xattr updates, a
 * possible ACL_ACCESS rewrite on chmod, and (for chown on a striped
 * file) per-stripe llog records mirroring the unlink log. */
1540 static int mdd_declare_attr_set(const struct lu_env *env,
1541 struct mdd_device *mdd,
1542 struct mdd_object *obj,
1543 const struct md_attr *ma,
1544 struct lov_mds_md *lmm,
1545 struct thandle *handle)
1547 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1550 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1554 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1558 if (ma->ma_valid & MA_LOV) {
1560 buf->lb_len = ma->ma_lmm_size;
1561 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1567 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1569 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1570 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1576 #ifdef CONFIG_FS_POSIX_ACL
/* chmod may rewrite the access ACL; absent/unsupported ACL is not an
 * error, so -EOPNOTSUPP / -ENODATA are tolerated. */
1577 if (ma->ma_attr.la_valid & LA_MODE) {
1578 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1579 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1581 mdd_read_unlock(env, obj);
1582 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1590 rc = mdo_declare_xattr_set(env, obj, buf,
1591 XATTR_NAME_ACL_ACCESS, 0,
1599 /* basically the log is the same as in unlink case */
1603 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1604 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1605 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1606 mdd->mdd_obd_dev->obd_name,
1607 le32_to_cpu(lmm->lmm_magic),
1608 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
/* LOV_ALL_STRIPES means "striped over every OST": use the LOV
 * descriptor's target count instead of the literal value. */
1612 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1613 if (stripe == LOV_ALL_STRIPES) {
1614 struct lov_desc *ldesc;
1616 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1617 LASSERT(ldesc != NULL);
1618 stripe = ldesc->ld_tgt_count;
/* One unlink-style llog record per stripe. */
1621 for (i = 0; i < stripe; i++) {
1622 rc = mdd_declare_llog_record(env, mdd,
1623 sizeof(struct llog_unlink_rec),
1633 /* set attr and LOV EA at once, return updated attr */
1634 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1635 const struct md_attr *ma)
1637 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1638 struct mdd_device *mdd = mdo2mdd(obj);
1639 struct thandle *handle;
1640 struct lov_mds_md *lmm = NULL;
1641 struct llog_cookie *logcookies = NULL;
1642 int rc, lmm_size = 0, cookie_size = 0;
1643 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1644 struct obd_device *obd = mdd->mdd_obd_dev;
1645 struct mds_obd *mds = &obd->u.mds;
1646 #ifdef HAVE_QUOTA_SUPPORT
1647 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1648 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1649 int quota_opc = 0, block_count = 0;
1650 int inode_pending[MAXQUOTAS] = { 0, 0 };
1651 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a copy: mdd_fix_attr() normalizes/filters what the caller
 * asked for (e.g. drops no-op time updates, SUID/SGID handling). */
1655 *la_copy = ma->ma_attr;
1656 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1660 /* setattr on "close" only change atime, or do nothing */
1661 if (ma->ma_valid == MA_INODE &&
1662 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp on a striped regular file: fetch the LOV EA up front so
 * the ownership change can be journaled per-stripe (like unlink). */
1665 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1666 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1667 lmm_size = mdd_lov_mdsize(env, mdd);
1668 lmm = mdd_max_lmm_get(env, mdd);
1672 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1679 handle = mdd_trans_create(env, mdd);
1681 RETURN(PTR_ERR(handle));
1683 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1684 lmm_size > 0 ? lmm : NULL, handle);
1688 rc = mdd_trans_start(env, mdd, handle);
1692 /* permission changes may require sync operation */
1693 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1694 handle->th_sync |= !!mdd->mdd_sync_permission;
1696 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1697 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1698 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
/* Ownership change with quota enabled: check quota for the new owner
 * (inode + estimated blocks) before applying the change. */
1700 #ifdef HAVE_QUOTA_SUPPORT
1701 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1702 struct obd_export *exp = md_quota(env)->mq_exp;
1703 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1705 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1707 quota_opc = FSFILT_OP_SETATTR;
1708 mdd_quota_wrapper(la_copy, qnids);
1709 mdd_quota_wrapper(la_tmp, qoids);
1710 /* get file quota for new owner */
1711 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1712 qnids, inode_pending, 1, NULL, 0,
1714 block_count = (la_tmp->la_blocks + 7) >> 3;
1717 mdd_data_get(env, mdd_obj, &data);
1718 /* get block quota for new owner */
1719 lquota_chkquota(mds_quota_interface_ref, obd,
1720 exp, qnids, block_pending,
1722 LQUOTA_FLAGS_BLK, data, 1);
/* LA_FLAGS alone updates inode flags; any other valid bits take the
 * general setattr path, journaling chown in the llog when striped. */
1728 if (la_copy->la_valid & LA_FLAGS) {
1729 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1732 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1733 } else if (la_copy->la_valid) { /* setattr */
1734 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1736 /* journal chown/chgrp in llog, just like unlink */
1737 if (rc == 0 && lmm_size){
1738 cookie_size = mdd_lov_cookiesize(env, mdd);
1739 logcookies = mdd_max_cookie_get(env, mdd);
1740 if (logcookies == NULL)
1741 GOTO(cleanup, rc = -ENOMEM);
1743 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1744 logcookies, cookie_size) <= 0)
/* Client-supplied LOV EA update (regular files and dirs only). */
1749 if (rc == 0 && ma->ma_valid & MA_LOV) {
1752 mode = mdd_object_type(mdd_obj);
1753 if (S_ISREG(mode) || S_ISDIR(mode)) {
1754 rc = mdd_lsm_sanity_check(env, mdd_obj);
1758 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1759 ma->ma_lmm_size, handle, 1);
/* HSM / Size-on-MDS state is stored in the LMA xattr. */
1763 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1766 mode = mdd_object_type(mdd_obj);
1768 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1773 rc = mdd_attr_set_changelog(env, obj, handle,
1774 ma->ma_attr.la_valid);
1776 mdd_trans_stop(env, mdd, rc, handle);
/* Push the ownership change to the OSTs asynchronously after the MDS
 * transaction commits. */
1777 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1778 /*set obd attr, if needed*/
1779 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1782 #ifdef HAVE_QUOTA_SUPPORT
1784 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1786 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1788 /* Trigger dqrel/dqacq for original owner and new owner.
1789 * If failed, the next call for lquota_chkquota will
1791 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set xattr \a name within an already-started transaction, taking the
 * object write lock around the low-level set. */
1798 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1799 const struct lu_buf *buf, const char *name, int fl,
1800 struct thandle *handle)
1805 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1806 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1807 mdd_write_unlock(env, obj);
/* Permission check for xattr modification: reject immutable/append
 * objects, and require ownership or CAP_FOWNER. */
1812 static int mdd_xattr_sanity_check(const struct lu_env *env,
1813 struct mdd_object *obj)
1815 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1816 struct md_ucred *uc = md_ucred(env);
1820 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1823 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Non-owner needs CAP_FOWNER. */
1827 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1828 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare an xattr set plus, for "user." xattrs, a changelog record.
 * NOTE(review): the execution path (mdd_xattr_set) also logs POSIX ACL
 * xattr changes, which are not declared here — confirm whether the
 * changelog declaration should cover ACL names too. */
1834 static int mdd_declare_xattr_set(const struct lu_env *env,
1835 struct mdd_device *mdd,
1836 struct mdd_object *obj,
1837 const struct lu_buf *buf,
1839 struct thandle *handle)
1844 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1848 /* Only record user xattr changes */
1849 if ((strncmp("user.", name, 5) == 0))
1850 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1856 * The caller should guarantee to update the object ctime
1857 * after xattr_set if needed.
/* Top-level xattr set: sanity check, own transaction, locked set, and
 * a CL_XATTR changelog record for user/ACL xattrs. */
1859 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1860 const struct lu_buf *buf, const char *name,
1863 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1864 struct mdd_device *mdd = mdo2mdd(obj);
1865 struct thandle *handle;
1869 rc = mdd_xattr_sanity_check(env, mdd_obj);
1873 handle = mdd_trans_create(env, mdd);
1875 RETURN(PTR_ERR(handle));
1877 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1881 rc = mdd_trans_start(env, mdd, handle);
1885 /* security-related changes may require sync */
1886 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1887 handle->th_sync |= !!mdd->mdd_sync_permission;
1889 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1891 /* Only record system & user xattr changes */
1892 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1893 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1894 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1895 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1896 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1897 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1898 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1902 mdd_trans_stop(env, mdd, rc, handle);
/* Declare an xattr removal plus, for "user." xattrs, a changelog
 * record (same declare/exec asymmetry for ACL names as xattr_set). */
1907 static int mdd_declare_xattr_del(const struct lu_env *env,
1908 struct mdd_device *mdd,
1909 struct mdd_object *obj,
1911 struct thandle *handle)
1915 rc = mdo_declare_xattr_del(env, obj, name, handle);
1919 /* Only record user xattr changes */
1920 if ((strncmp("user.", name, 5) == 0))
1921 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1927 * The caller should guarantee to update the object ctime
1928 * after xattr_set if needed.
/* Top-level xattr delete: sanity check, own transaction, locked
 * delete, and a CL_XATTR changelog record for user/ACL xattrs. */
1930 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1933 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1934 struct mdd_device *mdd = mdo2mdd(obj);
1935 struct thandle *handle;
1939 rc = mdd_xattr_sanity_check(env, mdd_obj);
1943 handle = mdd_trans_create(env, mdd);
1945 RETURN(PTR_ERR(handle));
1947 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1951 rc = mdd_trans_start(env, mdd, handle);
1955 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1956 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1957 mdd_object_capa(env, mdd_obj));
1958 mdd_write_unlock(env, mdd_obj);
1960 /* Only record system & user xattr changes */
1961 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1962 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1963 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1964 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1965 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1966 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1967 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1971 mdd_trans_stop(env, mdd, rc, handle);
1976 /* partial unlink */
/* Drop a link on \a obj (two for directories: entry + "."), update
 * ctime, finish the unlink, and release quota for the last link. */
1977 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1980 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1981 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1982 struct mdd_device *mdd = mdo2mdd(obj);
1983 struct thandle *handle;
1984 #ifdef HAVE_QUOTA_SUPPORT
1985 struct obd_device *obd = mdd->mdd_obd_dev;
1986 struct mds_obd *mds = &obd->u.mds;
1987 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1993 /* XXX: this code won't be used ever:
1994 * DNE uses slightly different approach */
1998 * Check -ENOENT early here because we need to get object type
1999 * to calculate credits before transaction start
2001 if (mdd_object_exists(mdd_obj) == 0) {
2002 CERROR("%s: object "DFID" not found: rc = -2\n",
2003 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2007 LASSERT(mdd_object_exists(mdd_obj) > 0);
2009 handle = mdd_trans_create(env, mdd);
2013 rc = mdd_trans_start(env, mdd, handle);
2015 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2017 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2021 mdo_ref_del(env, mdd_obj, handle);
/* Directories hold an extra reference for "." — drop it too. */
2023 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2025 mdo_ref_del(env, mdd_obj, handle);
2028 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2029 la_copy->la_ctime = ma->ma_attr.la_ctime;
2031 la_copy->la_valid = LA_CTIME;
2032 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2036 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
/* Last link gone and no opens: release the child's quota. */
2037 #ifdef HAVE_QUOTA_SUPPORT
2038 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2039 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2040 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2041 mdd_quota_wrapper(&ma->ma_attr, qids);
2048 mdd_write_unlock(env, mdd_obj);
2049 mdd_trans_stop(env, mdd, rc, handle);
2050 #ifdef HAVE_QUOTA_SUPPORT
2052 /* Trigger dqrel on the owner of child. If failed,
2053 * the next call for lquota_chkquota will process it */
2054 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2060 /* partial operation */
/* Validate the file type requested in \a ma before a partial object
 * create; the switch body is not visible in this chunk. */
2061 static int mdd_oc_sanity_check(const struct lu_env *env,
2062 struct mdd_object *obj,
2068 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial object create (object body only; directory entry is made
 * elsewhere): quota check, create, optional slave LMV EA, optional
 * remote ACL init, object initialization, then return fresh attrs. */
2085 static int mdd_object_create(const struct lu_env *env,
2086 struct md_object *obj,
2087 const struct md_op_spec *spec,
2091 struct mdd_device *mdd = mdo2mdd(obj);
2092 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2093 const struct lu_fid *pfid = spec->u.sp_pfid;
2094 struct thandle *handle;
2095 #ifdef HAVE_QUOTA_SUPPORT
2096 struct obd_device *obd = mdd->mdd_obd_dev;
2097 struct obd_export *exp = md_quota(env)->mq_exp;
2098 struct mds_obd *mds = &obd->u.mds;
2099 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2100 int quota_opc = 0, block_count = 0;
2101 int inode_pending[MAXQUOTAS] = { 0, 0 };
2102 int block_pending[MAXQUOTAS] = { 0, 0 };
2107 /* XXX: this code won't be used ever:
2108 * DNE uses slightly different approach */
/* Acquire inode (and type-dependent block) quota for the child
 * before starting the transaction. */
2111 #ifdef HAVE_QUOTA_SUPPORT
2112 if (mds->mds_quota) {
2113 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2114 mdd_quota_wrapper(&ma->ma_attr, qids);
2115 /* get file quota for child */
2116 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2117 qids, inode_pending, 1, NULL, 0,
2119 switch (ma->ma_attr.la_mode & S_IFMT) {
2128 /* get block quota for child */
2130 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2131 qids, block_pending, block_count,
2132 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2136 handle = mdd_trans_create(env, mdd);
2138 GOTO(out_pending, rc = PTR_ERR(handle));
2140 rc = mdd_trans_start(env, mdd, handle);
2142 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2143 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2147 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2151 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2152 /* If creating the slave object, set slave EA here. */
2153 int lmv_size = spec->u.sp_ea.eadatalen;
2154 struct lmv_stripe_md *lmv;
2156 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2157 LASSERT(lmv != NULL && lmv_size > 0);
2159 rc = __mdd_xattr_set(env, mdd_obj,
2160 mdd_buf_get_const(env, lmv, lmv_size),
2161 XATTR_NAME_LMV, 0, handle);
2165 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
/* Cross-MDS create: initialize the ACL sent by the master and take
 * the parent FID from the EA rather than from sp_pfid. */
2168 #ifdef CONFIG_FS_POSIX_ACL
2169 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2170 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2172 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2173 buf->lb_len = spec->u.sp_ea.eadatalen;
2174 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2175 rc = __mdd_acl_init(env, mdd_obj, buf,
2176 &ma->ma_attr.la_mode,
2181 ma->ma_attr.la_valid |= LA_MODE;
2184 pfid = spec->u.sp_ea.fid;
2187 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
/* Return the attributes as they now stand on disk. */
2193 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2194 mdd_write_unlock(env, mdd_obj);
2196 mdd_trans_stop(env, mdd, rc, handle);
2198 #ifdef HAVE_QUOTA_SUPPORT
2200 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2202 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2204 /* Trigger dqacq on the owner of child. If failed,
2205 * the next call for lquota_chkquota will process it. */
2206 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: add one reference to \a obj and refresh its ctime. */
2214 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2215 const struct md_attr *ma)
2217 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2218 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2219 struct mdd_device *mdd = mdo2mdd(obj);
2220 struct thandle *handle;
2224 /* XXX: this code won't be used ever:
2225 * DNE uses slightly different approach */
2228 handle = mdd_trans_create(env, mdd);
2232 rc = mdd_trans_start(env, mdd, handle);
2234 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2235 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2237 mdo_ref_add(env, mdd_obj, handle);
2238 mdd_write_unlock(env, mdd_obj);
2240 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2241 la_copy->la_ctime = ma->ma_attr.la_ctime;
2243 la_copy->la_valid = LA_CTIME;
2244 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with 0 instead of rc,
 * discarding any error for the commit decision — confirm whether this
 * is intentional in the full source. */
2247 mdd_trans_stop(env, mdd, 0, handle);
2253 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open flags into a MAY_* access mask for permission
 * checking. */
2255 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2259 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2260 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2261 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2262 * owner can write to a file even if it is marked readonly to hide
2263 * its brokenness. (bug 5781) */
2264 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2265 struct md_ucred *uc = md_ucred(env);
/* Missing/uninitialized credentials or file owner: skip the check. */
2267 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2268 (la->la_uid == uc->mu_fsuid))
2272 if (flags & FMODE_READ)
/* Truncate and append both require write permission. */
2274 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2276 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against object type, permissions,
 * append-only restrictions, and O_NOATIME ownership rules. */
2281 static int mdd_open_sanity_check(const struct lu_env *env,
2282 struct mdd_object *obj, int flag)
2284 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2289 if (mdd_is_dead_obj(obj))
2292 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Symlinks are opened via their target, never directly. */
2296 if (S_ISLNK(tmp_la->la_mode))
2299 mode = accmode(env, tmp_la, flag);
2301 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Skip the permission check if create already performed it. */
2304 if (!(flag & MDS_OPEN_CREATED)) {
2305 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless on FIFOs, sockets and device nodes. */
2310 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2311 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2312 flag &= ~MDS_OPEN_TRUNC;
2314 /* For writing append-only file must open it with append mode. */
2315 if (mdd_is_append(obj)) {
2316 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2318 if (flag & MDS_OPEN_TRUNC)
2324 * Now, flag -- O_NOATIME does not be packed by client.
2326 if (flag & O_NOATIME) {
2327 struct md_ucred *uc = md_ucred(env);
/* O_NOATIME is allowed only to the owner or CAP_FOWNER holder. */
2329 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2330 (uc->mu_valid == UCRED_NEW)) &&
2331 (uc->mu_fsuid != tmp_la->la_uid) &&
2332 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open \a obj: sanity check under write lock, then bump the open
 * count on success. */
2340 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2343 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2346 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2348 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2350 mdd_obj->mod_count++;
2352 mdd_write_unlock(env, mdd_obj);
/* Declare everything needed to destroy an object: the unlink llog
 * records plus the destroy itself. */
2356 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2357 struct md_attr *ma, struct thandle *handle)
2361 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2365 return mdo_declare_destroy(env, obj, handle);
2368 /* return md_attr back,
2369 * if it is last unlink then return lov ea + llog cookie*/
2370 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2371 struct md_attr *ma, struct thandle *handle)
/* For regular files, hand back the LOV EA and unlink-log cookies so
 * the caller can clean up OST objects, then destroy the MDS object. */
2376 if (S_ISREG(mdd_object_type(obj))) {
2377 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2378 * Caller must be ready for that. */
2379 rc = __mdd_lmm_get(env, obj, ma);
2380 if ((ma->ma_valid & MA_LOV))
2381 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2386 rc = mdo_destroy(env, obj, handle);
/* Declare close-time cleanup: orphan-index removal plus full object
 * destruction. */
2391 static int mdd_declare_close(const struct lu_env *env,
2392 struct mdd_object *obj,
2394 struct thandle *handle)
2398 rc = orph_declare_index_delete(env, obj, handle);
2402 return mdd_declare_object_kill(env, obj, ma, handle);
2406 * No permission check is needed.
/* Close \a obj: drop the open count and, on last close of an orphaned
 * or deleted object, remove it from the orphan index, destroy it
 * (optionally including OSS objects), release quota, and log a
 * CL_CLOSE changelog record for write-mode opens. */
2408 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2409 struct md_attr *ma, int mode)
2411 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2412 struct mdd_device *mdd = mdo2mdd(obj);
2413 struct thandle *handle = NULL;
2415 int is_orphan = 0, reset = 1;
2417 #ifdef HAVE_QUOTA_SUPPORT
2418 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2419 struct mds_obd *mds = &obd->u.mds;
2420 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN (e.g. HSM restore in progress): just drop the open
 * count and leave the object on the orphan list. */
2425 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2426 mdd_obj->mod_count--;
2428 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2429 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2430 "list\n", PFID(mdd_object_fid(mdd_obj)));
2434 /* check without any lock */
2435 if (mdd_obj->mod_count == 1 &&
2436 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2438 handle = mdd_trans_create(env, mdo2mdd(obj));
2440 RETURN(PTR_ERR(handle));
2442 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2446 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2450 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
/* Re-check under lock: another closer may have raced in between the
 * unlocked check and taking the lock. */
2455 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2456 if (handle == NULL && mdd_obj->mod_count == 1 &&
2457 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2458 mdd_write_unlock(env, mdd_obj);
2462 /* release open count */
2463 mdd_obj->mod_count --;
2465 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2466 /* remove link to object from orphan index */
2467 LASSERT(handle != NULL);
2468 rc = __mdd_orphan_del(env, mdd_obj, handle);
2470 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2471 "list, OSS objects to be destroyed.\n",
2472 PFID(mdd_object_fid(mdd_obj)));
2475 CERROR("Object "DFID" can not be deleted from orphan "
2476 "list, maybe cause OST objects can not be "
2477 "destroyed (err: %d).\n",
2478 PFID(mdd_object_fid(mdd_obj)), rc);
2479 /* If object was not deleted from orphan list, do not
2480 * destroy OSS objects, which will be done when next
2486 rc = mdd_iattr_get(env, mdd_obj, ma);
2487 /* Object maybe not in orphan list originally, it is rare case for
2488 * mdd_finish_unlink() failure. */
2489 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2490 #ifdef HAVE_QUOTA_SUPPORT
2491 if (mds->mds_quota) {
2492 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2493 mdd_quota_wrapper(&ma->ma_attr, qids);
2496 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2497 if (ma->ma_valid & MA_FLAGS &&
2498 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2499 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
/* No transaction yet (object was not orphan at the unlocked check):
 * create and declare one now for the kill. */
2501 if (handle == NULL) {
2502 handle = mdd_trans_create(env, mdo2mdd(obj));
2504 GOTO(out, rc = PTR_ERR(handle));
2506 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2511 rc = mdd_declare_changelog_store(env, mdd,
2516 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2521 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2527 CERROR("Error when prepare to delete Object "DFID" , "
2528 "which will cause OST objects can not be "
2529 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
/* Caller must not use stale LOV/cookie data on failure. */
2535 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2537 mdd_write_unlock(env, mdd_obj);
/* Log CL_CLOSE for write-capable opens, except recovery reopens. */
2540 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2541 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2542 if (handle == NULL) {
2543 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): "rc = IS_ERR(handle)" stores 1, not the errno-style
 * PTR_ERR(handle) used on every other path — confirm against the full
 * source; looks like a latent error-code bug. */
2545 GOTO(stop, rc = IS_ERR(handle));
2547 rc = mdd_declare_changelog_store(env, mdd, NULL,
2552 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2557 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2563 mdd_trans_stop(env, mdd, rc, handle);
2564 #ifdef HAVE_QUOTA_SUPPORT
2566 /* Trigger dqrel on the owner of child. If failed,
2567 * the next call for lquota_chkquota will process it */
2568 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2575 * Permission check is done when open,
2576 * no need check again.
/* readpage is only valid on directories that support index ops. */
2578 static int mdd_readpage_sanity_check(const struct lu_env *env,
2579 struct mdd_object *obj)
2581 struct dt_object *next = mdd_object_child(obj);
2585 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries from iterator \a it,
 * packing lu_dirents until \a nob bytes are exhausted; records the
 * page's hash range and marks the last entry / hash collisions. */
2593 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2594 struct lu_dirpage *dp, int nob,
2595 const struct dt_it_ops *iops, struct dt_it *it,
2601 struct lu_dirent *ent;
2602 struct lu_dirent *last = NULL;
/* The page starts with a zeroed lu_dirpage header. */
2605 memset(area, 0, sizeof (*dp));
2606 area += sizeof (*dp);
2607 nob -= sizeof (*dp);
2614 len = iops->key_size(env, it);
2616 /* IAM iterator can return record with zero len. */
2620 hash = iops->store(env, it);
2621 if (unlikely(first)) {
2623 dp->ldp_hash_start = cpu_to_le64(hash);
2626 /* calculate max space required for lu_dirent */
2627 recsize = lu_dirent_calc_size(len, attr);
2629 if (nob >= recsize) {
2630 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2631 if (result == -ESTALE)
2636 /* osd might not able to pack all attributes,
2637 * so recheck rec length */
2638 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: error only if not even one entry fit. */
2640 result = (last != NULL) ? 0 :-EINVAL;
2644 ent = (void *)ent + recsize;
2648 result = iops->next(env, it);
2649 if (result == -ESTALE)
2651 } while (result == 0);
2654 dp->ldp_hash_end = cpu_to_le64(hash);
/* If the last entry's hash equals the end hash, entries with this
 * hash may continue on the next page — flag the collision. */
2656 if (last->lde_hash == dp->ldp_hash_end)
2657 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2658 last->lde_reclen = 0; /* end mark */
/* Walk the directory index starting at rdpg->rp_hash and fill the
 * caller-supplied pages with lu_dirpages; returns bytes filled or a
 * negative errno. */
2663 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2664 const struct lu_rdpg *rdpg)
2667 struct dt_object *next = mdd_object_child(obj);
2668 const struct dt_it_ops *iops;
2670 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2676 LASSERT(rdpg->rp_pages != NULL);
2677 LASSERT(next->do_index_ops != NULL);
2679 if (rdpg->rp_count <= 0)
2683 * iterate through directory and fill pages from @rdpg
2685 iops = &next->do_index_ops->dio_it;
2686 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2690 rc = iops->load(env, it, rdpg->rp_hash);
2694 * Iterator didn't find record with exactly the key requested.
2696 * It is currently either
2698 * - positioned above record with key less than
2699 * requested---skip it.
2701 * - or not positioned at all (is in IAM_IT_SKEWED
2702 * state)---position it on the next item.
2704 rc = iops->next(env, it);
2709 * At this point and across for-loop:
2711 * rc == 0 -> ok, proceed.
2712 * rc > 0 -> end of directory.
2715 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2716 i++, nob -= CFS_PAGE_SIZE) {
2717 struct lu_dirpage *dp;
2719 LASSERT(i < rdpg->rp_npages);
2720 pg = rdpg->rp_pages[i];
/* A CFS page may hold several LU_PAGE_SIZE dirpages; build each
 * sub-page in turn when CFS_PAGE_SIZE > LU_PAGE_SIZE. */
2722 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2725 rc = mdd_dir_page_build(env, mdd, dp,
2726 min_t(int, nob, LU_PAGE_SIZE),
2727 iops, it, rdpg->rp_attrs);
/* End of directory: close the hash range with the end marker. */
2732 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2734 } else if (rc < 0) {
2735 CWARN("build page failed: %d!\n", rc);
2738 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2739 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2740 if ((unsigned long)dp & ~CFS_PAGE_MASK)
/* No entries at all: return a single empty page covering the
 * requested hash range. */
2747 struct lu_dirpage *dp;
2749 dp = cfs_kmap(rdpg->rp_pages[0]);
2750 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2753 * No pages were processed, mark this for first page
2756 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2759 cfs_kunmap(rdpg->rp_pages[0]);
2761 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2764 iops->fini(env, it);
/* md layer readpage entry point: validate the object, handle the
 * dead-directory case with a single empty page (POSIX: no entries at
 * all, not even "." / ".."), otherwise delegate to __mdd_readpage. */
2769 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2770 const struct lu_rdpg *rdpg)
2772 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2776 if (mdd_object_exists(mdd_obj) == 0) {
2777 CERROR("%s: object "DFID" not found: rc = -2\n",
2778 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2782 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2783 rc = mdd_readpage_sanity_check(env, mdd_obj);
2785 GOTO(out_unlock, rc);
2787 if (mdd_is_dead_obj(mdd_obj)) {
2789 struct lu_dirpage *dp;
2792 * According to POSIX, please do not return any entry to client:
2793 * even dot and dotdot should not be returned.
2795 CWARN("readdir from dead object: "DFID"\n",
2796 PFID(mdd_object_fid(mdd_obj)));
2798 if (rdpg->rp_count <= 0)
2799 GOTO(out_unlock, rc = -EFAULT);
2800 LASSERT(rdpg->rp_pages != NULL);
/* Build one LDF_EMPTY page covering the full remaining hash range. */
2802 pg = rdpg->rp_pages[0];
2803 dp = (struct lu_dirpage*)cfs_kmap(pg);
2804 memset(dp, 0 , sizeof(struct lu_dirpage));
2805 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2806 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2807 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2809 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2812 rc = __mdd_readpage(env, mdd_obj, rdpg);
2816 mdd_read_unlock(env, mdd_obj);
/* Sync \a obj to stable storage via the underlying dt object. */
2820 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2822 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2824 if (mdd_object_exists(mdd_obj) == 0) {
2825 CERROR("%s: object "DFID" not found: rc = -2\n",
2826 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2829 return dt_object_sync(env, mdd_object_child(mdd_obj));
2832 const struct md_object_operations mdd_obj_ops = {
2833 .moo_permission = mdd_permission,
2834 .moo_attr_get = mdd_attr_get,
2835 .moo_attr_set = mdd_attr_set,
2836 .moo_xattr_get = mdd_xattr_get,
2837 .moo_xattr_set = mdd_xattr_set,
2838 .moo_xattr_list = mdd_xattr_list,
2839 .moo_xattr_del = mdd_xattr_del,
2840 .moo_object_create = mdd_object_create,
2841 .moo_ref_add = mdd_ref_add,
2842 .moo_ref_del = mdd_ref_del,
2843 .moo_open = mdd_open,
2844 .moo_close = mdd_close,
2845 .moo_readpage = mdd_readpage,
2846 .moo_readlink = mdd_readlink,
2847 .moo_changelog = mdd_changelog,
2848 .moo_capa_get = mdd_capa_get,
2849 .moo_object_sync = mdd_object_sync,
2850 .moo_path = mdd_path,
2851 .moo_file_lock = mdd_file_lock,
2852 .moo_file_unlock = mdd_file_unlock,