4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
58 #include "mdd_internal.h"
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
63 static int mdd_xattr_get(const struct lu_env *env,
64 struct md_object *obj, struct lu_buf *buf,
/* Fetch data (e.g. layout) information for \a obj via mdo_data_get().
 * Logs an ENOENT-style error if the object no longer exists on this MDT.
 * NOTE(review): extraction elided lines here (error-return path, closing). */
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
70 if (mdd_object_exists(obj) == 0) {
71 CERROR("%s: object "DFID" not found: rc = -2\n",
72 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
75 mdo_data_get(env, obj, data);
/* Read the attributes of \a obj into \a la, under capability \a capa.
 * Complains and (presumably, in elided lines) returns -ENOENT when the
 * object does not exist. */
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80 struct lu_attr *la, struct lustre_capa *capa)
82 if (mdd_object_exists(obj) == 0) {
83 CERROR("%s: object "DFID" not found: rc = -2\n",
84 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
87 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flags into the in-memory mod_flags bits
 * (APPEND_OBJ / IMMUTE_OBJ), clearing both bits first so stale state
 * cannot survive a re-translation. */
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
92 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
94 if (flags & LUSTRE_APPEND_FL)
95 obj->mod_flags |= APPEND_OBJ;
97 if (flags & LUSTRE_IMMUTABLE_FL)
98 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread MDD scratch area attached to \a env.
 * The key is expected to be registered, hence the LASSERT. */
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
103 struct mdd_thread_info *info;
105 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106 LASSERT(info != NULL);
/* Wrap a caller-provided (area, len) pair in the thread-local mti_buf.
 * NOTE(review): the lines filling lb_buf/lb_len were elided by extraction. */
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
114 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory held by \a buf (no-op for NULL or empty buffers). */
120 void mdd_buf_put(struct lu_buf *buf)
122 if (buf == NULL || buf->lb_buf == NULL)
124 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap (area, len) in the thread-local
 * mti_buf, casting away const because lu_buf carries a non-const pointer. */
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130 const void *area, ssize_t len)
134 buf = &mdd_env_info(env)->mti_buf;
135 buf->lb_buf = (void *)area;
/* Return the thread-local big buffer, (re)allocated to hold at least
 * \a len bytes.  An existing too-small buffer is freed first; contents
 * are NOT preserved (use mdd_buf_grow() for that). */
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
142 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
144 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
148 if (buf->lb_buf == NULL) {
150 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151 if (buf->lb_buf == NULL)
/** Increase the size of the \a mti_big_buf.
 * preserves old data in buffer
 * old buffer remains unchanged on error
 * \retval 0 or -ENOMEM
 */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
164 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
167 LASSERT(len >= oldbuf->lb_len);
168 OBD_ALLOC_LARGE(buf.lb_buf, len);
170 if (buf.lb_buf == NULL)
/* Copy-then-swap: old contents preserved, old allocation released only
 * after the copy succeeds. */
174 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
176 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
178 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the thread-local llog cookie buffer, grown (and zeroed) to the
 * current maximum cookie size reported by mdd_lov_cookiesize().
 * Returns NULL if allocation fails. */
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184 struct mdd_device *mdd)
186 struct mdd_thread_info *mti = mdd_env_info(env);
189 max_cookie_size = mdd_lov_cookiesize(env, mdd);
190 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191 if (mti->mti_max_cookie)
192 OBD_FREE_LARGE(mti->mti_max_cookie,
193 mti->mti_max_cookie_size);
194 mti->mti_max_cookie = NULL;
195 mti->mti_max_cookie_size = 0;
197 if (unlikely(mti->mti_max_cookie == NULL)) {
198 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199 if (likely(mti->mti_max_cookie != NULL))
200 mti->mti_max_cookie_size = max_cookie_size;
/* Always hand back a zeroed buffer so stale cookies cannot leak. */
202 if (likely(mti->mti_max_cookie != NULL))
203 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204 return mti->mti_max_cookie;
/* Return the thread-local LOV MD buffer, grown to at least \a size bytes
 * (rounded up to a power of two to amortize reallocations).
 * Returns NULL if allocation fails. */
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
209 struct mdd_thread_info *mti = mdd_env_info(env);
211 if (unlikely(mti->mti_max_lmm_size < size)) {
212 int rsize = size_roundup_power2(size);
214 if (mti->mti_max_lmm_size > 0) {
215 LASSERT(mti->mti_max_lmm);
216 OBD_FREE_LARGE(mti->mti_max_lmm,
217 mti->mti_max_lmm_size);
218 mti->mti_max_lmm = NULL;
219 mti->mti_max_lmm_size = 0;
222 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223 if (likely(mti->mti_max_lmm != NULL))
224 mti->mti_max_lmm_size = rsize;
226 return mti->mti_max_lmm;
/* Convenience wrapper: size the thread-local LOV MD buffer for the
 * device-wide maximum LOV EA size. */
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230 struct mdd_device *mdd)
234 max_lmm_size = mdd_lov_mdsize(env, mdd);
235 return mdd_max_lmm_buffer(env, max_lmm_size);
/* lu_object_operations-style allocator: carve an mdd_object from the
 * slab, initialize the embedded lu_object and wire up the md/dir/lu
 * operation vectors.  GFP flag CFS_ALLOC_IO because this can run in the
 * I/O path. */
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239 const struct lu_object_header *hdr,
242 struct mdd_object *mdd_obj;
244 OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245 if (mdd_obj != NULL) {
248 o = mdd2lu_obj(mdd_obj);
249 lu_object_init(o, NULL, d);
250 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252 mdd_obj->mod_count = 0;
253 o->lo_ops = &mdd_lu_obj_ops;
/* ->loo_object_init: allocate the next-lower (child dt) object for this
 * mdd object and stack it below us in the lu_object compound. */
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261 const struct lu_object_conf *unused)
263 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264 struct mdd_object *mdd_obj = lu2mdd_obj(o);
265 struct lu_object *below;
266 struct lu_device *under;
/* Changelog timestamp starts at 0 = "never logged". */
269 mdd_obj->mod_cltime = 0;
270 under = &d->mdd_child->dd_lu_dev;
271 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272 mdd_pdlock_init(mdd_obj);
276 lu_object_add(o, below);
/* ->loo_object_start: for an existing object, pull its flags (append /
 * immutable) into the in-core state via mdd_get_flags(). */
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
283 if (lu_object_exists(o))
284 return mdd_get_flags(env, lu2mdd_obj(o));
/* ->loo_object_free: return the mdd_object to its slab cache. */
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
291 struct mdd_object *mdd = lu2mdd_obj(o);
294 OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
/* ->loo_object_print: one-line debug dump of the object's open count,
 * valid mask, changelog time and flags. */
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298 lu_printer_t p, const struct lu_object *o)
300 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302 "valid=%x, cltime="LPU64", flags=%lx)",
303 mdd, mdd->mod_count, mdd->mod_valid,
304 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for mdd objects; see the handlers above. */
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
/* Look up (or create in cache) the mdd object for fid \a f on device
 * \a d; thin wrapper over md_object_find_slice().  May return ERR_PTR. */
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315 struct mdd_device *d,
316 const struct lu_fid *f)
318 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve \a path (relative to the filesystem root) to a fid by walking
 * it component by component with mdd_lookup().
 * \retval -EREMOTE if a component lives on another MDT
 * NOTE(review): the component-splitting loop body was elided by extraction. */
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322 const char *path, struct lu_fid *fid)
325 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326 struct mdd_object *obj;
327 struct lu_name *lname = &mdd_env_info(env)->mti_name;
332 /* temp buffer for path element */
333 buf = mdd_buf_alloc(env, PATH_MAX);
334 if (buf->lb_buf == NULL)
337 lname->ln_name = name = buf->lb_buf;
338 lname->ln_namelen = 0;
/* Walk starts at the root fid. */
339 *f = mdd->mdd_root_fid;
346 while (*path != '/' && *path != '\0') {
354 /* find obj corresponding to fid */
355 obj = mdd_object_find(env, mdd, f);
357 GOTO(out, rc = -EREMOTE);
359 GOTO(out, rc = PTR_ERR(obj));
360 /* get child fid from parent and name */
361 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362 mdd_object_put(env, obj);
367 lname->ln_namelen = 0;
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
380 #define MAX_PATH_DEPTH 100
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                   pli_recno;      /**< history point */
385         __u64                   pli_currec;     /**< current record */
386         struct lu_fid           pli_fid;
387         struct lu_fid           pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object      *pli_mdd_obj;
389         char                   *pli_path;       /**< full path */
391         int                     pli_linkno;     /**< which hardlink to follow */
392         int                     pli_fidcount;   /**< number of \a pli_fids */
/* Build the current (as-of-now) pathname for pli->pli_mdd_obj by walking
 * parent fids upward via each object's link EA, packing name components
 * from the END of pli->pli_path backward, then verifying the result by
 * resolving it forward again with mdd_path2fid().
 * \retval -EAGAIN  the path changed under us; caller should retry
 * \retval -EOVERFLOW  path buffer or MAX_PATH_DEPTH exhausted
 * NOTE(review): several lines elided by extraction (buffer init, EREMOTE
 * checks, loop plumbing); comments below describe only the visible code. */
395 static int mdd_path_current(const struct lu_env *env,
396 struct path_lookup_info *pli)
398 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399 struct mdd_object *mdd_obj;
400 struct lu_buf *buf = NULL;
401 struct link_ea_header *leh;
402 struct link_ea_entry *lee;
403 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr starts at the end of the path buffer; names are packed backward. */
410 ptr = pli->pli_path + pli->pli_pathlen - 1;
413 pli->pli_fidcount = 0;
414 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
416 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417 mdd_obj = mdd_object_find(env, mdd,
418 &pli->pli_fids[pli->pli_fidcount]);
420 GOTO(out, rc = -EREMOTE);
422 GOTO(out, rc = PTR_ERR(mdd_obj));
423 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
425 mdd_object_put(env, mdd_obj);
429 /* Do I need to error out here? */
434 /* Get parent fid and object name */
435 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436 buf = mdd_links_get(env, mdd_obj);
437 mdd_read_unlock(env, mdd_obj);
438 mdd_object_put(env, mdd_obj);
440 GOTO(out, rc = PTR_ERR(buf));
443 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
446 /* If set, use link #linkno for path lookup, otherwise use
447 link #0. Only do this for the final path element. */
448 if ((pli->pli_fidcount == 0) &&
449 (pli->pli_linkno < leh->leh_reccount)) {
451 for (count = 0; count < pli->pli_linkno; count++) {
452 lee = (struct link_ea_entry *)
453 ((char *)lee + reclen);
454 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
456 if (pli->pli_linkno < leh->leh_reccount - 1)
457 /* indicate to user there are more links */
461 /* Pack the name in the end of the buffer */
462 ptr -= tmpname->ln_namelen;
463 if (ptr - 1 <= pli->pli_path)
464 GOTO(out, rc = -EOVERFLOW);
465 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
468 /* Store the parent fid for historic lookup */
469 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470 GOTO(out, rc = -EOVERFLOW);
471 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
474 /* Verify that our path hasn't changed since we started the lookup.
475 Record the current index, and verify the path resolves to the
476 same fid. If it does, then the path is correct as of this index. */
477 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478 pli->pli_currec = mdd->mdd_cl.mc_index;
479 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
482 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483 GOTO (out, rc = -EAGAIN);
485 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488 PFID(&pli->pli_fid));
489 GOTO(out, rc = -EAGAIN);
491 ptr++; /* skip leading / */
/* Shift the assembled path to the front of the caller's buffer. */
492 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
496 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497 /* if we vmalloced a large buffer drop it */
/* Historic (as-of-recno) path lookup.
 * NOTE(review): body elided by extraction — per the comment in mdd_path()
 * this is effectively a stub in this version. */
503 static int mdd_path_historic(const struct lu_env *env,
504 struct path_lookup_info *pli)
/* Returns the full path to this fid, as of changelog record recno. */
/* \param[out] recno   changelog index at which the path was valid
 * \param[in,out] linkno  which hardlink to follow; next link on return
 * Retries mdd_path_current() while it reports -EAGAIN (rename race).
 * NOTE(review): extraction elided pli allocation/free and rc plumbing. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511 char *path, int pathlen, __u64 *recno, int *linkno)
513 struct path_lookup_info *pli;
/* Root has the empty path; handled specially (elided). */
521 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
530 pli->pli_mdd_obj = md2mdd_obj(obj);
531 pli->pli_recno = *recno;
532 pli->pli_path = path;
533 pli->pli_pathlen = pathlen;
534 pli->pli_linkno = *linkno;
536 /* Retry multiple times in case file is being moved */
537 while (tries-- && rc == -EAGAIN)
538 rc = mdd_path_current(env, pli);
540 /* For historical path lookup, the current links may not have existed
541 * at "recno" time. We must switch over to earlier links/parents
542 * by using the changelog records. If the earlier parent doesn't
543 * exist, we must search back through the changelog to reconstruct
544 * its parents, then check if it exists, etc.
545 * We may ignore this problem for the initial implementation and
546 * state that an "original" hardlink must still exist for us to find
547 * historic path name. */
548 if (pli->pli_recno != -1) {
549 rc = mdd_path_historic(env, pli);
551 *recno = pli->pli_currec;
552 /* Return next link index to caller */
553 *linkno = pli->pli_linkno;
/* Refresh the in-core APPEND/IMMUTABLE state of \a obj from its on-disk
 * la_flags (bypassing capability checks). */
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
563 struct lu_attr *la = &mdd_env_info(env)->mti_la;
567 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
569 mdd_flags_xlate(obj, la->la_flags);
/* get only inode attributes */
/* Fills ma->ma_attr and sets MA_INODE in ma_valid; no-op if the caller
 * already has valid inode attributes. */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
581 if (ma->ma_valid & MA_INODE)
584 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585 mdd_object_capa(env, mdd_obj));
587 ma->ma_valid |= MA_INODE;
/* Synthesize a default LOV EA (v1) for \a mdd_obj from the device's LOV
 * descriptor (used e.g. for the root directory when it has no stored
 * striping).  \retval size of the generated lov_user_md. */
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
593 struct lov_desc *ldesc;
594 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595 struct lov_user_md *lum = (struct lov_user_md*)lmm;
601 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602 LASSERT(ldesc != NULL);
604 lum->lmm_magic = LOV_MAGIC_V1;
/* FID_SEQ_LOV_DEFAULT marks this as a synthesized default, not real. */
605 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606 lum->lmm_pattern = ldesc->ld_pattern;
607 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
611 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root directory. */
614 static int is_rootdir(struct mdd_object *mdd_obj)
616 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617 const struct lu_fid *fid = mdo2fid(mdd_obj);
619 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
/* Fetch a LOV EA that did not fit in the caller's ma_lmm buffer into the
 * thread-local "big lmm" buffer, growing it as needed.  On success the
 * md_attr is redirected at the big buffer and ma_big_lmm_used is set so
 * the caller knows not to free/copy it as its own. */
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
625 struct mdd_thread_info *info = mdd_env_info(env);
630 LASSERT(info != NULL);
631 LASSERT(ma->ma_big_lmm_used == 0);
633 if (ma->ma_lmm_size == 0) {
634 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635 XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
/* Probe with LU_BUF_NULL first to learn the EA's actual size. */
639 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640 mdd_object_capa(env, obj));
644 /* big_lmm may need to grow */
646 mdd_max_lmm_buffer(env, size);
647 if (info->mti_max_lmm == NULL)
650 LASSERT(info->mti_max_lmm_size >= size);
651 rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
656 ma->ma_big_lmm_used = 1;
657 ma->ma_valid |= MA_LOV;
658 ma->ma_lmm = info->mti_max_lmm;
659 ma->ma_lmm_size = size;
/* get lov EA only */
/* Read the LOV EA into ma->ma_lmm; falls back to the big thread-local
 * buffer when the caller's buffer is too small, and to the device default
 * striping for the root directory when MA_LOV_DEF is requested. */
665 static int __mdd_lmm_get(const struct lu_env *env,
666 struct mdd_object *mdd_obj, struct md_attr *ma)
671 if (ma->ma_valid & MA_LOV)
674 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
677 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678 else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* Positive rc is the EA size; record it and mark LOV + layout gen valid. */
682 ma->ma_lmm_size = rc;
683 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
/* get the first parent fid from link EA */
/* Reads link #0's parent fid (stored big-endian) into ma->ma_pfid and
 * sets MA_PFID.  No-op if already valid. */
691 static int mdd_pfid_get(const struct lu_env *env,
692 struct mdd_object *mdd_obj, struct md_attr *ma)
695 struct link_ea_header *leh;
696 struct link_ea_entry *lee;
697 struct lu_fid *pfid = &ma->ma_pfid;
700 if (ma->ma_valid & MA_PFID)
703 buf = mdd_links_get(env, mdd_obj);
705 RETURN(PTR_ERR(buf));
708 lee = (struct link_ea_entry *)(leh + 1);
709 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
/* Link EA stores fids big-endian on disk. */
710 fid_be_to_cpu(pfid, pfid);
711 ma->ma_valid |= MA_PFID;
712 if (buf->lb_len > OBD_ALLOC_BIG)
713 /* if we vmalloced a large buffer drop it */
/* Same as __mdd_lmm_get() but takes the object read lock around the EA
 * fetch. */
718 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
724 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
725 rc = __mdd_lmm_get(env, mdd_obj, ma);
726 mdd_read_unlock(env, mdd_obj);
/* Read the LMV (striped-directory) EA into ma->ma_lmv and set MA_LMV.
 * No-op if already valid. */
731 static int __mdd_lmv_get(const struct lu_env *env,
732 struct mdd_object *mdd_obj, struct md_attr *ma)
737 if (ma->ma_valid & MA_LMV)
740 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
743 ma->ma_valid |= MA_LMV;
/* Read the LMA (lustre metadata attributes) EA and extract the HSM and
 * SOM pieces that ma_need requests, setting MA_HSM / MA_SOM in ma_valid.
 * Uses the thread-local xattr buffer as scratch space. */
749 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
752 struct mdd_thread_info *info = mdd_env_info(env);
753 struct lustre_mdt_attrs *lma =
754 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
759 /* If all needed data are already valid, nothing to do */
760 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
761 (ma->ma_need & (MA_HSM | MA_SOM)))
764 /* Read LMA from disk EA */
765 lma_size = sizeof(info->mti_xattr_buf);
766 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
770 /* Useless to check LMA incompatibility because this is already done in
771 * osd_ea_fid_get(), and this will fail long before this code is
773 * So, if we are here, LMA is compatible.
776 lustre_lma_swab(lma);
778 /* Swab and copy LMA */
779 if (ma->ma_need & MA_HSM) {
780 if (lma->lma_compat & LMAC_HSM)
781 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
783 ma->ma_hsm.mh_flags = 0;
784 ma->ma_valid |= MA_HSM;
/* SOM data is only copied when the LMA says it is present (LMAC_SOM). */
788 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
789 LASSERT(ma->ma_som != NULL);
790 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
791 ma->ma_som->msd_size = lma->lma_som_size;
792 ma->ma_som->msd_blocks = lma->lma_som_blocks;
793 ma->ma_som->msd_mountid = lma->lma_som_mountid;
794 ma->ma_valid |= MA_SOM;
/* Dispatcher: gather every attribute group requested in ma->ma_need
 * (inode, LOV, parent fid, LMV, HSM/SOM, default ACL), each gated by the
 * object's file type.  Stops at the first error. */
800 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
806 if (ma->ma_need & MA_INODE)
807 rc = mdd_iattr_get(env, mdd_obj, ma);
809 if (rc == 0 && ma->ma_need & MA_LOV) {
810 if (S_ISREG(mdd_object_type(mdd_obj)) ||
811 S_ISDIR(mdd_object_type(mdd_obj)))
812 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* PFID only makes sense when LOV was not fetched (regular files). */
814 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
815 if (S_ISREG(mdd_object_type(mdd_obj)))
816 rc = mdd_pfid_get(env, mdd_obj, ma);
818 if (rc == 0 && ma->ma_need & MA_LMV) {
819 if (S_ISDIR(mdd_object_type(mdd_obj)))
820 rc = __mdd_lmv_get(env, mdd_obj, ma);
822 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
823 if (S_ISREG(mdd_object_type(mdd_obj)))
824 rc = __mdd_lma_get(env, mdd_obj, ma);
826 #ifdef CONFIG_FS_POSIX_ACL
827 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
828 if (S_ISDIR(mdd_object_type(mdd_obj)))
829 rc = mdd_def_acl_get(env, mdd_obj, ma);
832 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
833 rc, ma->ma_valid, ma->ma_lmm);
/* mdd_attr_get_internal() under the object read lock, taken only when an
 * EA-based group (LOV/LMV/ACL/HSM/SOM/PFID) is requested — plain inode
 * attributes do not need it. */
837 int mdd_attr_get_internal_locked(const struct lu_env *env,
838 struct mdd_object *mdd_obj, struct md_attr *ma)
841 int needlock = ma->ma_need &
842 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
845 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
846 rc = mdd_attr_get_internal(env, mdd_obj, ma);
848 mdd_read_unlock(env, mdd_obj);
853 * No permission check is needed.
/* md_object_operations entry point for getattr. */
855 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
858 struct mdd_object *mdd_obj = md2mdd_obj(obj);
862 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
867 * No permission check is needed.
/* Fetch the named xattr of \a obj into \a buf, under the object read
 * lock; errors with -ENOENT-style message if the object is gone. */
869 static int mdd_xattr_get(const struct lu_env *env,
870 struct md_object *obj, struct lu_buf *buf,
873 struct mdd_object *mdd_obj = md2mdd_obj(obj);
878 if (mdd_object_exists(mdd_obj) == 0) {
879 CERROR("%s: object "DFID" not found: rc = -2\n",
880 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
884 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
885 rc = mdo_xattr_get(env, mdd_obj, buf, name,
886 mdd_object_capa(env, mdd_obj));
887 mdd_read_unlock(env, mdd_obj);
893 * Permission check is done when open,
894 * no need check again.
/* Read symlink target by delegating to the underlying dt object's body
 * read method, under the object read lock. */
896 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
899 struct mdd_object *mdd_obj = md2mdd_obj(obj);
900 struct dt_object *next;
905 if (mdd_object_exists(mdd_obj) == 0) {
906 CERROR("%s: object "DFID" not found: rc = -2\n",
907 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
911 next = mdd_object_child(mdd_obj);
912 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
913 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
914 mdd_object_capa(env, mdd_obj));
915 mdd_read_unlock(env, mdd_obj);
920 * No permission check is needed.
/* List all xattr names of \a obj into \a buf, under the read lock. */
922 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
925 struct mdd_object *mdd_obj = md2mdd_obj(obj);
930 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
931 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
932 mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) creation of child \a c under
 * parent \a p.  The dt object format is derived from spec->sp_feat: a
 * non-directory index feature forces DFT_INDEX, otherwise it follows the
 * requested file mode. */
937 int mdd_declare_object_create_internal(const struct lu_env *env,
938 struct mdd_object *p,
939 struct mdd_object *c,
941 struct thandle *handle,
942 const struct md_op_spec *spec)
944 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
945 const struct dt_index_features *feat = spec->sp_feat;
949 if (feat != &dt_directory_features && feat != NULL)
950 dof->dof_type = DFT_INDEX;
952 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
954 dof->u.dof_idx.di_feat = feat;
956 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Actually create the child object \a c (if not already existing) with
 * the format computed the same way as in the declare step above; asserts
 * the object exists afterwards on success. */
961 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
962 struct mdd_object *c, struct md_attr *ma,
963 struct thandle *handle,
964 const struct md_op_spec *spec)
966 struct lu_attr *attr = &ma->ma_attr;
967 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
968 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
969 const struct dt_index_features *feat = spec->sp_feat;
973 if (!mdd_object_exists(c)) {
974 struct dt_object *next = mdd_object_child(c);
/* Mirror of the format selection in the declare path. */
977 if (feat != &dt_directory_features && feat != NULL)
978 dof->dof_type = DFT_INDEX;
980 dof->dof_type = dt_mode_to_dft(attr->la_mode);
982 dof->u.dof_idx.di_feat = feat;
984 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
985 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
993 * Make sure the ctime is increased only.
/* Drop MTIME/CTIME updates whose ctime would move backwards, and a pure
 * CTIME update that would be a no-op. */
995 static inline int mdd_attr_check(const struct lu_env *env,
996 struct mdd_object *obj,
997 struct lu_attr *attr)
999 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1003 if (attr->la_valid & LA_CTIME) {
1004 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1008 if (attr->la_ctime < tmp_la->la_ctime)
1009 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1010 else if (attr->la_valid == LA_CTIME &&
1011 attr->la_ctime == tmp_la->la_ctime)
1012 attr->la_valid &= ~LA_CTIME;
/* Write \a attr to disk; when the mode changed and \a needacl is set,
 * propagate the new mode bits into the POSIX ACL as well. */
1017 int mdd_attr_set_internal(const struct lu_env *env,
1018 struct mdd_object *obj,
1019 struct lu_attr *attr,
1020 struct thandle *handle,
1026 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1027 #ifdef CONFIG_FS_POSIX_ACL
1028 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1029 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by the actual attr set;
 * the elided lines presumably short-circuit when nothing remains valid. */
1034 int mdd_attr_check_set_internal(const struct lu_env *env,
1035 struct mdd_object *obj,
1036 struct lu_attr *attr,
1037 struct thandle *handle,
1043 rc = mdd_attr_check(env, obj, attr);
1048 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object write lock; ACL work is only
 * needed when the mode actually changes. */
1052 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1053 struct mdd_object *obj,
1054 struct lu_attr *attr,
1055 struct thandle *handle,
1061 needacl = needacl && (attr->la_valid & LA_MODE);
1063 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1064 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1066 mdd_write_unlock(env, obj);
/* mdd_attr_check_set_internal() under the object write lock — the
 * checked (ctime-monotonic) counterpart of the function above. */
1070 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1071 struct mdd_object *obj,
1072 struct lu_attr *attr,
1073 struct thandle *handle,
1079 needacl = needacl && (attr->la_valid & LA_MODE);
1081 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1082 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1084 mdd_write_unlock(env, obj);
/* Set or delete an xattr: a non-empty buffer stores the value, a
 * NULL/zero-length buffer deletes the attribute.  Caller holds whatever
 * locking/transaction context is required. */
1088 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1089 const struct lu_buf *buf, const char *name,
1090 int fl, struct thandle *handle)
1092 struct lustre_capa *capa = mdd_object_capa(env, obj);
1096 if (buf->lb_buf && buf->lb_len > 0)
1097 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1098 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1099 rc = mdo_xattr_del(env, obj, name, handle, capa);
1105 * This gives the same functionality as the code between
1106 * sys_chmod and inode_setattr
1107 * chown_common and inode_setattr
1108 * utimes and inode_setattr
1109 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize an incoming setattr request against the current
 * attributes and the caller's credentials: permission/capability checks
 * for chmod/chown/chgrp/utimes, flag-change rules, setuid/setgid
 * stripping, and time-update filtering (both normal and Size-on-MDS
 * paths).  On success \a la describes exactly what should be written.
 * NOTE(review): extraction elided lines (returns, some rc checks);
 * comments below annotate only the visible logic. */
1111 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1112 struct lu_attr *la, const struct md_attr *ma)
1114 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1115 struct md_ucred *uc;
1122 /* Do not permit change file type */
1123 if (la->la_valid & LA_TYPE)
1126 /* They should not be processed by setattr */
1127 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1130 /* export destroy does not have ->le_ses, but we may want
1131 * to drop LUSTRE_SOM_FL. */
1137 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: used by rename on its source; needs delete perm
 * unless MDS_PERM_BYPASS is set, and must still be monotonic. */
1141 if (la->la_valid == LA_CTIME) {
1142 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1143 /* This is only for set ctime when rename's source is
1145 rc = mdd_may_delete(env, NULL, obj,
1146 (struct md_attr *)ma, 1, 0);
1147 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1148 la->la_valid &= ~LA_CTIME;
1152 if (la->la_valid == LA_ATIME) {
1153 /* This is atime only set for read atime update on close. */
1154 if (la->la_atime >= tmp_la->la_atime &&
1155 la->la_atime < (tmp_la->la_atime +
1156 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1157 la->la_valid &= ~LA_ATIME;
1161 /* Check if flags change. */
1162 if (la->la_valid & LA_FLAGS) {
1163 unsigned int oldflags = 0;
1164 unsigned int newflags = la->la_flags &
1165 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1167 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1168 !mdd_capable(uc, CFS_CAP_FOWNER))
1171 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1172 * only be changed by the relevant capability. */
1173 if (mdd_is_immutable(obj))
1174 oldflags |= LUSTRE_IMMUTABLE_FL;
1175 if (mdd_is_append(obj))
1176 oldflags |= LUSTRE_APPEND_FL;
1177 if ((oldflags ^ newflags) &&
1178 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningless on non-directories. */
1181 if (!S_ISDIR(tmp_la->la_mode))
1182 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept only flag changes (unless the
 * caller carries MDS_PERM_BYPASS). */
1185 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1186 (la->la_valid & ~LA_FLAGS) &&
1187 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1190 /* Check for setting the obj time. */
1191 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1192 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1193 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1194 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1195 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* Kill-SUID/SGID requests (e.g. after a write) are converted into an
 * explicit mode change here. */
1203 if (la->la_valid & LA_KILL_SUID) {
1204 la->la_valid &= ~LA_KILL_SUID;
1205 if ((tmp_la->la_mode & S_ISUID) &&
1206 !(la->la_valid & LA_MODE)) {
1207 la->la_mode = tmp_la->la_mode;
1208 la->la_valid |= LA_MODE;
1210 la->la_mode &= ~S_ISUID;
1213 if (la->la_valid & LA_KILL_SGID) {
1214 la->la_valid &= ~LA_KILL_SGID;
1215 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1216 (S_ISGID | S_IXGRP)) &&
1217 !(la->la_valid & LA_MODE)) {
1218 la->la_mode = tmp_la->la_mode;
1219 la->la_valid |= LA_MODE;
1221 la->la_mode &= ~S_ISGID;
1224 /* Make sure a caller can chmod. */
1225 if (la->la_valid & LA_MODE) {
1226 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1227 (uc->mu_fsuid != tmp_la->la_uid) &&
1228 !mdd_capable(uc, CFS_CAP_FOWNER))
1231 if (la->la_mode == (cfs_umode_t) -1)
1232 la->la_mode = tmp_la->la_mode;
1234 la->la_mode = (la->la_mode & S_IALLUGO) |
1235 (tmp_la->la_mode & ~S_IALLUGO);
1237 /* Also check the setgid bit! */
1238 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1239 la->la_gid : tmp_la->la_gid) &&
1240 !mdd_capable(uc, CFS_CAP_FSETID))
1241 la->la_mode &= ~S_ISGID;
1243 la->la_mode = tmp_la->la_mode;
1246 /* Make sure a caller can chown. */
1247 if (la->la_valid & LA_UID) {
1248 if (la->la_uid == (uid_t) -1)
1249 la->la_uid = tmp_la->la_uid;
1250 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1251 (la->la_uid != tmp_la->la_uid)) &&
1252 !mdd_capable(uc, CFS_CAP_CHOWN))
1255 /* If the user or group of a non-directory has been
1256 * changed by a non-root user, remove the setuid bit.
1257 * 19981026 David C Niemi <niemi@tux.org>
1259 * Changed this to apply to all users, including root,
1260 * to avoid some races. This is the behavior we had in
1261 * 2.0. The check for non-root was definitely wrong
1262 * for 2.2 anyway, as it should have been using
1263 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1264 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1265 !S_ISDIR(tmp_la->la_mode)) {
1266 la->la_mode &= ~S_ISUID;
1267 la->la_valid |= LA_MODE;
1271 /* Make sure caller can chgrp. */
1272 if (la->la_valid & LA_GID) {
1273 if (la->la_gid == (gid_t) -1)
1274 la->la_gid = tmp_la->la_gid;
1275 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1276 ((la->la_gid != tmp_la->la_gid) &&
1277 !lustre_in_group_p(uc, la->la_gid))) &&
1278 !mdd_capable(uc, CFS_CAP_CHOWN))
1281 /* Likewise, if the user or group of a non-directory
1282 * has been changed by a non-root user, remove the
1283 * setgid bit UNLESS there is no group execute bit
1284 * (this would be a file marked for mandatory
1285 * locking). 19981026 David C Niemi <niemi@tux.org>
1287 * Removed the fsuid check (see the comment above) --
1289 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1290 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1291 la->la_mode &= ~S_ISGID;
1292 la->la_valid |= LA_MODE;
1296 /* For both Size-on-MDS case and truncate case,
1297 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1298 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1299 * For SOM case, it is true, the MAY_WRITE perm has been checked
1300 * when open, no need check again. For truncate case, it is false,
1301 * the MAY_WRITE perm should be checked here. */
1302 if (ma->ma_attr_flags & MDS_SOM) {
1303 /* For the "Size-on-MDS" setattr update, merge coming
1304 * attributes with the set in the inode. BUG 10641 */
1305 if ((la->la_valid & LA_ATIME) &&
1306 (la->la_atime <= tmp_la->la_atime))
1307 la->la_valid &= ~LA_ATIME;
1309 /* OST attributes do not have a priority over MDS attributes,
1310 * so drop times if ctime is equal. */
1311 if ((la->la_valid & LA_CTIME) &&
1312 (la->la_ctime <= tmp_la->la_ctime))
1313 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1315 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1316 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1317 (uc->mu_fsuid == tmp_la->la_uid)) &&
1318 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1319 rc = mdd_permission_internal_locked(env, obj,
1326 if (la->la_valid & LA_CTIME) {
1327 /* The pure setattr, it has the priority over what is
1328 * already set, do not drop it if ctime is equal. */
1329 if (la->la_ctime < tmp_la->la_ctime)
1330 la->la_valid &= ~(LA_ATIME | LA_MTIME |
/** Store a data change changelog record
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transacion handle
 */
1344 static int mdd_changelog_data_store(const struct lu_env *env,
1345 struct mdd_device *mdd,
1346 enum changelog_rec_type type,
1348 struct mdd_object *mdd_obj,
1349 struct thandle *handle)
1351 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1352 struct llog_changelog_rec *rec;
1353 struct thandle *th = NULL;
/* Skip entirely when changelogs are off or this record type is masked. */
1359 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1361 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1364 LASSERT(mdd_obj != NULL);
1365 LASSERT(handle != NULL);
1367 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1368 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1369 /* Don't need multiple updates in this log */
1370 /* Don't check under lock - no big deal if we get an extra
1375 reclen = llog_data_len(sizeof(*rec));
1376 buf = mdd_buf_alloc(env, reclen);
1377 if (buf->lb_buf == NULL)
1379 rec = (struct llog_changelog_rec *)buf->lb_buf;
1381 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1382 rec->cr.cr_type = (__u32)type;
1383 rec->cr.cr_tfid = *tfid;
1384 rec->cr.cr_namelen = 0;
/* Stamp the object so repeated time-only updates can be coalesced. */
1385 mdd_obj->mod_cltime = cfs_time_current_64();
1387 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1390 mdd_trans_stop(env, mdd, rc, th);
1393 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1394 rc, type, PFID(tfid));
/* Public entry: emit one changelog record of \a type for \a obj inside
 * its own declare/start/stop transaction. */
1401 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1402 int flags, struct md_object *obj)
1404 struct thandle *handle;
1405 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1406 struct mdd_device *mdd = mdo2mdd(obj);
1410 handle = mdd_trans_create(env, mdd);
1412 return(PTR_ERR(handle));
1414 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1418 rc = mdd_trans_start(env, mdd, handle);
1422 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1426 mdd_trans_stop(env, mdd, rc, handle);
1432 * Should be called with write lock held.
1434 * \see mdd_lma_set_locked().
1436 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1437 const struct md_attr *ma, struct thandle *handle)
1439 struct mdd_thread_info *info = mdd_env_info(env);
1441 struct lustre_mdt_attrs *lma =
1442 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1443 int lmasize = sizeof(struct lustre_mdt_attrs);
1448 /* Either HSM or SOM part is not valid, we need to read it before */
/* Fix: the old test "(!ma->ma_valid) & (MA_HSM | MA_SOM)" applied the
 * logical NOT first, yielding 0/1 ANDed with high-order flag bits, so
 * it could never be true and the on-disk LMA was never read.  Test the
 * intent instead: read unless both HSM and SOM parts are supplied. */
1449 if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
1450 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1454 lustre_lma_swab(lma);
1456 memset(lma, 0, lmasize);
/* Merge HSM flags into the LMA and mark the HSM part present. */
1460 if (ma->ma_valid & MA_HSM) {
1461 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1462 lma->lma_compat |= LMAC_HSM;
/* Merge SOM data; an invalid ioepoch clears the SOM part entirely. */
1466 if (ma->ma_valid & MA_SOM) {
1467 LASSERT(ma->ma_som != NULL);
1468 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1469 lma->lma_compat &= ~LMAC_SOM;
1471 lma->lma_compat |= LMAC_SOM;
1472 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1473 lma->lma_som_size = ma->ma_som->msd_size;
1474 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1475 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Always refresh the self FID, then swab to disk order and store. */
1480 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1482 lustre_lma_swab(lma);
1483 buf = mdd_buf_get(env, lma, lmasize);
1484 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1490 * Save LMA extended attributes with data from \a ma.
1492 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1493 * not, LMA EA will be first read from disk, modified and write back.
1496 static int mdd_lma_set_locked(const struct lu_env *env,
1497 struct mdd_object *mdd_obj,
1498 const struct md_attr *ma, struct thandle *handle)
/* Locked wrapper: hold the object write lock around __mdd_lma_set(). */
1502 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1503 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1504 mdd_write_unlock(env, mdd_obj);
1508 /* Precedence for choosing record type when multiple
1509 * attributes change: setattr > mtime > ctime > atime
1510 * (ctime changes when mtime does, plus chmod/chown.
1511 * atime and ctime are independent.) */
1512 static int mdd_attr_set_changelog(const struct lu_env *env,
1513 struct md_object *obj, struct thandle *handle,
1516 struct mdd_device *mdd = mdo2mdd(obj);
/* Collect candidate record types from the changed attribute bits, then
 * filter against the changelog's configured type mask. */
1519 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1520 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1521 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1522 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1523 bits = bits & mdd->mdd_cl.mc_mask;
1527 /* The record type is the lowest non-masked set bit */
1528 while (bits && ((bits & 1) == 0)) {
1533 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1534 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1535 md2mdd_obj(obj), handle);
/* Declare every update mdd_attr_set() may perform in one transaction:
 * the attr change, a changelog record, optional LOV/LMA xattr updates,
 * an ACL rewrite on mode change, and unlink-style llog records for
 * chown/chgrp of striped files (one per stripe). */
1538 static int mdd_declare_attr_set(const struct lu_env *env,
1539 struct mdd_device *mdd,
1540 struct mdd_object *obj,
1541 const struct md_attr *ma,
1542 struct lov_mds_md *lmm,
1543 struct thandle *handle)
1545 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1548 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1552 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1556 if (ma->ma_valid & MA_LOV) {
1558 buf->lb_len = ma->ma_lmm_size;
1559 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1565 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1567 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1568 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1574 #ifdef CONFIG_FS_POSIX_ACL
/* A mode change may rewrite the access ACL; probe first so the xattr
 * update is only declared when an ACL actually exists. */
1575 if (ma->ma_attr.la_valid & LA_MODE) {
1576 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1577 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,XATTR_NAME_ACL_ACCESS,
1579 mdd_read_unlock(env, obj);
1580 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1588 rc = mdo_declare_xattr_set(env, obj, buf,
1589 XATTR_NAME_ACL_ACCESS, 0,
1597 /* basically the log is the same as in unlink case */
1601 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1602 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1603 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1604 mdd->mdd_obd_dev->obd_name,
1605 le32_to_cpu(lmm->lmm_magic),
1606 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
/* LOV_ALL_STRIPES means "striped over all OSTs": use the LOV
 * descriptor's target count to size the llog declaration. */
1610 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1611 if (stripe == LOV_ALL_STRIPES) {
1612 struct lov_desc *ldesc;
1614 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1615 LASSERT(ldesc != NULL);
1616 stripe = ldesc->ld_tgt_count;
1619 for (i = 0; i < stripe; i++) {
1620 rc = mdd_declare_llog_record(env, mdd,
1621 sizeof(struct llog_unlink_rec),
1631 /* set attr and LOV EA at once, return updated attr */
1632 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1633 const struct md_attr *ma)
1635 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1636 struct mdd_device *mdd = mdo2mdd(obj);
1637 struct thandle *handle;
1638 struct lov_mds_md *lmm = NULL;
1639 struct llog_cookie *logcookies = NULL;
1640 int rc, lmm_size = 0, cookie_size = 0;
1641 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1642 #ifdef HAVE_QUOTA_SUPPORT
1643 struct obd_device *obd = mdd->mdd_obd_dev;
1644 struct mds_obd *mds = &obd->u.mds;
1645 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1646 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1647 int quota_opc = 0, block_count = 0;
1648 int inode_pending[MAXQUOTAS] = { 0, 0 };
1649 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a scratch copy of the attrs so mdd_fix_attr() can drop or
 * adjust bits (e.g. clear SGID, drop stale times) without touching
 * the caller's md_attr. */
1653 *la_copy = ma->ma_attr;
1654 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1658 /* setattr on "close" only change atime, or do nothing */
1659 if (ma->ma_valid == MA_INODE &&
1660 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp of a regular file needs its LOV EA to journal the
 * ownership change to the OSTs, just like unlink. */
1663 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1664 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1665 lmm_size = mdd_lov_mdsize(env, mdd);
1666 lmm = mdd_max_lmm_get(env, mdd);
1670 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1677 handle = mdd_trans_create(env, mdd);
1679 RETURN(PTR_ERR(handle));
1681 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1682 lmm_size > 0 ? lmm : NULL, handle);
1686 rc = mdd_trans_start(env, mdd, handle);
1690 /* permission changes may require sync operation */
1691 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1692 handle->th_sync |= !!mdd->mdd_sync_permission;
1694 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1695 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1696 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1698 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: acquire quota for the new owner (qnids) before the
 * change; the old owner's ids (qoids) are released afterwards. */
1699 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1700 struct obd_export *exp = md_quota(env)->mq_exp;
1701 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1703 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1705 quota_opc = FSFILT_OP_SETATTR;
1706 mdd_quota_wrapper(la_copy, qnids);
1707 mdd_quota_wrapper(la_tmp, qoids);
1708 /* get file quota for new owner */
1709 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1710 qnids, inode_pending, 1, NULL, 0,
1712 block_count = (la_tmp->la_blocks + 7) >> 3;
1715 mdd_data_get(env, mdd_obj, &data);
1716 /* get block quota for new owner */
1717 lquota_chkquota(mds_quota_interface_ref, obd,
1718 exp, qnids, block_pending,
1720 LQUOTA_FLAGS_BLK, data, 1);
1726 if (la_copy->la_valid & LA_FLAGS) {
1727 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1730 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1731 } else if (la_copy->la_valid) { /* setattr */
1732 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1734 /* journal chown/chgrp in llog, just like unlink */
1735 if (rc == 0 && lmm_size){
1736 cookie_size = mdd_lov_cookiesize(env, mdd);
1737 logcookies = mdd_max_cookie_get(env, mdd);
1738 if (logcookies == NULL)
1739 GOTO(cleanup, rc = -ENOMEM);
1741 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1742 logcookies, cookie_size) <= 0)
1747 if (rc == 0 && ma->ma_valid & MA_LOV) {
1750 mode = mdd_object_type(mdd_obj);
1751 if (S_ISREG(mode) || S_ISDIR(mode)) {
1752 rc = mdd_lsm_sanity_check(env, mdd_obj);
1756 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1757 ma->ma_lmm_size, handle, 1);
1761 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1764 mode = mdd_object_type(mdd_obj);
1766 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1771 rc = mdd_attr_set_changelog(env, obj, handle,
1772 ma->ma_attr.la_valid);
1774 mdd_trans_stop(env, mdd, rc, handle);
/* Push ownership changes to the OSTs asynchronously after the local
 * transaction has stopped. */
1775 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1776 /*set obd attr, if needed*/
1777 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1780 #ifdef HAVE_QUOTA_SUPPORT
1782 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1784 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1786 /* Trigger dqrel/dqacq for original owner and new owner.
1787 * If failed, the next call for lquota_chkquota will
1789 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Apply an xattr update under the object write lock, inside the
 * caller's already-started transaction \a handle. */
1796 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1797 const struct lu_buf *buf, const char *name, int fl,
1798 struct thandle *handle)
1803 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1804 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1805 mdd_write_unlock(env, obj);
/* Reject xattr changes on immutable/append-only objects, and require
 * the caller to be the owner or hold CFS_CAP_FOWNER. */
1810 static int mdd_xattr_sanity_check(const struct lu_env *env,
1811 struct mdd_object *obj)
1813 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1814 struct md_ucred *uc = md_ucred(env);
1818 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1821 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1825 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1826 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare an xattr set plus, for "user." xattrs only, the changelog
 * record that mdd_xattr_set() will emit. */
1832 static int mdd_declare_xattr_set(const struct lu_env *env,
1833 struct mdd_device *mdd,
1834 struct mdd_object *obj,
1835 const struct lu_buf *buf,
1837 struct thandle *handle)
1842 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1846 /* Only record user xattr changes */
1847 if ((strncmp("user.", name, 5) == 0))
1848 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1854 * The caller should guarantee to update the object ctime
1855 * after xattr_set if needed.
1857 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1858 const struct lu_buf *buf, const char *name,
1861 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1862 struct mdd_device *mdd = mdo2mdd(obj);
1863 struct thandle *handle;
1867 rc = mdd_xattr_sanity_check(env, mdd_obj);
1871 handle = mdd_trans_create(env, mdd);
1873 RETURN(PTR_ERR(handle));
1875 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1879 rc = mdd_trans_start(env, mdd, handle);
1883 /* security-related changes may require sync */
1884 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1885 handle->th_sync |= !!mdd->mdd_sync_permission;
1887 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1889 /* Only record system & user xattr changes */
1890 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1891 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1892 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1893 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1894 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1895 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1896 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1900 mdd_trans_stop(env, mdd, rc, handle);
/* Declare an xattr removal plus, for "user." xattrs only, the
 * changelog record that mdd_xattr_del() will emit. */
1905 static int mdd_declare_xattr_del(const struct lu_env *env,
1906 struct mdd_device *mdd,
1907 struct mdd_object *obj,
1909 struct thandle *handle)
1913 rc = mdo_declare_xattr_del(env, obj, name, handle);
1917 /* Only record user xattr changes */
1918 if ((strncmp("user.", name, 5) == 0))
1919 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1925 * The caller should guarantee to update the object ctime
1926 * after xattr_set if needed.
1928 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1931 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1932 struct mdd_device *mdd = mdo2mdd(obj);
1933 struct thandle *handle;
1937 rc = mdd_xattr_sanity_check(env, mdd_obj);
1941 handle = mdd_trans_create(env, mdd);
1943 RETURN(PTR_ERR(handle));
1945 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1949 rc = mdd_trans_start(env, mdd, handle);
/* Remove the attribute under the object write lock. */
1953 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1954 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1955 mdd_object_capa(env, mdd_obj));
1956 mdd_write_unlock(env, mdd_obj);
1958 /* Only record system & user xattr changes */
1959 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1960 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1961 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1962 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1963 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1964 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1965 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1969 mdd_trans_stop(env, mdd, rc, handle);
1974 /* partial unlink */
1975 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1978 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1979 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1980 struct mdd_device *mdd = mdo2mdd(obj);
1981 struct thandle *handle;
1982 #ifdef HAVE_QUOTA_SUPPORT
1983 struct obd_device *obd = mdd->mdd_obd_dev;
1984 struct mds_obd *mds = &obd->u.mds;
1985 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1991 /* XXX: this code won't be used ever:
1992 * DNE uses slightly different approach */
1996 * Check -ENOENT early here because we need to get object type
1997 * to calculate credits before transaction start
1999 if (mdd_object_exists(mdd_obj) == 0) {
2000 CERROR("%s: object "DFID" not found: rc = -2\n",
2001 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2005 LASSERT(mdd_object_exists(mdd_obj) > 0);
2007 handle = mdd_trans_create(env, mdd);
2011 rc = mdd_trans_start(env, mdd, handle);
2013 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2015 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
/* Drop one link; a directory loses a second link for its "." entry. */
2019 mdo_ref_del(env, mdd_obj, handle);
2021 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2023 mdo_ref_del(env, mdd_obj, handle);
/* Propagate the caller-supplied ctime to the object. */
2026 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2027 la_copy->la_ctime = ma->ma_attr.la_ctime;
2029 la_copy->la_valid = LA_CTIME;
2030 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2034 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2035 #ifdef HAVE_QUOTA_SUPPORT
/* Release quota only when the last link and last open are gone. */
2036 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2037 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2038 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2039 mdd_quota_wrapper(&ma->ma_attr, qids);
2046 mdd_write_unlock(env, mdd_obj);
2047 mdd_trans_stop(env, mdd, rc, handle);
2048 #ifdef HAVE_QUOTA_SUPPORT
2050 /* Trigger dqrel on the owner of child. If failed,
2051 * the next call for lquota_chkquota will process it */
2052 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2058 /* partial operation */
/* Validate the file type for a partial object create; dispatches on
 * the S_IFMT bits of the requested mode. */
2059 static int mdd_oc_sanity_check(const struct lu_env *env,
2060 struct mdd_object *obj,
2066 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create: make the on-disk object for an already-decided FID,
 * including slave LMV EA, remote ACL, and quota accounting. */
2083 static int mdd_object_create(const struct lu_env *env,
2084 struct md_object *obj,
2085 const struct md_op_spec *spec,
2089 struct mdd_device *mdd = mdo2mdd(obj);
2090 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2091 const struct lu_fid *pfid = spec->u.sp_pfid;
2092 struct thandle *handle;
2093 #ifdef HAVE_QUOTA_SUPPORT
2094 struct obd_device *obd = mdd->mdd_obd_dev;
2095 struct obd_export *exp = md_quota(env)->mq_exp;
2096 struct mds_obd *mds = &obd->u.mds;
2097 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2098 int quota_opc = 0, block_count = 0;
2099 int inode_pending[MAXQUOTAS] = { 0, 0 };
2100 int block_pending[MAXQUOTAS] = { 0, 0 };
2105 /* XXX: this code won't be used ever:
2106 * DNE uses slightly different approach */
2109 #ifdef HAVE_QUOTA_SUPPORT
/* Acquire inode (and, per type, block) quota for the child up front. */
2110 if (mds->mds_quota) {
2111 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2112 mdd_quota_wrapper(&ma->ma_attr, qids);
2113 /* get file quota for child */
2114 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2115 qids, inode_pending, 1, NULL, 0,
2117 switch (ma->ma_attr.la_mode & S_IFMT) {
2126 /* get block quota for child */
2128 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2129 qids, block_pending, block_count,
2130 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2134 handle = mdd_trans_create(env, mdd);
2136 GOTO(out_pending, rc = PTR_ERR(handle));
2138 rc = mdd_trans_start(env, mdd, handle);
2140 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2141 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2145 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2149 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2150 /* If creating the slave object, set slave EA here. */
2151 int lmv_size = spec->u.sp_ea.eadatalen;
2152 struct lmv_stripe_md *lmv;
2154 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2155 LASSERT(lmv != NULL && lmv_size > 0);
2157 rc = __mdd_xattr_set(env, mdd_obj,
2158 mdd_buf_get_const(env, lmv, lmv_size),
2159 XATTR_NAME_LMV, 0, handle);
2163 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2166 #ifdef CONFIG_FS_POSIX_ACL
/* Remote ACL: initialize the child's ACL from caller-provided EA data;
 * this may also adjust la_mode. */
2167 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2168 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2170 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2171 buf->lb_len = spec->u.sp_ea.eadatalen;
2172 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2173 rc = __mdd_acl_init(env, mdd_obj, buf,
2174 &ma->ma_attr.la_mode,
2179 ma->ma_attr.la_valid |= LA_MODE;
2182 pfid = spec->u.sp_ea.fid;
2185 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2191 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2192 mdd_write_unlock(env, mdd_obj);
2194 mdd_trans_stop(env, mdd, rc, handle);
2196 #ifdef HAVE_QUOTA_SUPPORT
2198 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2200 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2202 /* Trigger dqacq on the owner of child. If failed,
2203 * the next call for lquota_chkquota will process it. */
2204 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Partial link: bump the object's link count and update its ctime. */
2212 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2213 const struct md_attr *ma)
2215 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2216 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2217 struct mdd_device *mdd = mdo2mdd(obj);
2218 struct thandle *handle;
2222 /* XXX: this code won't be used ever:
2223 * DNE uses slightly different approach */
2226 handle = mdd_trans_create(env, mdd);
2230 rc = mdd_trans_start(env, mdd, handle);
2232 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2233 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2235 mdo_ref_add(env, mdd_obj, handle);
2236 mdd_write_unlock(env, mdd_obj);
2238 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2239 la_copy->la_ctime = ma->ma_attr.la_ctime;
2241 la_copy->la_valid = LA_CTIME;
2242 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): transaction is stopped with result 0, not rc —
 * confirm the error from the ctime update is intentionally ignored. */
2245 mdd_trans_stop(env, mdd, 0, handle);
/* Ask the underlying dt device to fill an allocation hint for creating
 * \a child (optionally near \a parent), keyed by the file type bits. */
2250 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
2251 struct mdd_object *child, struct lu_attr *attr)
2253 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
2254 struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
2255 struct dt_object *nc = mdd_object_child(child);
2257 /* @hint will be initialized by underlying device. */
2258 nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
2262 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate open flags into the MAY_* access mask used by permission
 * checks. */
2264 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2268 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2269 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2270 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2271 * owner can write to a file even if it is marked readonly to hide
2272 * its brokenness. (bug 5781) */
2273 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2274 struct md_ucred *uc = md_ucred(env);
2276 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2277 (la->la_uid == uc->mu_fsuid))
2281 if (flags & FMODE_READ)
2283 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2285 if (flags & MDS_FMODE_EXEC)
/* Validate an open request against the object's type, mode, append/
 * immutable flags, and (unless MDS_OPEN_CREATED) its permissions. */
2290 static int mdd_open_sanity_check(const struct lu_env *env,
2291 struct mdd_object *obj, int flag)
2293 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2298 if (mdd_is_dead_obj(obj))
2301 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2305 if (S_ISLNK(tmp_la->la_mode))
2308 mode = accmode(env, tmp_la, flag);
2310 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2313 if (!(flag & MDS_OPEN_CREATED)) {
2314 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless for FIFOs, sockets and device nodes. */
2319 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2320 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2321 flag &= ~MDS_OPEN_TRUNC;
2323 /* For writing append-only file must open it with append mode. */
2324 if (mdd_is_append(obj)) {
2325 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2327 if (flag & MDS_OPEN_TRUNC)
2333 * Now, flag -- O_NOATIME does not be packed by client.
2335 if (flag & O_NOATIME) {
2336 struct md_ucred *uc = md_ucred(env);
2338 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2339 (uc->mu_valid == UCRED_NEW)) &&
2340 (uc->mu_fsuid != tmp_la->la_uid) &&
2341 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open: run the sanity check and, on success, bump the open count
 * under the object write lock. */
2349 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2352 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2355 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2357 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2359 mdd_obj->mod_count++;
2361 mdd_write_unlock(env, mdd_obj);
/* Declare the unlink-log records and the destroy that
 * mdd_object_kill() will perform. */
2365 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2366 struct md_attr *ma, struct thandle *handle)
2370 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2374 return mdo_declare_destroy(env, obj, handle);
2377 /* return md_attr back,
2378 * if it is last unlink then return lov ea + llog cookie*/
2379 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2380 struct md_attr *ma, struct thandle *handle)
2385 if (S_ISREG(mdd_object_type(obj))) {
2386 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2387 * Caller must be ready for that. */
2388 rc = __mdd_lmm_get(env, obj, ma);
2389 if ((ma->ma_valid & MA_LOV))
2390 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
/* Finally destroy the local object itself. */
2395 rc = mdo_destroy(env, obj, handle);
/* Declare the orphan-index removal plus the object kill performed when
 * the last close drops an orphan. */
2400 static int mdd_declare_close(const struct lu_env *env,
2401 struct mdd_object *obj,
2403 struct thandle *handle)
2407 rc = orph_declare_index_delete(env, obj, handle);
2411 return mdd_declare_object_kill(env, obj, ma, handle);
2415 * No permission check is needed.
/* Close: drop one open count; on the last close of an orphan/dead
 * object remove it from the orphan index and destroy it (and its OSS
 * objects), release quota, and emit a CL_CLOSE changelog record for
 * writable opens. */
2417 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2418 struct md_attr *ma, int mode)
2420 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2421 struct mdd_device *mdd = mdo2mdd(obj);
2422 struct thandle *handle = NULL;
2424 int is_orphan = 0, reset = 1;
2426 #ifdef HAVE_QUOTA_SUPPORT
2427 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2428 struct mds_obd *mds = &obd->u.mds;
2429 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: drop the open count but keep the orphan around. */
2434 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2435 mdd_obj->mod_count--;
2437 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2438 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2439 "list\n", PFID(mdd_object_fid(mdd_obj)));
2443 /* check without any lock */
2444 if (mdd_obj->mod_count == 1 &&
2445 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2447 handle = mdd_trans_create(env, mdo2mdd(obj));
2449 RETURN(PTR_ERR(handle));
2451 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2455 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2459 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2464 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* The lockless check above may have raced; retry from the top if the
 * object became a last-close orphan after we skipped the declaration. */
2465 if (handle == NULL && mdd_obj->mod_count == 1 &&
2466 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2467 mdd_write_unlock(env, mdd_obj);
2471 /* release open count */
2472 mdd_obj->mod_count --;
2474 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2475 /* remove link to object from orphan index */
2476 LASSERT(handle != NULL);
2477 rc = __mdd_orphan_del(env, mdd_obj, handle);
2479 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2480 "list, OSS objects to be destroyed.\n",
2481 PFID(mdd_object_fid(mdd_obj)));
2484 CERROR("Object "DFID" can not be deleted from orphan "
2485 "list, maybe cause OST objects can not be "
2486 "destroyed (err: %d).\n",
2487 PFID(mdd_object_fid(mdd_obj)), rc);
2488 /* If object was not deleted from orphan list, do not
2489 * destroy OSS objects, which will be done when next
2495 rc = mdd_iattr_get(env, mdd_obj, ma);
2496 /* Object maybe not in orphan list originally, it is rare case for
2497 * mdd_finish_unlink() failure. */
2498 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2499 #ifdef HAVE_QUOTA_SUPPORT
2500 if (mds->mds_quota) {
2501 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2502 mdd_quota_wrapper(&ma->ma_attr, qids);
2505 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2506 if (ma->ma_valid & MA_FLAGS &&
2507 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2508 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2510 if (handle == NULL) {
2511 handle = mdd_trans_create(env, mdo2mdd(obj));
2513 GOTO(out, rc = PTR_ERR(handle));
2515 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2520 rc = mdd_declare_changelog_store(env, mdd,
2525 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2530 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2536 CERROR("Error when prepare to delete Object "DFID" , "
2537 "which will cause OST objects can not be "
2538 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2544 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2546 mdd_write_unlock(env, mdd_obj);
2549 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2550 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2551 if (handle == NULL) {
2552 handle = mdd_trans_create(env, mdo2mdd(obj));
/* Fix: IS_ERR() returns a boolean, so "rc = IS_ERR(handle)" stored 1
 * instead of the negative errno; use PTR_ERR() to propagate it. */
2554 GOTO(stop, rc = PTR_ERR(handle));
2556 rc = mdd_declare_changelog_store(env, mdd, NULL,
2561 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2566 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2572 mdd_trans_stop(env, mdd, rc, handle);
2573 #ifdef HAVE_QUOTA_SUPPORT
2575 /* Trigger dqrel on the owner of child. If failed,
2576 * the next call for lquota_chkquota will process it */
2577 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2584 * Permission check is done when open,
2585 * no need check again.
/* A readpage target must be a directory usable as a dt index. */
2587 static int mdd_readpage_sanity_check(const struct lu_env *env,
2588 struct mdd_object *obj)
2590 struct dt_object *next = mdd_object_child(obj);
2594 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* dt_index_walk callback: fill one lu_dirpage with directory entries
 * from iterator \a it, packing as many lu_dirents as fit in \a nob. */
2602 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2603 int nob, const struct dt_it_ops *iops,
2604 struct dt_it *it, __u32 attr, void *arg)
2606 struct lu_dirpage *dp = &lp->lp_dir;
2610 struct lu_dirent *ent;
2611 struct lu_dirent *last = NULL;
2614 memset(area, 0, sizeof (*dp));
2615 area += sizeof (*dp);
2616 nob -= sizeof (*dp);
2623 len = iops->key_size(env, it);
2625 /* IAM iterator can return record with zero len. */
/* Record the hash of the first entry as the page's start hash. */
2629 hash = iops->store(env, it);
2630 if (unlikely(first)) {
2632 dp->ldp_hash_start = cpu_to_le64(hash);
2635 /* calculate max space required for lu_dirent */
2636 recsize = lu_dirent_calc_size(len, attr);
2638 if (nob >= recsize) {
2639 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2640 if (result == -ESTALE)
2645 /* osd might not able to pack all attributes,
2646 * so recheck rec length */
2647 recsize = le16_to_cpu(ent->lde_reclen);
/* Page full: fail only if not even one entry was packed. */
2649 result = (last != NULL) ? 0 :-EINVAL;
2653 ent = (void *)ent + recsize;
2657 result = iops->next(env, it);
2658 if (result == -ESTALE)
2660 } while (result == 0);
2663 dp->ldp_hash_end = cpu_to_le64(hash);
/* A last entry whose hash equals the end hash signals a collision the
 * client must handle (LDF_COLLIDE). */
2665 if (last->lde_hash == dp->ldp_hash_end)
2666 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2667 last->lde_reclen = 0; /* end mark */
2670 /* end of directory */
2671 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2673 CWARN("build page failed: %d!\n", result);
/* Read directory pages for \a obj into \a rdpg; dead directories yield
 * a single empty page per POSIX (no "." or ".." either). */
2677 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2678 const struct lu_rdpg *rdpg)
2680 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2684 if (mdd_object_exists(mdd_obj) == 0) {
2685 CERROR("%s: object "DFID" not found: rc = -2\n",
2686 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2690 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2691 rc = mdd_readpage_sanity_check(env, mdd_obj);
2693 GOTO(out_unlock, rc);
2695 if (mdd_is_dead_obj(mdd_obj)) {
2697 struct lu_dirpage *dp;
2700 * According to POSIX, please do not return any entry to client:
2701 * even dot and dotdot should not be returned.
2703 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2704 PFID(mdd_object_fid(mdd_obj)));
2706 if (rdpg->rp_count <= 0)
2707 GOTO(out_unlock, rc = -EFAULT);
2708 LASSERT(rdpg->rp_pages != NULL);
2710 pg = rdpg->rp_pages[0];
2711 dp = (struct lu_dirpage*)cfs_kmap(pg);
2712 memset(dp, 0 , sizeof(struct lu_dirpage));
2713 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2714 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2715 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2717 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2720 rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2721 mdd_dir_page_build, NULL);
2723 struct lu_dirpage *dp;
/* Nothing was packed: hand back an explicitly empty first page. */
2725 dp = cfs_kmap(rdpg->rp_pages[0]);
2726 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2729 * No pages were processed, mark this for first page
2732 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2733 rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2735 cfs_kunmap(rdpg->rp_pages[0]);
2738 GOTO(out_unlock, rc);
2740 mdd_read_unlock(env, mdd_obj);
/* Flush an existing object to stable storage via the dt layer. */
2744 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2746 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2748 if (mdd_object_exists(mdd_obj) == 0) {
2749 CERROR("%s: object "DFID" not found: rc = -2\n",
2750 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2753 return dt_object_sync(env, mdd_object_child(mdd_obj));
2756 const struct md_object_operations mdd_obj_ops = {
2757 .moo_permission = mdd_permission,
2758 .moo_attr_get = mdd_attr_get,
2759 .moo_attr_set = mdd_attr_set,
2760 .moo_xattr_get = mdd_xattr_get,
2761 .moo_xattr_set = mdd_xattr_set,
2762 .moo_xattr_list = mdd_xattr_list,
2763 .moo_xattr_del = mdd_xattr_del,
2764 .moo_object_create = mdd_object_create,
2765 .moo_ref_add = mdd_ref_add,
2766 .moo_ref_del = mdd_ref_del,
2767 .moo_open = mdd_open,
2768 .moo_close = mdd_close,
2769 .moo_readpage = mdd_readpage,
2770 .moo_readlink = mdd_readlink,
2771 .moo_changelog = mdd_changelog,
2772 .moo_capa_get = mdd_capa_get,
2773 .moo_object_sync = mdd_object_sync,
2774 .moo_path = mdd_path,
2775 .moo_file_lock = mdd_file_lock,
2776 .moo_file_unlock = mdd_file_unlock,