4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdd/mdd_object.c
38 * Lustre Metadata Server (mdd) routines
40 * Author: Wang Di <wangdi@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
58 #include "mdd_internal.h"
60 static const struct lu_object_operations mdd_lu_obj_ops;
62 static int mdd_xattr_get(const struct lu_env *env,
63 struct md_object *obj, struct lu_buf *buf,
/* Fetch body/data-object information of \a obj into \a data via the lower
 * layer (mdo_data_get()). Logs and fails with "rc = -2" (-ENOENT) when the
 * object does not exist on this MDT.
 * NOTE(review): this listing is elided — the error-return statement after
 * the CERROR and the closing brace/return are not visible here. */
66 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
69 if (mdd_object_exists(obj) == 0) {
70 CERROR("%s: object "DFID" not found: rc = -2\n",
71 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
74 mdo_data_get(env, obj, data);
/* Read the inode attributes of \a obj into \a la, checked against
 * capability \a capa. Delegates to the underlying layer's mdo_attr_get().
 * Logs and fails (rc = -2, i.e. -ENOENT) when the object is absent.
 * NOTE(review): elided listing — the early-return after the CERROR is
 * not visible here. */
78 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
79 struct lu_attr *la, struct lustre_capa *capa)
81 if (mdd_object_exists(obj) == 0) {
82 CERROR("%s: object "DFID" not found: rc = -2\n",
83 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
86 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-core
 * mod_flags bits of \a obj. Clears the append/immutable bits first so
 * the result reflects exactly what \a flags carries. */
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93 if (flags & LUSTRE_APPEND_FL)
94 obj->mod_flags |= APPEND_OBJ;
96 if (flags & LUSTRE_IMMUTABLE_FL)
97 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd scratch area stored in the lu_env context.
 * LASSERTs that the key has been populated (it always is once the
 * thread context is initialized).
 * NOTE(review): the return statement is elided from this listing. */
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 struct mdd_thread_info *info;
104 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105 LASSERT(info != NULL);
/* Wrap a caller-supplied memory \a area / \a len in the per-thread
 * mti_buf descriptor and return it. No allocation is performed.
 * NOTE(review): the lb_buf/lb_len assignments and return are elided
 * from this listing. */
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113 buf = &mdd_env_info(env)->mti_buf;
/* Free the memory owned by \a buf (if any) and implicitly reset it.
 * Safe to call with a NULL buf or an empty buffer — both are no-ops. */
119 void mdd_buf_put(struct lu_buf *buf)
121 if (buf == NULL || buf->lb_buf == NULL)
123 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const-flavoured variant of mdd_buf_get(): wrap read-only \a area in the
 * thread-local mti_buf. The const qualifier is dropped internally because
 * struct lu_buf carries a non-const pointer; callers must not write
 * through the result. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129 const void *area, ssize_t len)
133 buf = &mdd_env_info(env)->mti_buf;
134 buf->lb_buf = (void *)area;
/* Return the per-thread "big buffer" sized to hold at least \a len bytes.
 * An existing but too-small buffer is freed and reallocated; existing
 * contents are NOT preserved (use mdd_buf_grow() for that). On allocation
 * failure the buffer's lb_buf stays NULL — callers check for that. */
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
147 if (buf->lb_buf == NULL) {
149 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150 if (buf->lb_buf == NULL)
156 /** Increase the size of the \a mti_big_buf.
157 * preserves old data in buffer
158 * old buffer remains unchanged on error
159 * \retval 0 or -ENOMEM
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Caller must only ever grow the buffer, never shrink it. */
166 LASSERT(len >= oldbuf->lb_len);
167 OBD_ALLOC_LARGE(buf.lb_buf, len);
169 if (buf.lb_buf == NULL)
/* Copy the old contents before releasing the old allocation, then make
 * the thread-local descriptor point at the new, larger buffer. */
173 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177 memcpy(oldbuf, &buf, sizeof(buf));
/* Return the per-thread llog-cookie buffer, (re)allocated to the current
 * maximum cookie size reported by the LOV layer, and zeroed. Returns NULL
 * on allocation failure. */
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183 struct mdd_device *mdd)
185 struct mdd_thread_info *mti = mdd_env_info(env);
188 max_cookie_size = mdd_lov_cookiesize(env, mdd);
/* Drop a previously cached buffer when it is now too small. */
189 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190 if (mti->mti_max_cookie)
191 OBD_FREE_LARGE(mti->mti_max_cookie,
192 mti->mti_max_cookie_size);
193 mti->mti_max_cookie = NULL;
194 mti->mti_max_cookie_size = 0;
196 if (unlikely(mti->mti_max_cookie == NULL)) {
197 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198 if (likely(mti->mti_max_cookie != NULL))
199 mti->mti_max_cookie_size = max_cookie_size;
/* Hand back a zeroed buffer so stale cookies never leak to callers. */
201 if (likely(mti->mti_max_cookie != NULL))
202 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203 return mti->mti_max_cookie;
/* Ensure the per-thread LOV EA buffer can hold at least \a size bytes and
 * return it (NULL on allocation failure). The actual allocation is rounded
 * up to a power of two to damp repeated realloc churn. Contents of a
 * previous buffer are NOT preserved. */
206 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 struct mdd_thread_info *mti = mdd_env_info(env);
210 if (unlikely(mti->mti_max_lmm_size < size)) {
211 int rsize = size_roundup_power2(size);
213 if (mti->mti_max_lmm_size > 0) {
214 LASSERT(mti->mti_max_lmm);
215 OBD_FREE_LARGE(mti->mti_max_lmm,
216 mti->mti_max_lmm_size);
217 mti->mti_max_lmm = NULL;
218 mti->mti_max_lmm_size = 0;
221 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
222 if (likely(mti->mti_max_lmm != NULL))
223 mti->mti_max_lmm_size = rsize;
225 return mti->mti_max_lmm;
/* Convenience wrapper: size the thread-local LOV EA buffer to the device's
 * current maximum MDS-side LOV EA size and return it. */
228 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
229 struct mdd_device *mdd)
233 max_lmm_size = mdd_lov_mdsize(env, mdd);
234 return mdd_max_lmm_buffer(env, max_lmm_size);
/* lu_device::ldo_object_alloc for the MDD layer: allocate an mdd_object,
 * initialize its embedded lu_object and wire in the md/dir operation
 * vectors. Returns the lu_object (presumably NULL on allocation failure —
 * the return statements are elided from this listing). */
237 struct lu_object *mdd_object_alloc(const struct lu_env *env,
238 const struct lu_object_header *hdr,
241 struct mdd_object *mdd_obj;
243 OBD_ALLOC_PTR(mdd_obj);
244 if (mdd_obj != NULL) {
247 o = mdd2lu_obj(mdd_obj);
248 lu_object_init(o, NULL, d);
249 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
250 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
251 mdd_obj->mod_count = 0;
252 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the next-lower layer's
 * slice (via the child dt device) and stack it under this object.
 * NOTE(review): the error check on \a below and the return are elided
 * from this listing. */
259 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
260 const struct lu_object_conf *unused)
262 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
263 struct mdd_object *mdd_obj = lu2mdd_obj(o);
264 struct lu_object *below;
265 struct lu_device *under;
268 mdd_obj->mod_cltime = 0;
269 under = &d->mdd_child->dd_lu_dev;
270 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
271 mdd_pdlock_init(mdd_obj);
275 lu_object_add(o, below);
/* lu_object_operations::loo_object_start: once the stack is assembled,
 * load the on-disk flags into the in-core object if it exists on disk. */
280 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 if (lu_object_exists(o))
283 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object_operations::loo_object_free: release this layer's slice.
 * NOTE(review): the lu_object_fini()/OBD_FREE_PTR() calls are elided
 * from this listing. */
288 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object_operations::loo_object_print: dump the MDD slice's state
 * (open count, valid mask, changelog time, flags) through printer \a p
 * for debugging. */
296 static int mdd_object_print(const struct lu_env *env, void *cookie,
297 lu_printer_t p, const struct lu_object *o)
299 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
300 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
301 "valid=%x, cltime="LPU64", flags=%lx)",
302 mdd, mdd->mod_count, mdd->mod_valid,
303 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for MDD objects; referenced from
 * mdd_object_alloc() via the forward declaration at the top of the file. */
306 static const struct lu_object_operations mdd_lu_obj_ops = {
307 .loo_object_init = mdd_object_init,
308 .loo_object_start = mdd_object_start,
309 .loo_object_free = mdd_object_free,
310 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device \a d.
 * Thin wrapper over md_object_find_slice(); the result may be an ERR_PTR
 * and must be released with mdd_object_put(). */
313 struct mdd_object *mdd_object_find(const struct lu_env *env,
314 struct mdd_device *d,
315 const struct lu_fid *f)
317 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve the '/'-separated \a path (relative to the filesystem root) to
 * its FID by walking component-by-component with mdd_lookup(). -EREMOTE
 * is returned when a path component lives on another MDT.
 * NOTE(review): several lines of the component-splitting loop are elided
 * from this listing; the visible code shows only its skeleton. */
320 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
321 const char *path, struct lu_fid *fid)
324 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
325 struct mdd_object *obj;
326 struct lu_name *lname = &mdd_env_info(env)->mti_name;
331 /* temp buffer for path element */
332 buf = mdd_buf_alloc(env, PATH_MAX);
333 if (buf->lb_buf == NULL)
/* Start the walk at the root FID with an empty component name. */
336 lname->ln_name = name = buf->lb_buf;
337 lname->ln_namelen = 0;
338 *f = mdd->mdd_root_fid;
345 while (*path != '/' && *path != '\0') {
353 /* find obj corresponding to fid */
354 obj = mdd_object_find(env, mdd, f);
356 GOTO(out, rc = -EREMOTE);
358 GOTO(out, rc = PTR_ERR(obj));
359 /* get child fid from parent and name */
360 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
361 mdd_object_put(env, obj);
366 lname->ln_namelen = 0;
375 /** The maximum depth that fid2path() will search.
376 * This is limited only because we want to store the fids for
377 * historical path lookup purposes.
379 #define MAX_PATH_DEPTH 100
381 /** mdd_path() lookup structure. */
382 struct path_lookup_info {
383 __u64 pli_recno; /**< history point */
384 __u64 pli_currec; /**< current record */
385 struct lu_fid pli_fid; /**< fid re-resolved from the built path */
386 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
387 struct mdd_object *pli_mdd_obj; /**< object the path is built for */
388 char *pli_path; /**< full path */
390 int pli_linkno; /**< which hardlink to follow */
391 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current pathname of pli->pli_mdd_obj by walking link EAs
 * upward from the object to the root, packing each component name at the
 * tail of pli->pli_path. Afterwards the path is re-resolved forward with
 * mdd_path2fid() to detect a concurrent rename; -EAGAIN asks the caller
 * (mdd_path()) to retry.
 * NOTE(review): this listing is elided — leh initialization from buf,
 * several error branches, and the final RETURN are not visible. */
394 static int mdd_path_current(const struct lu_env *env,
395 struct path_lookup_info *pli)
397 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
398 struct mdd_object *mdd_obj;
399 struct lu_buf *buf = NULL;
400 struct link_ea_header *leh;
401 struct link_ea_entry *lee;
402 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
403 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr walks backwards from the end of the path buffer; names are packed
 * right-to-left as we ascend toward the root. */
409 ptr = pli->pli_path + pli->pli_pathlen - 1;
412 pli->pli_fidcount = 0;
413 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
416 mdd_obj = mdd_object_find(env, mdd,
417 &pli->pli_fids[pli->pli_fidcount]);
419 GOTO(out, rc = -EREMOTE);
421 GOTO(out, rc = PTR_ERR(mdd_obj));
422 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424 mdd_object_put(env, mdd_obj);
428 /* Do I need to error out here? */
433 /* Get parent fid and object name */
434 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
435 buf = mdd_links_get(env, mdd_obj);
436 mdd_read_unlock(env, mdd_obj);
437 mdd_object_put(env, mdd_obj);
439 GOTO(out, rc = PTR_ERR(buf));
442 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
443 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445 /* If set, use link #linkno for path lookup, otherwise use
446 link #0. Only do this for the final path element. */
447 if ((pli->pli_fidcount == 0) &&
448 (pli->pli_linkno < leh->leh_reccount)) {
450 for (count = 0; count < pli->pli_linkno; count++) {
451 lee = (struct link_ea_entry *)
452 ((char *)lee + reclen);
453 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455 if (pli->pli_linkno < leh->leh_reccount - 1)
456 /* indicate to user there are more links */
460 /* Pack the name in the end of the buffer */
461 ptr -= tmpname->ln_namelen;
462 if (ptr - 1 <= pli->pli_path)
463 GOTO(out, rc = -EOVERFLOW)
465 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
467 /* Store the parent fid for historic lookup */
468 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
469 GOTO(out, rc = -EOVERFLOW);
470 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
473 /* Verify that our path hasn't changed since we started the lookup.
474 Record the current index, and verify the path resolves to the
475 same fid. If it does, then the path is correct as of this index. */
476 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
477 pli->pli_currec = mdd->mdd_cl.mc_index;
478 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
479 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
482 GOTO (out, rc = -EAGAIN);
484 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
485 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
486 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
487 PFID(&pli->pli_fid));
488 GOTO(out, rc = -EAGAIN);
490 ptr++; /* skip leading / */
491 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
495 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
496 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path lookup.
 * NOTE(review): the body is elided from this listing; per the comment in
 * mdd_path(), the historical case is not fully implemented. */
502 static int mdd_path_historic(const struct lu_env *env,
503 struct path_lookup_info *pli)
508 /* Returns the full path to this fid, as of changelog record recno. */
509 static int mdd_path(const struct lu_env *env, struct md_object *obj,
510 char *path, int pathlen, __u64 *recno, int *linkno)
512 struct path_lookup_info *pli;
/* The root itself has an empty path — handled specially.
 * NOTE(review): listing elided — pli allocation, tries initialization and
 * the final OBD_FREE/RETURN are not visible here. */
520 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
529 pli->pli_mdd_obj = md2mdd_obj(obj);
530 pli->pli_recno = *recno;
531 pli->pli_path = path;
532 pli->pli_pathlen = pathlen;
533 pli->pli_linkno = *linkno;
535 /* Retry multiple times in case file is being moved */
536 while (tries-- && rc == -EAGAIN)
537 rc = mdd_path_current(env, pli);
539 /* For historical path lookup, the current links may not have existed
540 * at "recno" time. We must switch over to earlier links/parents
541 * by using the changelog records. If the earlier parent doesn't
542 * exist, we must search back through the changelog to reconstruct
543 * its parents, then check if it exists, etc.
544 * We may ignore this problem for the initial implementation and
545 * state that an "original" hardlink must still exist for us to find
546 * historic path name. */
547 if (pli->pli_recno != -1) {
548 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path is valid at, and the next
 * hardlink index for iterating over multiple links. */
550 *recno = pli->pli_currec;
551 /* Return next link index to caller */
552 *linkno = pli->pli_linkno;
/* Refresh the in-core append/immutable flag bits of \a obj from the
 * on-disk la_flags attribute (see mdd_flags_xlate()). */
560 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 struct lu_attr *la = &mdd_env_info(env)->mti_la;
566 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568 mdd_flags_xlate(obj, la->la_flags);
573 /* get only inode attributes */
574 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Skip the disk read when MA_INODE is already marked valid in \a ma;
 * otherwise fetch attributes and set the valid bit on success. */
580 if (ma->ma_valid & MA_INODE)
583 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
584 mdd_object_capa(env, mdd_obj));
586 ma->ma_valid |= MA_INODE;
/* Fill \a lmm (viewed as a lov_user_md) with the filesystem-default
 * striping taken from the MDS LOV descriptor. Returns the number of bytes
 * written (sizeof(struct lov_user_md)). Used for the root directory when
 * it has no explicit LOV EA of its own. */
590 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 struct lov_desc *ldesc;
593 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
594 struct lov_user_md *lum = (struct lov_user_md*)lmm;
600 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
601 LASSERT(ldesc != NULL);
/* FID_SEQ_LOV_DEFAULT marks this as the default layout, not a real one. */
603 lum->lmm_magic = LOV_MAGIC_V1;
604 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
605 lum->lmm_pattern = ldesc->ld_pattern;
606 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
607 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
608 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610 RETURN(sizeof(*lum));
/* Return non-zero iff \a mdd_obj is the filesystem root directory
 * (its FID equals the device's recorded root FID). */
613 static int is_rootdir(struct mdd_object *mdd_obj)
615 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
616 const struct lu_fid *fid = mdo2fid(mdd_obj);
618 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
/* Fetch a LOV EA that is too large for the caller-provided ma_lmm buffer:
 * probe the EA size with a NULL buffer, grow the thread-local big-lmm
 * buffer to fit, read the EA into it, and point \a ma at it with
 * ma_big_lmm_used set so the caller knows the buffer is borrowed.
 * NOTE(review): elided listing — size assignment from the probe rc and
 * several error returns are not visible here. */
621 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
624 struct mdd_thread_info *info = mdd_env_info(env);
629 LASSERT(info != NULL);
630 LASSERT(ma->ma_big_lmm_used == 0);
632 if (ma->ma_lmm_size == 0) {
633 CERROR("No buffer to hold %s xattr of object "DFID"\n",
634 XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
/* With LU_BUF_NULL this only queries the xattr size, not its data. */
638 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
639 mdd_object_capa(env, obj));
643 /* big_lmm may need to grow */
645 mdd_max_lmm_buffer(env, size);
646 if (info->mti_max_lmm == NULL)
649 LASSERT(info->mti_max_lmm_size >= size);
650 rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
655 ma->ma_big_lmm_used = 1;
656 ma->ma_valid |= MA_LOV;
657 ma->ma_lmm = info->mti_max_lmm;
658 ma->ma_lmm_size = size;
663 /* get lov EA only */
664 static int __mdd_lmm_get(const struct lu_env *env,
665 struct mdd_object *mdd_obj, struct md_attr *ma)
/* No-op when MA_LOV is already valid. On buffer-too-small, fall back to
 * the thread-local big buffer; for the root dir with no EA, synthesize
 * the filesystem-default striping. A positive rc is the EA size. */
670 if (ma->ma_valid & MA_LOV)
673 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
676 rc = mdd_big_lmm_get(env, mdd_obj, ma);
677 else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
678 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
681 ma->ma_lmm_size = rc;
682 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
683 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
689 /* get the first parent fid from link EA */
690 static int mdd_pfid_get(const struct lu_env *env,
691 struct mdd_object *mdd_obj, struct md_attr *ma)
694 struct link_ea_header *leh;
695 struct link_ea_entry *lee;
696 struct lu_fid *pfid = &ma->ma_pfid;
699 if (ma->ma_valid & MA_PFID)
702 buf = mdd_links_get(env, mdd_obj);
704 RETURN(PTR_ERR(buf));
/* The parent FID is stored big-endian inside the first link entry;
 * convert to CPU order in place. */
707 lee = (struct link_ea_entry *)(leh + 1);
708 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
709 fid_be_to_cpu(pfid, pfid);
710 ma->ma_valid |= MA_PFID;
711 if (buf->lb_len > OBD_ALLOC_BIG)
712 /* if we vmalloced a large buffer drop it */
/* Locked wrapper for __mdd_lmm_get(): fetch the LOV EA under the object's
 * read lock. */
717 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
723 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
724 rc = __mdd_lmm_get(env, mdd_obj, ma);
725 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv, skipping the read
 * when MA_LMV is already valid. Mirrors __mdd_lmm_get() for directories. */
730 static int __mdd_lmv_get(const struct lu_env *env,
731 struct mdd_object *mdd_obj, struct md_attr *ma)
736 if (ma->ma_valid & MA_LMV)
739 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
742 ma->ma_valid |= MA_LMV;
/* Read the LMA (lustre_mdt_attrs) EA and extract the HSM flags and/or SOM
 * (size-on-MDS) data requested in ma->ma_need. Uses the thread-local
 * mti_xattr_buf as scratch space for the on-disk record. */
748 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
751 struct mdd_thread_info *info = mdd_env_info(env);
752 struct lustre_mdt_attrs *lma =
753 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
758 /* If all needed data are already valid, nothing to do */
759 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
760 (ma->ma_need & (MA_HSM | MA_SOM)))
763 /* Read LMA from disk EA */
764 lma_size = sizeof(info->mti_xattr_buf);
765 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
769 /* Useless to check LMA incompatibility because this is already done in
770 * osd_ea_fid_get(), and this will fail long before this code is
772 * So, if we are here, LMA is compatible.
775 lustre_lma_swab(lma);
777 /* Swab and copy LMA */
778 if (ma->ma_need & MA_HSM) {
779 if (lma->lma_compat & LMAC_HSM)
780 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
782 ma->ma_hsm.mh_flags = 0;
783 ma->ma_valid |= MA_HSM;
/* SOM data is only copied when the EA actually carries it (LMAC_SOM);
 * the caller must have supplied ma->ma_som storage. */
787 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
788 LASSERT(ma->ma_som != NULL);
789 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
790 ma->ma_som->msd_size = lma->lma_som_size;
791 ma->ma_som->msd_blocks = lma->lma_som_blocks;
792 ma->ma_som->msd_mountid = lma->lma_som_mountid;
793 ma->ma_valid |= MA_SOM;
/* Dispatcher that fills \a ma according to ma->ma_need: inode attrs,
 * LOV/LMV EAs, parent FID, HSM/SOM data and (optionally) the default ACL,
 * each gated by the object's file type. Stops at the first failure. */
799 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
805 if (ma->ma_need & MA_INODE)
806 rc = mdd_iattr_get(env, mdd_obj, ma);
808 if (rc == 0 && ma->ma_need & MA_LOV) {
809 if (S_ISREG(mdd_object_type(mdd_obj)) ||
810 S_ISDIR(mdd_object_type(mdd_obj)))
811 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* Parent FID only makes sense for plain files without a LOV EA result. */
813 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
814 if (S_ISREG(mdd_object_type(mdd_obj)))
815 rc = mdd_pfid_get(env, mdd_obj, ma);
817 if (rc == 0 && ma->ma_need & MA_LMV) {
818 if (S_ISDIR(mdd_object_type(mdd_obj)))
819 rc = __mdd_lmv_get(env, mdd_obj, ma);
821 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
822 if (S_ISREG(mdd_object_type(mdd_obj)))
823 rc = __mdd_lma_get(env, mdd_obj, ma);
825 #ifdef CONFIG_FS_POSIX_ACL
826 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
827 if (S_ISDIR(mdd_object_type(mdd_obj)))
828 rc = mdd_def_acl_get(env, mdd_obj, ma);
831 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
832 rc, ma->ma_valid, ma->ma_lmm);
/* As mdd_attr_get_internal(), but takes the object's read lock when any
 * EA-backed attribute (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain
 * inode attributes need no lock. */
836 int mdd_attr_get_internal_locked(const struct lu_env *env,
837 struct mdd_object *mdd_obj, struct md_attr *ma)
840 int needlock = ma->ma_need &
841 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
844 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
845 rc = mdd_attr_get_internal(env, mdd_obj, ma);
847 mdd_read_unlock(env, mdd_obj);
852 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point: thin wrapper over the
 * locked internal getter. */
854 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
857 struct mdd_object *mdd_obj = md2mdd_obj(obj);
861 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
866 * No permission check is needed.
/* md_object_operations::moo_xattr_get: read xattr \a name into \a buf
 * under the object's read lock. Fails with rc = -2 (-ENOENT) when the
 * object does not exist; the early return after the CERROR is elided
 * from this listing. */
868 static int mdd_xattr_get(const struct lu_env *env,
869 struct md_object *obj, struct lu_buf *buf,
872 struct mdd_object *mdd_obj = md2mdd_obj(obj);
877 if (mdd_object_exists(mdd_obj) == 0) {
878 CERROR("%s: object "DFID" not found: rc = -2\n",
879 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
883 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
884 rc = mdo_xattr_get(env, mdd_obj, buf, name,
885 mdd_object_capa(env, mdd_obj));
886 mdd_read_unlock(env, mdd_obj);
892 * Permission check is done when open,
893 * no need check again.
/* md_object_operations::moo_readlink: read the symlink target by doing a
 * body read (dbo_read) on the underlying dt object, under read lock. */
895 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
898 struct mdd_object *mdd_obj = md2mdd_obj(obj);
899 struct dt_object *next;
904 if (mdd_object_exists(mdd_obj) == 0) {
905 CERROR("%s: object "DFID" not found: rc = -2\n",
906 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
910 next = mdd_object_child(mdd_obj);
911 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
912 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
913 mdd_object_capa(env, mdd_obj));
914 mdd_read_unlock(env, mdd_obj);
919 * No permission check is needed.
/* md_object_operations::moo_xattr_list: list all xattr names into \a buf
 * under the object's read lock. */
921 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
924 struct mdd_object *mdd_obj = md2mdd_obj(obj);
929 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
930 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
931 mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) the creation of child \a c
 * under parent \a p. The dt object format is derived from the creation
 * mode unless the caller requested a non-directory index feature.
 * NOTE(review): the di_feat assignment appears only on the index path in
 * the original; intervening lines are elided from this listing. */
936 int mdd_declare_object_create_internal(const struct lu_env *env,
937 struct mdd_object *p,
938 struct mdd_object *c,
940 struct thandle *handle,
941 const struct md_op_spec *spec)
943 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
944 const struct dt_index_features *feat = spec->sp_feat;
948 if (feat != &dt_directory_features && feat != NULL)
949 dof->dof_type = DFT_INDEX;
951 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
953 dof->u.dof_idx.di_feat = feat;
955 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Actually create the child object \a c inside transaction \a handle,
 * mirroring the format selection done by the declare step. The allocation
 * hint is initialized by the lower dt device, using the parent (if any)
 * and the creation mode. No-op when \a c already exists. */
960 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
961 struct mdd_object *c, struct md_attr *ma,
962 struct thandle *handle,
963 const struct md_op_spec *spec)
965 struct lu_attr *attr = &ma->ma_attr;
966 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
967 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
968 const struct dt_index_features *feat = spec->sp_feat;
972 if (!mdd_object_exists(c)) {
973 struct dt_object *next = mdd_object_child(c);
976 if (feat != &dt_directory_features && feat != NULL)
977 dof->dof_type = DFT_INDEX;
979 dof->dof_type = dt_mode_to_dft(attr->la_mode);
981 dof->u.dof_idx.di_feat = feat;
983 /* @hint will be initialized by underlying device. */
984 next->do_ops->do_ah_init(env, hint,
985 p ? mdd_object_child(p) : NULL,
986 attr->la_mode & S_IFMT);
988 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
989 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
997 * Make sure the ctime is increased only.
/* Compare the requested ctime against the on-disk one and strip time
 * updates that would move ctime backwards; a pure, equal-ctime update is
 * dropped entirely. */
999 static inline int mdd_attr_check(const struct lu_env *env,
1000 struct mdd_object *obj,
1001 struct lu_attr *attr)
1003 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1007 if (attr->la_valid & LA_CTIME) {
1008 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1012 if (attr->la_ctime < tmp_la->la_ctime)
1013 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1014 else if (attr->la_valid == LA_CTIME &&
1015 attr->la_ctime == tmp_la->la_ctime)
1016 attr->la_valid &= ~LA_CTIME;
/* Write \a attr to \a obj within transaction \a handle, and, when the
 * mode changed and \a needacl is set, propagate the new mode into the
 * POSIX ACL (chmod semantics). */
1021 int mdd_attr_set_internal(const struct lu_env *env,
1022 struct mdd_object *obj,
1023 struct lu_attr *attr,
1024 struct thandle *handle,
1030 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1031 #ifdef CONFIG_FS_POSIX_ACL
1032 if (!rc && (attr->la_valid & LA_MODE) && needacl)
1033 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* Validate the ctime ordering via mdd_attr_check(), then apply the
 * (possibly reduced) attribute set with mdd_attr_set_internal().
 * NOTE(review): the early-return between check and set is elided from
 * this listing. */
1038 int mdd_attr_check_set_internal(const struct lu_env *env,
1039 struct mdd_object *obj,
1040 struct lu_attr *attr,
1041 struct thandle *handle,
1047 rc = mdd_attr_check(env, obj, attr);
1052 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Locked variant of mdd_attr_set_internal(): the write lock is only
 * needed for the ACL update, so \a needacl is narrowed to mode changes
 * first. */
1056 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1057 struct mdd_object *obj,
1058 struct lu_attr *attr,
1059 struct thandle *handle,
1065 needacl = needacl && (attr->la_valid & LA_MODE);
1067 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1068 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1070 mdd_write_unlock(env, obj);
/* Locked variant of mdd_attr_check_set_internal(); mirrors
 * mdd_attr_set_internal_locked() above, including the needacl narrowing
 * to mode changes. */
1074 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1075 struct mdd_object *obj,
1076 struct lu_attr *attr,
1077 struct thandle *handle,
1083 needacl = needacl && (attr->la_valid & LA_MODE);
1085 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1086 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1088 mdd_write_unlock(env, obj);
/* Set or delete an xattr inside transaction \a handle: a non-empty buffer
 * sets \a name, a NULL/zero-length buffer deletes it. Note the set path
 * passes 0 rather than the caller's \a fl flags — presumably intentional
 * here, but worth confirming against other callers of mdo_xattr_set(). */
1092 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1093 const struct lu_buf *buf, const char *name,
1094 int fl, struct thandle *handle)
1096 struct lustre_capa *capa = mdd_object_capa(env, obj);
1100 if (buf->lb_buf && buf->lb_len > 0)
1101 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1102 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1103 rc = mdo_xattr_del(env, obj, name, handle, capa);
1109 * This gives the same functionality as the code between
1110 * sys_chmod and inode_setattr
1111 * chown_common and inode_setattr
1112 * utimes and inode_setattr
1113 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Sanitize a client-requested attribute change against the current
 * on-disk attributes and the caller's credentials (uc): reject illegal
 * changes, enforce chmod/chown/chgrp/utimes permission rules, clear
 * setuid/setgid where POSIX requires, and drop no-op time updates.
 * NOTE(review): this listing is elided throughout — many error returns,
 * closing braces and the uc initialization are not visible here. */
1115 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1116 struct lu_attr *la, const struct md_attr *ma)
1118 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1119 struct md_ucred *uc;
1126 /* Do not permit change file type */
1127 if (la->la_valid & LA_TYPE)
1130 /* They should not be processed by setattr */
1131 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1134 /* export destroy does not have ->le_ses, but we may want
1135 * to drop LUSTRE_SOM_FL. */
1141 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: used by rename to touch the source; only allowed
 * to move ctime forward. */
1145 if (la->la_valid == LA_CTIME) {
1146 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1147 /* This is only for set ctime when rename's source is
1149 rc = mdd_may_delete(env, NULL, obj,
1150 (struct md_attr *)ma, 1, 0);
1151 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1152 la->la_valid &= ~LA_CTIME;
1156 if (la->la_valid == LA_ATIME) {
1157 /* This is atime only set for read atime update on close. */
1158 if (la->la_atime >= tmp_la->la_atime &&
1159 la->la_atime < (tmp_la->la_atime +
1160 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1161 la->la_valid &= ~LA_ATIME;
1165 /* Check if flags change. */
1166 if (la->la_valid & LA_FLAGS) {
1167 unsigned int oldflags = 0;
1168 unsigned int newflags = la->la_flags &
1169 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1171 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1172 !mdd_capable(uc, CFS_CAP_FOWNER))
1175 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1176 * only be changed by the relevant capability. */
1177 if (mdd_is_immutable(obj))
1178 oldflags |= LUSTRE_IMMUTABLE_FL;
1179 if (mdd_is_append(obj))
1180 oldflags |= LUSTRE_APPEND_FL;
1181 if ((oldflags ^ newflags) &&
1182 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1185 if (!S_ISDIR(tmp_la->la_mode))
1186 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects may only have their flags changed
 * unless the server explicitly bypasses permission checks. */
1189 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1190 (la->la_valid & ~LA_FLAGS) &&
1191 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1194 /* Check for setting the obj time. */
1195 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1196 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1197 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1198 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1199 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID/LA_KILL_SGID: server-internal requests to strip the
 * setuid/setgid bits (e.g. after a write by a non-owner). */
1207 if (la->la_valid & LA_KILL_SUID) {
1208 la->la_valid &= ~LA_KILL_SUID;
1209 if ((tmp_la->la_mode & S_ISUID) &&
1210 !(la->la_valid & LA_MODE)) {
1211 la->la_mode = tmp_la->la_mode;
1212 la->la_valid |= LA_MODE;
1214 la->la_mode &= ~S_ISUID;
1217 if (la->la_valid & LA_KILL_SGID) {
1218 la->la_valid &= ~LA_KILL_SGID;
1219 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1220 (S_ISGID | S_IXGRP)) &&
1221 !(la->la_valid & LA_MODE)) {
1222 la->la_mode = tmp_la->la_mode;
1223 la->la_valid |= LA_MODE;
1225 la->la_mode &= ~S_ISGID;
1228 /* Make sure a caller can chmod. */
1229 if (la->la_valid & LA_MODE) {
1230 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1231 (uc->mu_fsuid != tmp_la->la_uid) &&
1232 !mdd_capable(uc, CFS_CAP_FOWNER))
1235 if (la->la_mode == (cfs_umode_t) -1)
1236 la->la_mode = tmp_la->la_mode;
1238 la->la_mode = (la->la_mode & S_IALLUGO) |
1239 (tmp_la->la_mode & ~S_IALLUGO);
1241 /* Also check the setgid bit! */
1242 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1243 la->la_gid : tmp_la->la_gid) &&
1244 !mdd_capable(uc, CFS_CAP_FSETID))
1245 la->la_mode &= ~S_ISGID;
1247 la->la_mode = tmp_la->la_mode;
1250 /* Make sure a caller can chown. */
1251 if (la->la_valid & LA_UID) {
1252 if (la->la_uid == (uid_t) -1)
1253 la->la_uid = tmp_la->la_uid;
1254 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1255 (la->la_uid != tmp_la->la_uid)) &&
1256 !mdd_capable(uc, CFS_CAP_CHOWN))
1259 /* If the user or group of a non-directory has been
1260 * changed by a non-root user, remove the setuid bit.
1261 * 19981026 David C Niemi <niemi@tux.org>
1263 * Changed this to apply to all users, including root,
1264 * to avoid some races. This is the behavior we had in
1265 * 2.0. The check for non-root was definitely wrong
1266 * for 2.2 anyway, as it should have been using
1267 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1268 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1269 !S_ISDIR(tmp_la->la_mode)) {
1270 la->la_mode &= ~S_ISUID;
1271 la->la_valid |= LA_MODE;
1275 /* Make sure caller can chgrp. */
1276 if (la->la_valid & LA_GID) {
1277 if (la->la_gid == (gid_t) -1)
1278 la->la_gid = tmp_la->la_gid;
1279 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1280 ((la->la_gid != tmp_la->la_gid) &&
1281 !lustre_in_group_p(uc, la->la_gid))) &&
1282 !mdd_capable(uc, CFS_CAP_CHOWN))
1285 /* Likewise, if the user or group of a non-directory
1286 * has been changed by a non-root user, remove the
1287 * setgid bit UNLESS there is no group execute bit
1288 * (this would be a file marked for mandatory
1289 * locking). 19981026 David C Niemi <niemi@tux.org>
1291 * Removed the fsuid check (see the comment above) --
1293 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1294 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1295 la->la_mode &= ~S_ISGID;
1296 la->la_valid |= LA_MODE;
1300 /* For both Size-on-MDS case and truncate case,
1301 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1302 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1303 * For SOM case, it is true, the MAY_WRITE perm has been checked
1304 * when open, no need check again. For truncate case, it is false,
1305 * the MAY_WRITE perm should be checked here. */
1306 if (ma->ma_attr_flags & MDS_SOM) {
1307 /* For the "Size-on-MDS" setattr update, merge coming
1308 * attributes with the set in the inode. BUG 10641 */
1309 if ((la->la_valid & LA_ATIME) &&
1310 (la->la_atime <= tmp_la->la_atime))
1311 la->la_valid &= ~LA_ATIME;
1313 /* OST attributes do not have a priority over MDS attributes,
1314 * so drop times if ctime is equal. */
1315 if ((la->la_valid & LA_CTIME) &&
1316 (la->la_ctime <= tmp_la->la_ctime))
1317 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1319 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1320 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1321 (uc->mu_fsuid == tmp_la->la_uid)) &&
1322 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1323 rc = mdd_permission_internal_locked(env, obj,
1330 if (la->la_valid & LA_CTIME) {
1331 /* The pure setattr, it has the priority over what is
1332 * already set, do not drop it if ctime is equal. */
1333 if (la->la_ctime < tmp_la->la_ctime)
1334 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1342 /** Store a data change changelog record
1343 * If this fails, we must fail the whole transaction; we don't
1344 * want the change to commit without the log entry.
1345 * \param mdd_obj - mdd_object of change
1346 * \param handle - transacion handle
/* Early-outs: changelogs disabled, record type masked off, or (for the
 * time-update types MTIME..ATIME) an update was already logged for this
 * object since changelog start.
 * NOTE(review): listing elided — rec initialization from buf, the RETURN
 * paths and the out label are not visible here. */
1348 static int mdd_changelog_data_store(const struct lu_env *env,
1349 struct mdd_device *mdd,
1350 enum changelog_rec_type type,
1352 struct mdd_object *mdd_obj,
1353 struct thandle *handle)
1355 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1356 struct llog_changelog_rec *rec;
1357 struct thandle *th = NULL;
1363 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1365 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1368 LASSERT(mdd_obj != NULL);
1369 LASSERT(handle != NULL);
1371 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1372 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1373 /* Don't need multiple updates in this log */
1374 /* Don't check under lock - no big deal if we get an extra
1379 reclen = llog_data_len(sizeof(*rec));
1380 buf = mdd_buf_alloc(env, reclen);
1381 if (buf->lb_buf == NULL)
1383 rec = (struct llog_changelog_rec *)buf->lb_buf;
1385 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1386 rec->cr.cr_type = (__u32)type;
1387 rec->cr.cr_tfid = *tfid;
1388 rec->cr.cr_namelen = 0;
/* Remember when we last logged this object so repeated time updates can
 * be suppressed by the check above. */
1389 mdd_obj->mod_cltime = cfs_time_current_64();
1391 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1394 mdd_trans_stop(env, mdd, rc, th);
1397 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1398 rc, type, PFID(tfid));
/* Public entry to emit a standalone changelog record for \a obj: creates,
 * declares, starts and stops its own transaction around
 * mdd_changelog_data_store().  (Extract elides some lines — numbering gaps.) */
1405 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1406 int flags, struct md_object *obj)
1408 struct thandle *handle;
1409 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1410 struct mdd_device *mdd = mdo2mdd(obj);
1414 handle = mdd_trans_create(env, mdd);
1416 return(PTR_ERR(handle));
1418 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1422 rc = mdd_trans_start(env, mdd, handle);
1426 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1430 mdd_trans_stop(env, mdd, rc, handle);
1436 * Should be called with write lock held.
1438 * \see mdd_lma_set_locked().
1440 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1441 const struct md_attr *ma, struct thandle *handle)
1443 struct mdd_thread_info *info = mdd_env_info(env);
1445 struct lustre_mdt_attrs *lma =
1446 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1447 int lmasize = sizeof(struct lustre_mdt_attrs);
1452 /* Either HSM or SOM part is not valid, we need to read it before */
1453 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1454 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1458 lustre_lma_swab(lma);
1460 memset(lma, 0, lmasize);
1464 if (ma->ma_valid & MA_HSM) {
1465 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1466 lma->lma_compat |= LMAC_HSM;
1470 if (ma->ma_valid & MA_SOM) {
1471 LASSERT(ma->ma_som != NULL);
1472 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1473 lma->lma_compat &= ~LMAC_SOM;
1475 lma->lma_compat |= LMAC_SOM;
1476 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1477 lma->lma_som_size = ma->ma_som->msd_size;
1478 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1479 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1484 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1486 lustre_lma_swab(lma);
1487 buf = mdd_buf_get(env, lma, lmasize);
1488 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1494 * Save LMA extended attributes with data from \a ma.
1496 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid, if
1497 * not, LMA EA will be first read from disk, modified and written back.
/* Locking wrapper: takes the object write lock around __mdd_lma_set(). */
1500 static int mdd_lma_set_locked(const struct lu_env *env,
1501 struct mdd_object *mdd_obj,
1502 const struct md_attr *ma, struct thandle *handle)
1506 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1507 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1508 mdd_write_unlock(env, mdd_obj);
1512 /* Precedence for choosing record type when multiple
1513 * attributes change: setattr > mtime > ctime > atime
1514 * (ctime changes when mtime does, plus chmod/chown.
1515 * atime and ctime are independent.) */
/* Builds a bitmask of candidate record types from la_valid, masks it with
 * the device changelog mask, then stores a single record of the lowest
 * surviving type.  (Extract elides some lines — numbering gaps.) */
1516 static int mdd_attr_set_changelog(const struct lu_env *env,
1517 struct md_object *obj, struct thandle *handle,
1520 struct mdd_device *mdd = mdo2mdd(obj);
1523 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1524 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1525 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1526 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1527 bits = bits & mdd->mdd_cl.mc_mask;
1531 /* The record type is the lowest non-masked set bit */
1532 while (bits && ((bits & 1) == 0)) {
1537 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1538 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1539 md2mdd_obj(obj), handle);
/* Declares (reserves transaction credits for) everything mdd_attr_set() may
 * touch: the attr update, a changelog record, the LOV and LMA EA rewrites,
 * a possible ACL rewrite on mode change, and — when an LOV EA is present for
 * a chown/chgrp — one unlink-style llog record per stripe.
 * NOTE(review): extract elides many lines here (numbering gaps), including
 * the error-return checks between the declare calls; kept verbatim. */
1542 static int mdd_declare_attr_set(const struct lu_env *env,
1543 struct mdd_device *mdd,
1544 struct mdd_object *obj,
1545 const struct md_attr *ma,
1546 struct lov_mds_md *lmm,
1547 struct thandle *handle)
1549 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1552 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1556 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1560 if (ma->ma_valid & MA_LOV) {
1562 buf->lb_len = ma->ma_lmm_size;
1563 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1569 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1571 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1572 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1578 #ifdef CONFIG_FS_POSIX_ACL
/* mode change may rewrite the access ACL; probe it under read lock */
1579 if (ma->ma_attr.la_valid & LA_MODE) {
1580 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1581 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1583 mdd_read_unlock(env, obj);
1584 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1592 rc = mdo_declare_xattr_set(env, obj, buf,
1593 XATTR_NAME_ACL_ACCESS, 0,
1601 /* basically the log is the same as in unlink case */
/* chown/chgrp with striped file: declare one llog record per stripe so the
 * setattr can be journalled like an unlink */
1605 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1606 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1607 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1608 mdd->mdd_obd_dev->obd_name,
1609 le32_to_cpu(lmm->lmm_magic),
1610 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1614 stripe = le16_to_cpu(lmm->lmm_stripe_count);
/* LOV_ALL_STRIPES means "stripe over every OST": use the target count */
1615 if (stripe == LOV_ALL_STRIPES) {
1616 struct lov_desc *ldesc;
1618 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1619 LASSERT(ldesc != NULL);
1620 stripe = ldesc->ld_tgt_count;
1623 for (i = 0; i < stripe; i++) {
1624 rc = mdd_declare_llog_record(env, mdd,
1625 sizeof(struct llog_unlink_rec),
/* Main .moo_attr_set entry: fixes up the caller's attributes, runs one
 * transaction that sets attrs (plus LOV/LMA EAs when requested), records a
 * changelog entry, and journals chown/chgrp in the llog like unlink.
 * NOTE(review): extract elides many lines (numbering gaps) — error checks,
 * labels, and RETURN paths are missing from view; kept verbatim. */
1635 /* set attr and LOV EA at once, return updated attr */
1636 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1637 const struct md_attr *ma)
1639 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1640 struct mdd_device *mdd = mdo2mdd(obj);
1641 struct thandle *handle;
1642 struct lov_mds_md *lmm = NULL;
1643 struct llog_cookie *logcookies = NULL;
1644 int rc, lmm_size = 0, cookie_size = 0;
1645 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1646 #ifdef HAVE_QUOTA_SUPPORT
1647 struct obd_device *obd = mdd->mdd_obd_dev;
1648 struct mds_obd *mds = &obd->u.mds;
1649 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1650 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1651 int quota_opc = 0, block_count = 0;
1652 int inode_pending[MAXQUOTAS] = { 0, 0 };
1653 int block_pending[MAXQUOTAS] = { 0, 0 };
/* normalize/permission-check the incoming attrs into la_copy */
1657 *la_copy = ma->ma_attr;
1658 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1662 /* setattr on "close" only change atime, or do nothing */
1663 if (ma->ma_valid == MA_INODE &&
1664 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp of a regular file: fetch the LOV EA so ownership can be
 * propagated to the OST objects and journalled */
1667 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1668 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1669 lmm_size = mdd_lov_mdsize(env, mdd);
1670 lmm = mdd_max_lmm_get(env, mdd);
1674 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1681 handle = mdd_trans_create(env, mdd);
1683 RETURN(PTR_ERR(handle));
1685 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1686 lmm_size > 0 ? lmm : NULL, handle);
1690 rc = mdd_trans_start(env, mdd, handle);
1694 /* permission changes may require sync operation */
1695 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1696 handle->th_sync |= !!mdd->mdd_sync_permission;
1698 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1699 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1700 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1702 #ifdef HAVE_QUOTA_SUPPORT
/* ownership change: pre-acquire inode + block quota for the new owner */
1703 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1704 struct obd_export *exp = md_quota(env)->mq_exp;
1705 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1707 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1709 quota_opc = FSFILT_OP_SETATTR;
1710 mdd_quota_wrapper(la_copy, qnids);
1711 mdd_quota_wrapper(la_tmp, qoids);
1712 /* get file quota for new owner */
1713 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1714 qnids, inode_pending, 1, NULL, 0,
1716 block_count = (la_tmp->la_blocks + 7) >> 3;
1719 mdd_data_get(env, mdd_obj, &data);
1720 /* get block quota for new owner */
1721 lquota_chkquota(mds_quota_interface_ref, obd,
1722 exp, qnids, block_pending,
1724 LQUOTA_FLAGS_BLK, data, 1);
1730 if (la_copy->la_valid & LA_FLAGS) {
1731 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1734 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1735 } else if (la_copy->la_valid) { /* setattr */
1736 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1738 /* journal chown/chgrp in llog, just like unlink */
1739 if (rc == 0 && lmm_size){
1740 cookie_size = mdd_lov_cookiesize(env, mdd);
1741 logcookies = mdd_max_cookie_get(env, mdd);
1742 if (logcookies == NULL)
1743 GOTO(cleanup, rc = -ENOMEM);
1745 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1746 logcookies, cookie_size) <= 0)
/* caller supplied a new LOV EA: validate, then store it */
1751 if (rc == 0 && ma->ma_valid & MA_LOV) {
1754 mode = mdd_object_type(mdd_obj);
1755 if (S_ISREG(mode) || S_ISDIR(mode)) {
1756 rc = mdd_lsm_sanity_check(env, mdd_obj);
1760 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1761 ma->ma_lmm_size, handle, 1);
/* HSM/SOM state update goes through the LMA EA */
1765 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1768 mode = mdd_object_type(mdd_obj);
1770 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1775 rc = mdd_attr_set_changelog(env, obj, handle,
1776 ma->ma_attr.la_valid);
1778 mdd_trans_stop(env, mdd, rc, handle);
1779 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1780 /*set obd attr, if needed*/
1781 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1784 #ifdef HAVE_QUOTA_SUPPORT
1786 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1788 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1790 /* Trigger dqrel/dqacq for original owner and new owner.
1791 * If failed, the next call for lquota_chkquota will
1793 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Sets one xattr under the object write lock, inside the caller's already
 * started transaction.  (Extract elides rc declaration / RETURN lines.) */
1800 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1801 const struct lu_buf *buf, const char *name, int fl,
1802 struct thandle *handle)
1807 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1808 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1809 mdd_write_unlock(env, obj);
/* Refuses xattr changes on immutable/append-only objects and requires the
 * caller to be the owner or hold CFS_CAP_FOWNER.
 * (Extract elides the error codes returned — numbering gaps.) */
1814 static int mdd_xattr_sanity_check(const struct lu_env *env,
1815 struct mdd_object *obj)
1817 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1818 struct md_ucred *uc = md_ucred(env);
1822 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1825 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1829 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1830 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declares the xattr write and, for "user." xattrs only, the changelog
 * record that mdd_xattr_set() will emit. */
1836 static int mdd_declare_xattr_set(const struct lu_env *env,
1837 struct mdd_device *mdd,
1838 struct mdd_object *obj,
1839 const struct lu_buf *buf,
1841 struct thandle *handle)
1846 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1850 /* Only record user xattr changes */
1851 if ((strncmp("user.", name, 5) == 0))
1852 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1858 * The caller should guarantee to update the object ctime
1859 * after xattr_set if needed.
/* .moo_xattr_set entry: permission check, own transaction, locked xattr
 * write, and a CL_XATTR changelog record for user/ACL xattrs.
 * (Extract elides error-check lines — numbering gaps.) */
1861 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1862 const struct lu_buf *buf, const char *name,
1865 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1866 struct mdd_device *mdd = mdo2mdd(obj);
1867 struct thandle *handle;
1871 rc = mdd_xattr_sanity_check(env, mdd_obj);
1875 handle = mdd_trans_create(env, mdd);
1877 RETURN(PTR_ERR(handle));
1879 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1883 rc = mdd_trans_start(env, mdd, handle);
1887 /* security-related changes may require sync */
1888 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1889 handle->th_sync |= !!mdd->mdd_sync_permission;
1891 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1893 /* Only record system & user xattr changes */
1894 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1895 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1896 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1897 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1898 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1899 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1900 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1904 mdd_trans_stop(env, mdd, rc, handle);
/* Declares the xattr removal and, for "user." xattrs only, the changelog
 * record that mdd_xattr_del() will emit. */
1909 static int mdd_declare_xattr_del(const struct lu_env *env,
1910 struct mdd_device *mdd,
1911 struct mdd_object *obj,
1913 struct thandle *handle)
1917 rc = mdo_declare_xattr_del(env, obj, name, handle);
1921 /* Only record user xattr changes */
1922 if ((strncmp("user.", name, 5) == 0))
1923 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1929 * The caller should guarantee to update the object ctime
1930 * after xattr_set if needed.
/* .moo_xattr_del entry: mirror of mdd_xattr_set() for removal — permission
 * check, own transaction, locked delete, CL_XATTR record for user/ACL names.
 * (Extract elides error-check lines — numbering gaps.) */
1932 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1935 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1936 struct mdd_device *mdd = mdo2mdd(obj);
1937 struct thandle *handle;
1941 rc = mdd_xattr_sanity_check(env, mdd_obj);
1945 handle = mdd_trans_create(env, mdd);
1947 RETURN(PTR_ERR(handle));
1949 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1953 rc = mdd_trans_start(env, mdd, handle);
1957 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1958 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1959 mdd_object_capa(env, mdd_obj));
1960 mdd_write_unlock(env, mdd_obj);
1962 /* Only record system & user xattr changes */
1963 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1964 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1965 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1966 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1967 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1968 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1969 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1973 mdd_trans_stop(env, mdd, rc, handle);
/* Drops one (or, for directories, two) link counts on the object, updates
 * ctime, finishes the unlink, and releases quota for the last unlink.
 * Explicitly marked dead code: DNE replaced this path.
 * (Extract elides many lines — error paths, RETURNs; kept verbatim.) */
1978 /* partial unlink */
1979 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1982 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1983 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1984 struct mdd_device *mdd = mdo2mdd(obj);
1985 struct thandle *handle;
1986 #ifdef HAVE_QUOTA_SUPPORT
1987 struct obd_device *obd = mdd->mdd_obd_dev;
1988 struct mds_obd *mds = &obd->u.mds;
1989 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1995 /* XXX: this code won't be used ever:
1996 * DNE uses slightly different approach */
2000 * Check -ENOENT early here because we need to get object type
2001 * to calculate credits before transaction start
2003 if (mdd_object_exists(mdd_obj) == 0) {
2004 CERROR("%s: object "DFID" not found: rc = -2\n",
2005 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2009 LASSERT(mdd_object_exists(mdd_obj) > 0);
2011 handle = mdd_trans_create(env, mdd);
2015 rc = mdd_trans_start(env, mdd, handle);
2017 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2019 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2023 mdo_ref_del(env, mdd_obj, handle);
/* a directory loses its extra "." self-link too */
2025 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2027 mdo_ref_del(env, mdd_obj, handle);
2030 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2031 la_copy->la_ctime = ma->ma_attr.la_ctime;
2033 la_copy->la_valid = LA_CTIME;
2034 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2038 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2039 #ifdef HAVE_QUOTA_SUPPORT
/* last link and no opens left: release the child's quota */
2040 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2041 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2042 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2043 mdd_quota_wrapper(&ma->ma_attr, qids);
2050 mdd_write_unlock(env, mdd_obj);
2051 mdd_trans_stop(env, mdd, rc, handle);
2052 #ifdef HAVE_QUOTA_SUPPORT
2054 /* Trigger dqrel on the owner of child. If failed,
2055 * the next call for lquota_chkquota will process it */
2056 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Validates the file type for a partial object create; the switch body
 * (cases for the supported S_IF* types) is elided from this extract. */
2062 /* partial operation */
2063 static int mdd_oc_sanity_check(const struct lu_env *env,
2064 struct mdd_object *obj,
2070 switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial object create (dead code — DNE replaced it): acquires quota,
 * creates the object in one transaction, installs slave LMV EA or remote
 * ACL when requested, initializes the object, returns fresh attrs.
 * (Extract elides many lines — error paths and RETURNs; kept verbatim.) */
2087 static int mdd_object_create(const struct lu_env *env,
2088 struct md_object *obj,
2089 const struct md_op_spec *spec,
2093 struct mdd_device *mdd = mdo2mdd(obj);
2094 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2095 const struct lu_fid *pfid = spec->u.sp_pfid;
2096 struct thandle *handle;
2097 #ifdef HAVE_QUOTA_SUPPORT
2098 struct obd_device *obd = mdd->mdd_obd_dev;
2099 struct obd_export *exp = md_quota(env)->mq_exp;
2100 struct mds_obd *mds = &obd->u.mds;
2101 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2102 int quota_opc = 0, block_count = 0;
2103 int inode_pending[MAXQUOTAS] = { 0, 0 };
2104 int block_pending[MAXQUOTAS] = { 0, 0 };
2109 /* XXX: this code won't be used ever:
2110 * DNE uses slightly different approach */
2113 #ifdef HAVE_QUOTA_SUPPORT
/* pre-acquire inode (and, per-type, block) quota for the new child */
2114 if (mds->mds_quota) {
2115 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2116 mdd_quota_wrapper(&ma->ma_attr, qids);
2117 /* get file quota for child */
2118 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2119 qids, inode_pending, 1, NULL, 0,
2121 switch (ma->ma_attr.la_mode & S_IFMT) {
2130 /* get block quota for child */
2132 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2133 qids, block_pending, block_count,
2134 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2138 handle = mdd_trans_create(env, mdd);
2140 GOTO(out_pending, rc = PTR_ERR(handle));
2142 rc = mdd_trans_start(env, mdd, handle);
2144 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2145 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2149 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2153 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2154 /* If creating the slave object, set slave EA here. */
2155 int lmv_size = spec->u.sp_ea.eadatalen;
2156 struct lmv_stripe_md *lmv;
2158 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2159 LASSERT(lmv != NULL && lmv_size > 0);
2161 rc = __mdd_xattr_set(env, mdd_obj,
2162 mdd_buf_get_const(env, lmv, lmv_size),
2163 XATTR_NAME_LMV, 0, handle);
2167 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2170 #ifdef CONFIG_FS_POSIX_ACL
/* remote ACL: initialize the child's ACL from the caller-provided EA and
 * let it adjust the create mode */
2171 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2172 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2174 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2175 buf->lb_len = spec->u.sp_ea.eadatalen;
2176 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2177 rc = __mdd_acl_init(env, mdd_obj, buf,
2178 &ma->ma_attr.la_mode,
2183 ma->ma_attr.la_valid |= LA_MODE;
2186 pfid = spec->u.sp_ea.fid;
2189 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2195 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2196 mdd_write_unlock(env, mdd_obj);
2198 mdd_trans_stop(env, mdd, rc, handle);
2200 #ifdef HAVE_QUOTA_SUPPORT
2202 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2204 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2206 /* Trigger dqacq on the owner of child. If failed,
2207 * the next call for lquota_chkquota will process it. */
2208 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* Adds one link count on the object and bumps ctime (dead code — DNE
 * replaced it).  (Extract elides error-check lines — numbering gaps.)
 * NOTE(review): mdd_trans_stop() below is passed 0 instead of rc, so a
 * sanity-check failure is not reflected in the trans result — present in
 * the original; verify against upstream before "fixing". */
2216 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2217 const struct md_attr *ma)
2219 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2220 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2221 struct mdd_device *mdd = mdo2mdd(obj);
2222 struct thandle *handle;
2226 /* XXX: this code won't be used ever:
2227 * DNE uses slightly different approach */
2230 handle = mdd_trans_create(env, mdd);
2234 rc = mdd_trans_start(env, mdd, handle);
2236 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2237 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2239 mdo_ref_add(env, mdd_obj, handle);
2240 mdd_write_unlock(env, mdd_obj);
2242 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2243 la_copy->la_ctime = ma->ma_attr.la_ctime;
2245 la_copy->la_valid = LA_CTIME;
2246 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2249 mdd_trans_stop(env, mdd, 0, handle);
2255 * do NOT or the MAY_*'s, you'll get the weakest
/* Maps open flags to a MAY_* access mask; returns 0 (no check needed) for
 * an owner open with MDS_OPEN_OWNEROVERRIDE.  (Some lines elided.) */
2257 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2261 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2262 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2263 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2264 * owner can write to a file even if it is marked readonly to hide
2265 * its brokenness. (bug 5781) */
2266 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2267 struct md_ucred *uc = md_ucred(env);
2269 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2270 (la->la_uid == uc->mu_fsuid))
2274 if (flags & FMODE_READ)
2276 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2278 if (flags & MDS_FMODE_EXEC)
/* Validates an open request against the object: rejects dead objects and
 * symlinks, write-opens of directories, enforces append-only semantics and
 * the O_NOATIME ownership rule, strips truncate for special files.
 * (Extract elides the error codes returned — numbering gaps.) */
2283 static int mdd_open_sanity_check(const struct lu_env *env,
2284 struct mdd_object *obj, int flag)
2286 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2291 if (mdd_is_dead_obj(obj))
2294 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2298 if (S_ISLNK(tmp_la->la_mode))
2301 mode = accmode(env, tmp_la, flag);
2303 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* permission already verified at create time for MDS_OPEN_CREATED */
2306 if (!(flag & MDS_OPEN_CREATED)) {
2307 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* truncate is meaningless on FIFOs, sockets and device nodes */
2312 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2313 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2314 flag &= ~MDS_OPEN_TRUNC;
2316 /* For writing append-only file must open it with append mode. */
2317 if (mdd_is_append(obj)) {
2318 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2320 if (flag & MDS_OPEN_TRUNC)
2326 * Now, flag -- O_NOATIME does not be packed by client.
2328 if (flag & O_NOATIME) {
2329 struct md_ucred *uc = md_ucred(env);
2331 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2332 (uc->mu_valid == UCRED_NEW)) &&
2333 (uc->mu_fsuid != tmp_la->la_uid) &&
2334 !mdd_capable(uc, CFS_CAP_FOWNER))
/* .moo_open entry: sanity-check the open under write lock and, on success,
 * bump the open count.  (Extract elides some lines.) */
2342 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2345 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2348 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2350 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2352 mdd_obj->mod_count++;
2354 mdd_write_unlock(env, mdd_obj);
/* Declares the unlink-log write plus the object destroy used by
 * mdd_object_kill().  (Error check between the two calls is elided.) */
2358 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2359 struct md_attr *ma, struct thandle *handle)
2363 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2367 return mdo_declare_destroy(env, obj, handle);
/* Destroys the object; for regular files first retrieves the LOV EA and
 * writes the unlink llog record so OSS objects can be cleaned up.
 * (Extract elides some lines — numbering gaps.) */
2370 /* return md_attr back,
2371 * if it is last unlink then return lov ea + llog cookie*/
2372 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2373 struct md_attr *ma, struct thandle *handle)
2378 if (S_ISREG(mdd_object_type(obj))) {
2379 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2380 * Caller must be ready for that. */
2381 rc = __mdd_lmm_get(env, obj, ma);
2382 if ((ma->ma_valid & MA_LOV))
2383 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2388 rc = mdo_destroy(env, obj, handle);
/* Declares the orphan-index removal plus the object kill that the final
 * close of an orphan will perform.  (Error check line is elided.) */
2393 static int mdd_declare_close(const struct lu_env *env,
2394 struct mdd_object *obj,
2396 struct thandle *handle)
2400 rc = orph_declare_index_delete(env, obj, handle);
2404 return mdd_declare_object_kill(env, obj, ma, handle);
2408 * No permission check is needed.
/* .moo_close entry: drops the open count; on the last close of an orphaned
 * or zero-nlink object removes it from the orphan index, destroys it, and
 * releases quota; also emits a CL_CLOSE changelog record for write-mode
 * closes.  Heavily elided in this extract (error paths, labels, RETURNs). */
2410 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2411 struct md_attr *ma, int mode)
2413 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2414 struct mdd_device *mdd = mdo2mdd(obj);
2415 struct thandle *handle = NULL;
2417 int is_orphan = 0, reset = 1;
2419 #ifdef HAVE_QUOTA_SUPPORT
2420 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2421 struct mds_obd *mds = &obd->u.mds;
2422 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: drop the count but leave the orphan in place */
2427 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2428 mdd_obj->mod_count--;
2430 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2431 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2432 "list\n", PFID(mdd_object_fid(mdd_obj)));
2436 /* check without any lock */
2437 if (mdd_obj->mod_count == 1 &&
2438 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2440 handle = mdd_trans_create(env, mdo2mdd(obj));
2442 RETURN(PTR_ERR(handle));
2444 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2448 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2452 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
/* recheck under lock; races with a concurrent close are restarted */
2457 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2458 if (handle == NULL && mdd_obj->mod_count == 1 &&
2459 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2460 mdd_write_unlock(env, mdd_obj);
2464 /* release open count */
2465 mdd_obj->mod_count --;
2467 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2468 /* remove link to object from orphan index */
2469 LASSERT(handle != NULL);
2470 rc = __mdd_orphan_del(env, mdd_obj, handle);
2472 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2473 "list, OSS objects to be destroyed.\n",
2474 PFID(mdd_object_fid(mdd_obj)));
2477 CERROR("Object "DFID" can not be deleted from orphan "
2478 "list, maybe cause OST objects can not be "
2479 "destroyed (err: %d).\n",
2480 PFID(mdd_object_fid(mdd_obj)), rc);
2481 /* If object was not deleted from orphan list, do not
2482 * destroy OSS objects, which will be done when next
2488 rc = mdd_iattr_get(env, mdd_obj, ma);
2489 /* Object maybe not in orphan list originally, it is rare case for
2490 * mdd_finish_unlink() failure. */
2491 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2492 #ifdef HAVE_QUOTA_SUPPORT
2493 if (mds->mds_quota) {
2494 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2495 mdd_quota_wrapper(&ma->ma_attr, qids);
2498 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2499 if (ma->ma_valid & MA_FLAGS &&
2500 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2501 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2503 if (handle == NULL) {
2504 handle = mdd_trans_create(env, mdo2mdd(obj));
2506 GOTO(out, rc = PTR_ERR(handle));
2508 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2513 rc = mdd_declare_changelog_store(env, mdd,
2518 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2523 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2529 CERROR("Error when prepare to delete Object "DFID" , "
2530 "which will cause OST objects can not be "
2531 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2537 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2539 mdd_write_unlock(env, mdd_obj);
/* write-mode close: emit CL_CLOSE unless this is a recovery reopen */
2542 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2543 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2544 if (handle == NULL) {
2545 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): "rc = IS_ERR(handle)" yields 0/1, not an errno —
 * PTR_ERR(handle) was presumably intended; verify against upstream. */
2547 GOTO(stop, rc = IS_ERR(handle));
2549 rc = mdd_declare_changelog_store(env, mdd, NULL,
2554 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2559 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2565 mdd_trans_stop(env, mdd, rc, handle);
2566 #ifdef HAVE_QUOTA_SUPPORT
2568 /* Trigger dqrel on the owner of child. If failed,
2569 * the next call for lquota_chkquota will process it */
2570 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2577 * Permission check is done when open,
2578 * no need check again.
/* Accepts readpage only on directories whose backing dt object supports
 * index (directory) operations.  (Return values elided from extract.) */
2580 static int mdd_readpage_sanity_check(const struct lu_env *env,
2581 struct mdd_object *obj)
2583 struct dt_object *next = mdd_object_child(obj);
2587 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fills one lu_dirpage with directory entries pulled from the dt iterator:
 * records the start hash, packs lu_dirents while they fit in nob bytes,
 * then stamps the end hash and terminates the entry list.
 * (Extract elides several lines — loop control, break paths; verbatim.) */
2595 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2596 struct lu_dirpage *dp, int nob,
2597 const struct dt_it_ops *iops, struct dt_it *it,
2603 struct lu_dirent *ent;
2604 struct lu_dirent *last = NULL;
2607 memset(area, 0, sizeof (*dp));
2608 area += sizeof (*dp);
2609 nob -= sizeof (*dp);
2616 len = iops->key_size(env, it);
2618 /* IAM iterator can return record with zero len. */
2622 hash = iops->store(env, it);
2623 if (unlikely(first)) {
2625 dp->ldp_hash_start = cpu_to_le64(hash);
2628 /* calculate max space required for lu_dirent */
2629 recsize = lu_dirent_calc_size(len, attr);
2631 if (nob >= recsize) {
2632 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2633 if (result == -ESTALE)
2638 /* osd might not able to pack all attributes,
2639 * so recheck rec length */
2640 recsize = le16_to_cpu(ent->lde_reclen);
/* page full before any entry fit: that is an error */
2642 result = (last != NULL) ? 0 :-EINVAL;
2646 ent = (void *)ent + recsize;
2650 result = iops->next(env, it);
2651 if (result == -ESTALE)
2653 } while (result == 0);
2656 dp->ldp_hash_end = cpu_to_le64(hash);
/* hash collision across the page boundary: flag it for the client */
2658 if (last->lde_hash == dp->ldp_hash_end)
2659 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2660 last->lde_reclen = 0; /* end mark */
/* Iterates the directory index starting at rdpg->rp_hash and builds one
 * lu_dirpage per client page (or per LU_PAGE on large-page hosts); an empty
 * result yields a single LDF_EMPTY page.  Returns bytes produced.
 * (Extract elides several lines — kmap/kunmap, loop breaks; verbatim.) */
2665 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2666 const struct lu_rdpg *rdpg)
2669 struct dt_object *next = mdd_object_child(obj);
2670 const struct dt_it_ops *iops;
2672 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2678 LASSERT(rdpg->rp_pages != NULL);
2679 LASSERT(next->do_index_ops != NULL);
2681 if (rdpg->rp_count <= 0)
2685 * iterate through directory and fill pages from @rdpg
2687 iops = &next->do_index_ops->dio_it;
2688 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2692 rc = iops->load(env, it, rdpg->rp_hash);
2696 * Iterator didn't find record with exactly the key requested.
2698 * It is currently either
2700 * - positioned above record with key less than
2701 * requested---skip it.
2703 * - or not positioned at all (is in IAM_IT_SKEWED
2704 * state)---position it on the next item.
2706 rc = iops->next(env, it);
2711 * At this point and across for-loop:
2713 * rc == 0 -> ok, proceed.
2714 * rc > 0 -> end of directory.
2717 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2718 i++, nob -= CFS_PAGE_SIZE) {
2719 struct lu_dirpage *dp;
2721 LASSERT(i < rdpg->rp_npages);
2722 pg = rdpg->rp_pages[i];
2724 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2727 rc = mdd_dir_page_build(env, mdd, dp,
2728 min_t(int, nob, LU_PAGE_SIZE),
2729 iops, it, rdpg->rp_attrs);
2734 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2736 } else if (rc < 0) {
2737 CWARN("build page failed: %d!\n", rc);
2740 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* step to the next LU_PAGE-sized sub-page within this CFS page */
2741 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2742 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2749 struct lu_dirpage *dp;
/* no entries at all: hand back one page flagged LDF_EMPTY */
2751 dp = cfs_kmap(rdpg->rp_pages[0]);
2752 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2755 * No pages were processed, mark this for first page
2758 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2761 cfs_kunmap(rdpg->rp_pages[0]);
2763 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2766 iops->fini(env, it);
/* .moo_readpage entry: existence + sanity checks under read lock; a dead
 * (unlinked) directory returns one empty page per POSIX, otherwise the work
 * is delegated to __mdd_readpage().  (Some lines elided — verbatim.) */
2771 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2772 const struct lu_rdpg *rdpg)
2774 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2778 if (mdd_object_exists(mdd_obj) == 0) {
2779 CERROR("%s: object "DFID" not found: rc = -2\n",
2780 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2784 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2785 rc = mdd_readpage_sanity_check(env, mdd_obj);
2787 GOTO(out_unlock, rc);
2789 if (mdd_is_dead_obj(mdd_obj)) {
2791 struct lu_dirpage *dp;
2794 * According to POSIX, please do not return any entry to client:
2795 * even dot and dotdot should not be returned.
2797 CWARN("readdir from dead object: "DFID"\n",
2798 PFID(mdd_object_fid(mdd_obj)));
2800 if (rdpg->rp_count <= 0)
2801 GOTO(out_unlock, rc = -EFAULT);
2802 LASSERT(rdpg->rp_pages != NULL);
2804 pg = rdpg->rp_pages[0];
2805 dp = (struct lu_dirpage*)cfs_kmap(pg);
2806 memset(dp, 0 , sizeof(struct lu_dirpage));
2807 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2808 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2809 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2811 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2814 rc = __mdd_readpage(env, mdd_obj, rdpg);
2818 mdd_read_unlock(env, mdd_obj);
/* .moo_object_sync entry: verifies the object exists, then syncs the
 * underlying dt object to disk.  (Return line for the error path elided.) */
2822 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2824 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2826 if (mdd_object_exists(mdd_obj) == 0) {
2827 CERROR("%s: object "DFID" not found: rc = -2\n",
2828 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2831 return dt_object_sync(env, mdd_object_child(mdd_obj));
2834 const struct md_object_operations mdd_obj_ops = {
2835 .moo_permission = mdd_permission,
2836 .moo_attr_get = mdd_attr_get,
2837 .moo_attr_set = mdd_attr_set,
2838 .moo_xattr_get = mdd_xattr_get,
2839 .moo_xattr_set = mdd_xattr_set,
2840 .moo_xattr_list = mdd_xattr_list,
2841 .moo_xattr_del = mdd_xattr_del,
2842 .moo_object_create = mdd_object_create,
2843 .moo_ref_add = mdd_ref_add,
2844 .moo_ref_del = mdd_ref_del,
2845 .moo_open = mdd_open,
2846 .moo_close = mdd_close,
2847 .moo_readpage = mdd_readpage,
2848 .moo_readlink = mdd_readlink,
2849 .moo_changelog = mdd_changelog,
2850 .moo_capa_get = mdd_capa_get,
2851 .moo_object_sync = mdd_object_sync,
2852 .moo_path = mdd_path,
2853 .moo_file_lock = mdd_file_lock,
2854 .moo_file_unlock = mdd_file_unlock,