1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
64 #include "mdd_internal.h"
66 static const struct lu_object_operations mdd_lu_obj_ops;
68 static int mdd_xattr_get(const struct lu_env *env,
69 struct md_object *obj, struct lu_buf *buf,
/* Fetch object body data via mdo_data_get(); logs an error (rc = -2,
 * i.e. -ENOENT) when the object no longer exists.
 * NOTE(review): interior lines of this function are elided in this
 * extract (return paths not visible). */
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
75 if (mdd_object_exists(obj) == 0) {
76 CERROR("%s: object "DFID" not found: rc = -2\n",
77 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
80 mdo_data_get(env, obj, data);
/* Read the inode attributes of \a obj into \a la (capa-checked via
 * mdo_attr_get()); complains when the object does not exist. */
84 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
85 struct lu_attr *la, struct lustre_capa *capa)
87 if (mdd_object_exists(obj) == 0) {
88 CERROR("%s: object "DFID" not found: rc = -2\n",
89 mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
92 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL attribute flags into the in-memory
 * mod_flags bits (APPEND_OBJ / IMMUTE_OBJ), clearing both first. */
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
97 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
99 if (flags & LUSTRE_APPEND_FL)
100 obj->mod_flags |= APPEND_OBJ;
102 if (flags & LUSTRE_IMMUTABLE_FL)
103 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info stored in \a env's context;
 * asserts that the key has been populated. */
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
108 struct mdd_thread_info *info;
110 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111 LASSERT(info != NULL);
/* Wrap caller-owned memory (\a area, \a len) in the thread-local
 * mti_buf lu_buf; no allocation is performed here. */
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory backing \a buf; a NULL buffer or NULL data
 * pointer is a no-op. */
125 void mdd_buf_put(struct lu_buf *buf)
127 if (buf == NULL || buf->lb_buf == NULL)
129 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap read-only memory in the
 * thread-local mti_buf (the const qualifier is cast away for the
 * generic lu_buf, but callers must not write through it). */
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135 const void *area, ssize_t len)
139 buf = &mdd_env_info(env)->mti_buf;
140 buf->lb_buf = (void *)area;
/* Return the thread-local mti_big_buf, (re)allocating its backing
 * store when the current buffer is smaller than \a len.  Existing
 * contents are NOT preserved on reallocation (see mdd_buf_grow()
 * for the preserving variant). */
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
147 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
149 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
153 if (buf->lb_buf == NULL) {
155 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156 if (buf->lb_buf == NULL)
162 /** Increase the size of the \a mti_big_buf.
163 * preserves old data in buffer
164 * old buffer remains unchanged on error
165 * \retval 0 or -ENOMEM
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
169 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* Grow-only API: shrinking is a caller bug. */
172 LASSERT(len >= oldbuf->lb_len);
173 OBD_ALLOC_LARGE(buf.lb_buf, len);
175 if (buf.lb_buf == NULL)
179 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
181 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
/* Publish the new buffer descriptor in place of the old one. */
183 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed, thread-local llog cookie buffer large enough for
 * the device's current LOV cookie size, reallocating when the
 * required size grew.  Returns NULL on allocation failure. */
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189 struct mdd_device *mdd)
191 struct mdd_thread_info *mti = mdd_env_info(env);
194 max_cookie_size = mdd_lov_cookiesize(env, mdd);
195 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
/* Cached buffer too small: drop it and reallocate below. */
196 if (mti->mti_max_cookie)
197 OBD_FREE_LARGE(mti->mti_max_cookie,
198 mti->mti_max_cookie_size);
199 mti->mti_max_cookie = NULL;
200 mti->mti_max_cookie_size = 0;
202 if (unlikely(mti->mti_max_cookie == NULL)) {
203 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204 if (likely(mti->mti_max_cookie != NULL))
205 mti->mti_max_cookie_size = max_cookie_size;
207 if (likely(mti->mti_max_cookie != NULL))
208 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209 return mti->mti_max_cookie;
/* Return a thread-local lov_mds_md buffer sized for the device's
 * current maximum LOV MD size, reallocating when that size grew.
 * Unlike mdd_max_cookie_get(), the buffer is NOT zeroed here.
 * Returns NULL on allocation failure. */
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213 struct mdd_device *mdd)
215 struct mdd_thread_info *mti = mdd_env_info(env);
218 max_lmm_size = mdd_lov_mdsize(env, mdd);
219 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220 if (mti->mti_max_lmm)
221 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222 mti->mti_max_lmm = NULL;
223 mti->mti_max_lmm_size = 0;
225 if (unlikely(mti->mti_max_lmm == NULL)) {
226 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227 if (likely(mti->mti_max_lmm != NULL))
228 mti->mti_max_lmm_size = max_lmm_size;
230 return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for the mdd layer: allocate a new
 * mdd_object, initialize its embedded lu_object and wire up the
 * md_object / md_dir and lu_object operation vectors. */
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234 const struct lu_object_header *hdr,
237 struct mdd_object *mdd_obj;
239 OBD_ALLOC_PTR(mdd_obj);
240 if (mdd_obj != NULL) {
243 o = mdd2lu_obj(mdd_obj);
244 lu_object_init(o, NULL, d);
245 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247 mdd_obj->mod_count = 0;
248 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the underlying
 * object from the next (child) device in the stack and link it
 * below \a o; also resets mod_cltime and the pdir lock. */
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256 const struct lu_object_conf *unused)
258 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259 struct mdd_object *mdd_obj = lu2mdd_obj(o);
260 struct lu_object *below;
261 struct lu_device *under;
264 mdd_obj->mod_cltime = 0;
265 under = &d->mdd_child->dd_lu_dev;
266 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267 mdd_pdlock_init(mdd_obj);
271 lu_object_add(o, below);
/* lu_object_operations::loo_object_start: for objects that exist on
 * disk, load their flags into the in-memory object. */
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
278 if (lu_object_exists(o))
279 return mdd_get_flags(env, lu2mdd_obj(o));
/* lu_object_operations::loo_object_free: release the mdd_object.
 * NOTE(review): the actual fini/free lines are elided in this
 * extract. */
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
286 struct mdd_object *mdd = lu2mdd_obj(o);
/* lu_object_operations::loo_object_print: dump open count, valid
 * bits, changelog time and flags through printer \a p. */
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293 lu_printer_t p, const struct lu_object *o)
295 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297 "valid=%x, cltime="LPU64", flags=%lx)",
298 mdd, mdd->mod_count, mdd->mod_valid,
299 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operations vector for mdd objects (forward-declared at
 * the top of the file and installed in mdd_object_alloc()). */
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303 .loo_object_init = mdd_object_init,
304 .loo_object_start = mdd_object_start,
305 .loo_object_free = mdd_object_free,
306 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID \a f on device
 * \a d; thin wrapper over md_object_find_slice(). */
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310 struct mdd_device *d,
311 const struct lu_fid *f)
313 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve pathname \a path, starting at the filesystem root FID, to
 * the FID of the final component by iterative component-by-component
 * mdd_lookup().  Returns -EREMOTE for remote objects.
 * NOTE(review): the component-parsing loop body is partially elided
 * in this extract. */
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317 const char *path, struct lu_fid *fid)
320 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321 struct mdd_object *obj;
322 struct lu_name *lname = &mdd_env_info(env)->mti_name;
327 /* temp buffer for path element */
328 buf = mdd_buf_alloc(env, PATH_MAX);
329 if (buf->lb_buf == NULL)
332 lname->ln_name = name = buf->lb_buf;
333 lname->ln_namelen = 0;
334 *f = mdd->mdd_root_fid;
341 while (*path != '/' && *path != '\0') {
349 /* find obj corresponding to fid */
350 obj = mdd_object_find(env, mdd, f);
352 GOTO(out, rc = -EREMOTE);
354 GOTO(out, rc = PTR_ERR(obj));
355 /* get child fid from parent and name */
356 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357 mdd_object_put(env, obj);
362 lname->ln_namelen = 0;
371 /** The maximum depth that fid2path() will search.
372 * This is limited only because we want to store the fids for
373 * historical path lookup purposes.
375 #define MAX_PATH_DEPTH 100
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379 __u64 pli_recno; /**< history point */
380 __u64 pli_currec; /**< current record */
381 struct lu_fid pli_fid;
382 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383 struct mdd_object *pli_mdd_obj;
384 char *pli_path; /**< full path */
386 int pli_linkno; /**< which hardlink to follow */
387 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current pathname of pli->pli_mdd_obj by walking parent
 * link EAs (link #pli_linkno for the final component, link #0
 * otherwise) up to the filesystem root, packing component names
 * backwards into pli->pli_path.  The result is verified by resolving
 * the path back to a FID with mdd_path2fid(); a mismatch (rename in
 * flight) returns -EAGAIN so the caller can retry. */
390 static int mdd_path_current(const struct lu_env *env,
391 struct path_lookup_info *pli)
393 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394 struct mdd_object *mdd_obj;
395 struct lu_buf *buf = NULL;
396 struct link_ea_header *leh;
397 struct link_ea_entry *lee;
398 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* Start packing from the end of the path buffer. */
405 ptr = pli->pli_path + pli->pli_pathlen - 1;
408 pli->pli_fidcount = 0;
409 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
411 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412 mdd_obj = mdd_object_find(env, mdd,
413 &pli->pli_fids[pli->pli_fidcount]);
415 GOTO(out, rc = -EREMOTE);
417 GOTO(out, rc = PTR_ERR(mdd_obj));
418 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
420 mdd_object_put(env, mdd_obj);
424 /* Do I need to error out here? */
429 /* Get parent fid and object name */
430 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431 buf = mdd_links_get(env, mdd_obj);
432 mdd_read_unlock(env, mdd_obj);
433 mdd_object_put(env, mdd_obj);
435 GOTO(out, rc = PTR_ERR(buf));
438 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
441 /* If set, use link #linkno for path lookup, otherwise use
442 link #0. Only do this for the final path element. */
443 if ((pli->pli_fidcount == 0) &&
444 (pli->pli_linkno < leh->leh_reccount)) {
446 for (count = 0; count < pli->pli_linkno; count++) {
447 lee = (struct link_ea_entry *)
448 ((char *)lee + reclen);
449 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
451 if (pli->pli_linkno < leh->leh_reccount - 1)
452 /* indicate to user there are more links */
456 /* Pack the name in the end of the buffer */
457 ptr -= tmpname->ln_namelen;
458 if (ptr - 1 <= pli->pli_path)
459 GOTO(out, rc = -EOVERFLOW);
460 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
463 /* Store the parent fid for historic lookup */
464 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465 GOTO(out, rc = -EOVERFLOW);
466 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
469 /* Verify that our path hasn't changed since we started the lookup.
470 Record the current index, and verify the path resolves to the
471 same fid. If it does, then the path is correct as of this index. */
472 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473 pli->pli_currec = mdd->mdd_cl.mc_index;
474 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
477 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478 GOTO (out, rc = -EAGAIN);
480 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483 PFID(&pli->pli_fid));
484 GOTO(out, rc = -EAGAIN);
486 ptr++; /* skip leading / */
487 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492 /* if we vmalloced a large buffer drop it */
/* Historic (changelog-based) path lookup; presumably reconstructs
 * paths as of an earlier changelog record — body is not visible in
 * this extract, so semantics are unconfirmed. */
498 static int mdd_path_historic(const struct lu_env *env,
499 struct path_lookup_info *pli)
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506 char *path, int pathlen, __u64 *recno, int *linkno)
508 struct path_lookup_info *pli;
/* The root has no pathname; handled specially (body elided here). */
516 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
525 pli->pli_mdd_obj = md2mdd_obj(obj);
526 pli->pli_recno = *recno;
527 pli->pli_path = path;
528 pli->pli_pathlen = pathlen;
529 pli->pli_linkno = *linkno;
531 /* Retry multiple times in case file is being moved */
532 while (tries-- && rc == -EAGAIN)
533 rc = mdd_path_current(env, pli);
535 /* For historical path lookup, the current links may not have existed
536 * at "recno" time. We must switch over to earlier links/parents
537 * by using the changelog records. If the earlier parent doesn't
538 * exist, we must search back through the changelog to reconstruct
539 * its parents, then check if it exists, etc.
540 * We may ignore this problem for the initial implementation and
541 * state that an "original" hardlink must still exist for us to find
542 * historic path name. */
543 if (pli->pli_recno != -1) {
544 rc = mdd_path_historic(env, pli);
546 *recno = pli->pli_currec;
547 /* Return next link index to caller */
548 *linkno = pli->pli_linkno;
/* Load the object's on-disk attribute flags and translate them into
 * in-memory mod_flags via mdd_flags_xlate(). */
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
558 struct lu_attr *la = &mdd_env_info(env)->mti_la;
562 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
564 mdd_flags_xlate(obj, la->la_flags);
569 /* get only inode attributes */
/* Fills ma->ma_attr and marks MA_INODE valid; skips the read when
 * MA_INODE is already valid in \a ma. */
570 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576 if (ma->ma_valid & MA_INODE)
579 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
580 mdd_object_capa(env, mdd_obj));
582 ma->ma_valid |= MA_INODE;
/* Fill \a lmm (viewed as a lov_user_md) with the filesystem-default
 * striping taken from the MDS LOV descriptor.
 * Returns the number of bytes written (sizeof(*lum)). */
586 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
588 struct lov_desc *ldesc;
589 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
590 struct lov_user_md *lum = (struct lov_user_md*)lmm;
596 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
597 LASSERT(ldesc != NULL);
599 lum->lmm_magic = LOV_MAGIC_V1;
600 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
601 lum->lmm_pattern = ldesc->ld_pattern;
602 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
603 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
604 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
606 RETURN(sizeof(*lum));
/* True iff \a mdd_obj is the filesystem root directory (FID equals
 * the device's mdd_root_fid). */
609 static int is_rootdir(struct mdd_object *mdd_obj)
611 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
612 const struct lu_fid *fid = mdo2fid(mdd_obj);
614 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
617 /* get lov EA only */
/* Reads the LOV EA into ma->ma_lmm; for the root directory with
 * MA_LOV_DEF requested, substitutes the filesystem-default striping.
 * On success updates ma_lmm_size / ma_layout_gen and sets
 * MA_LOV | MA_LAY_GEN in ma_valid. */
618 static int __mdd_lmm_get(const struct lu_env *env,
619 struct mdd_object *mdd_obj, struct md_attr *ma)
624 if (ma->ma_valid & MA_LOV)
627 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
629 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
630 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
632 ma->ma_lmm_size = rc;
633 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
634 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
640 /* get the first parent fid from link EA */
/* Unpacks the first link EA entry's parent FID (stored big-endian)
 * into ma->ma_pfid and sets MA_PFID; skips work when already valid. */
641 static int mdd_pfid_get(const struct lu_env *env,
642 struct mdd_object *mdd_obj, struct md_attr *ma)
645 struct link_ea_header *leh;
646 struct link_ea_entry *lee;
647 struct lu_fid *pfid = &ma->ma_pfid;
650 if (ma->ma_valid & MA_PFID)
653 buf = mdd_links_get(env, mdd_obj);
655 RETURN(PTR_ERR(buf));
658 lee = (struct link_ea_entry *)(leh + 1);
659 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
660 fid_be_to_cpu(pfid, pfid);
661 ma->ma_valid |= MA_PFID;
662 if (buf->lb_len > OBD_ALLOC_BIG)
663 /* if we vmalloced a large buffer drop it */
/* __mdd_lmm_get() under the object's read lock. */
668 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
674 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
675 rc = __mdd_lmm_get(env, mdd_obj, ma);
676 mdd_read_unlock(env, mdd_obj);
/* Read the LMV EA (striped-directory layout) into ma->ma_lmv and set
 * MA_LMV; skips work when already valid. */
681 static int __mdd_lmv_get(const struct lu_env *env,
682 struct mdd_object *mdd_obj, struct md_attr *ma)
687 if (ma->ma_valid & MA_LMV)
690 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
693 ma->ma_valid |= MA_LMV;
/* Read the LMA xattr and extract the HSM flags and/or SOM attributes
 * requested in ma->ma_need, setting MA_HSM / MA_SOM accordingly.
 * No-op when the needed bits are already valid. */
699 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
702 struct mdd_thread_info *info = mdd_env_info(env);
703 struct lustre_mdt_attrs *lma =
704 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
709 /* If all needed data are already valid, nothing to do */
710 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
711 (ma->ma_need & (MA_HSM | MA_SOM)))
714 /* Read LMA from disk EA */
715 lma_size = sizeof(info->mti_xattr_buf);
716 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
720 /* Useless to check LMA incompatibility because this is already done in
721 * osd_ea_fid_get(), and this will fail long before this code is
723 * So, if we are here, LMA is compatible.
726 lustre_lma_swab(lma);
728 /* Swab and copy LMA */
729 if (ma->ma_need & MA_HSM) {
730 if (lma->lma_compat & LMAC_HSM)
731 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
733 ma->ma_hsm.mh_flags = 0;
734 ma->ma_valid |= MA_HSM;
/* SOM data is copied only when the LMA says it is present. */
738 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
739 LASSERT(ma->ma_som != NULL);
740 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
741 ma->ma_som->msd_size = lma->lma_som_size;
742 ma->ma_som->msd_blocks = lma->lma_som_blocks;
743 ma->ma_som->msd_mountid = lma->lma_som_mountid;
744 ma->ma_valid |= MA_SOM;
/* Dispatch attribute retrieval according to ma->ma_need: inode attrs,
 * LOV EA (regular files and dirs), parent FID (regular files without
 * MA_LOV), LMV EA (dirs), HSM/SOM from LMA (regular files), and the
 * default ACL (dirs, when POSIX ACLs are configured). */
756 if (ma->ma_need & MA_INODE)
757 rc = mdd_iattr_get(env, mdd_obj, ma);
759 if (rc == 0 && ma->ma_need & MA_LOV) {
760 if (S_ISREG(mdd_object_type(mdd_obj)) ||
761 S_ISDIR(mdd_object_type(mdd_obj)))
762 rc = __mdd_lmm_get(env, mdd_obj, ma);
764 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
765 if (S_ISREG(mdd_object_type(mdd_obj)))
766 rc = mdd_pfid_get(env, mdd_obj, ma);
768 if (rc == 0 && ma->ma_need & MA_LMV) {
769 if (S_ISDIR(mdd_object_type(mdd_obj)))
770 rc = __mdd_lmv_get(env, mdd_obj, ma);
772 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
773 if (S_ISREG(mdd_object_type(mdd_obj)))
774 rc = __mdd_lma_get(env, mdd_obj, ma);
776 #ifdef CONFIG_FS_POSIX_ACL
777 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
778 if (S_ISDIR(mdd_object_type(mdd_obj)))
779 rc = mdd_def_acl_get(env, mdd_obj, ma);
782 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
783 rc, ma->ma_valid, ma->ma_lmm);
/* mdd_attr_get_internal(), taking the object read lock only when an
 * EA-backed attribute (LOV/LMV/ACL/HSM/SOM/PFID) is requested. */
787 int mdd_attr_get_internal_locked(const struct lu_env *env,
788 struct mdd_object *mdd_obj, struct md_attr *ma)
791 int needlock = ma->ma_need &
792 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
795 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
796 rc = mdd_attr_get_internal(env, mdd_obj, ma);
798 mdd_read_unlock(env, mdd_obj);
803 * No permission check is needed.
/* md_object_operations entry point for getattr. */
805 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
808 struct mdd_object *mdd_obj = md2mdd_obj(obj);
812 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
817 * No permission check is needed.
/* Read extended attribute \a name into \a buf under the object read
 * lock; errors out if the object no longer exists. */
819 static int mdd_xattr_get(const struct lu_env *env,
820 struct md_object *obj, struct lu_buf *buf,
823 struct mdd_object *mdd_obj = md2mdd_obj(obj);
828 if (mdd_object_exists(mdd_obj) == 0) {
829 CERROR("%s: object "DFID" not found: rc = -2\n",
830 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
834 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835 rc = mdo_xattr_get(env, mdd_obj, buf, name,
836 mdd_object_capa(env, mdd_obj));
837 mdd_read_unlock(env, mdd_obj);
843 * Permission check is done when open,
844 * no need check again.
/* Read the symlink target by reading the underlying dt object's body
 * into \a buf under the object read lock. */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
849 struct mdd_object *mdd_obj = md2mdd_obj(obj);
850 struct dt_object *next;
855 if (mdd_object_exists(mdd_obj) == 0) {
856 CERROR("%s: object "DFID" not found: rc = -2\n",
857 mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
861 next = mdd_object_child(mdd_obj);
862 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
863 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
864 mdd_object_capa(env, mdd_obj));
865 mdd_read_unlock(env, mdd_obj);
870 * No permission check is needed.
/* List the object's extended attribute names into \a buf under the
 * object read lock. */
872 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
875 struct mdd_object *mdd_obj = md2mdd_obj(obj);
880 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
881 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
882 mdd_read_unlock(env, mdd_obj);
/* Declare (reserve transaction credits for) object creation: choose
 * the dt object format from spec->sp_feat / file mode, then declare
 * the create on the child object. */
887 int mdd_declare_object_create_internal(const struct lu_env *env,
888 struct mdd_object *p,
889 struct mdd_object *c,
891 struct thandle *handle,
892 const struct md_op_spec *spec)
894 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
895 const struct dt_index_features *feat = spec->sp_feat;
899 if (feat != &dt_directory_features && feat != NULL)
900 dof->dof_type = DFT_INDEX;
902 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
904 dof->u.dof_idx.di_feat = feat;
906 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Create the child object \a c on disk (if it does not already
 * exist): derive the dt format from spec->sp_feat / file mode, let
 * the underlying device initialize the allocation hint from the
 * parent, then perform the create within \a handle. */
911 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
912 struct mdd_object *c, struct md_attr *ma,
913 struct thandle *handle,
914 const struct md_op_spec *spec)
916 struct lu_attr *attr = &ma->ma_attr;
917 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
918 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
919 const struct dt_index_features *feat = spec->sp_feat;
923 if (!mdd_object_exists(c)) {
924 struct dt_object *next = mdd_object_child(c);
927 if (feat != &dt_directory_features && feat != NULL)
928 dof->dof_type = DFT_INDEX;
930 dof->dof_type = dt_mode_to_dft(attr->la_mode);
932 dof->u.dof_idx.di_feat = feat;
934 /* @hint will be initialized by underlying device. */
935 next->do_ops->do_ah_init(env, hint,
936 p ? mdd_object_child(p) : NULL,
937 attr->la_mode & S_IFMT);
939 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
940 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
948 * Make sure the ctime is increased only.
/* Drop MTIME/CTIME updates that would move ctime backwards, and drop
 * a pure, unchanged ctime update entirely. */
950 static inline int mdd_attr_check(const struct lu_env *env,
951 struct mdd_object *obj,
952 struct lu_attr *attr)
954 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
958 if (attr->la_valid & LA_CTIME) {
959 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
963 if (attr->la_ctime < tmp_la->la_ctime)
964 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
965 else if (attr->la_valid == LA_CTIME &&
966 attr->la_ctime == tmp_la->la_ctime)
967 attr->la_valid &= ~LA_CTIME;
/* Apply \a attr to \a obj within transaction \a handle; on a mode
 * change with \a needacl set, also update the POSIX ACL to match. */
972 int mdd_attr_set_internal(const struct lu_env *env,
973 struct mdd_object *obj,
974 struct lu_attr *attr,
975 struct thandle *handle,
981 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
982 #ifdef CONFIG_FS_POSIX_ACL
983 if (!rc && (attr->la_valid & LA_MODE) && needacl)
984 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime-monotonicity filter) followed by
 * mdd_attr_set_internal(). */
989 int mdd_attr_check_set_internal(const struct lu_env *env,
990 struct mdd_object *obj,
991 struct lu_attr *attr,
992 struct thandle *handle,
998 rc = mdd_attr_check(env, obj, attr);
1003 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* mdd_attr_set_internal() under the object write lock; the lock is
 * only needed when an ACL update may occur (mode change + needacl). */
1007 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1008 struct mdd_object *obj,
1009 struct lu_attr *attr,
1010 struct thandle *handle,
1016 needacl = needacl && (attr->la_valid & LA_MODE);
1018 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1019 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1021 mdd_write_unlock(env, obj);
/* mdd_attr_check_set_internal() under the object write lock; mirrors
 * mdd_attr_set_internal_locked()'s conditional locking. */
1025 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1026 struct mdd_object *obj,
1027 struct lu_attr *attr,
1028 struct thandle *handle,
1034 needacl = needacl && (attr->la_valid & LA_MODE);
1036 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1037 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1039 mdd_write_unlock(env, obj);
/* Set xattr \a name from \a buf, or delete it when the buffer is
 * empty (NULL data and zero length).  \a fl carries LU_XATTR_*
 * flags; note the set path currently passes 0, not \a fl —
 * NOTE(review): confirm whether \a fl is intentionally unused here. */
1043 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1044 const struct lu_buf *buf, const char *name,
1045 int fl, struct thandle *handle)
1047 struct lustre_capa *capa = mdd_object_capa(env, obj);
1051 if (buf->lb_buf && buf->lb_len > 0)
1052 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1053 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1054 rc = mdo_xattr_del(env, obj, name, handle, capa);
1060 * This gives the same functionality as the code between
1061 * sys_chmod and inode_setattr
1062 * chown_common and inode_setattr
1063 * utimes and inode_setattr
1064 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Validate and normalize a setattr request against the object's
 * current attributes and the caller's credentials (VFS-equivalent
 * permission and sanity rules: time monotonicity, immutable/append
 * flags, chmod/chown/chgrp capability checks, setuid/setgid
 * clearing).  Mutates \a la in place, clearing la_valid bits for
 * updates that must be dropped. */
1066 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1067 struct lu_attr *la, const struct md_attr *ma)
1069 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1070 struct md_ucred *uc;
1077 /* Do not permit change file type */
1078 if (la->la_valid & LA_TYPE)
1081 /* They should not be processed by setattr */
1082 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1085 /* export destroy does not have ->le_ses, but we may want
1086 * to drop LUSTRE_SOM_FL. */
1092 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update: allowed for rename-source fixup or with
 * MDS_PERM_BYPASS; dropped when not newer than the current ctime. */
1096 if (la->la_valid == LA_CTIME) {
1097 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1098 /* This is only for set ctime when rename's source is
1100 rc = mdd_may_delete(env, NULL, obj,
1101 (struct md_attr *)ma, 1, 0);
1102 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1103 la->la_valid &= ~LA_CTIME;
1107 if (la->la_valid == LA_ATIME) {
1108 /* This is atime only set for read atime update on close. */
1109 if (la->la_atime >= tmp_la->la_atime &&
1110 la->la_atime < (tmp_la->la_atime +
1111 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1112 la->la_valid &= ~LA_ATIME;
1116 /* Check if flags change. */
1117 if (la->la_valid & LA_FLAGS) {
1118 unsigned int oldflags = 0;
1119 unsigned int newflags = la->la_flags &
1120 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1122 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1123 !mdd_capable(uc, CFS_CAP_FOWNER))
1126 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1127 * only be changed by the relevant capability. */
1128 if (mdd_is_immutable(obj))
1129 oldflags |= LUSTRE_IMMUTABLE_FL;
1130 if (mdd_is_append(obj))
1131 oldflags |= LUSTRE_APPEND_FL;
1132 if ((oldflags ^ newflags) &&
1133 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1136 if (!S_ISDIR(tmp_la->la_mode))
1137 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept only flag changes unless
 * the server bypasses permission checks. */
1140 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1141 (la->la_valid & ~LA_FLAGS) &&
1142 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1145 /* Check for setting the obj time. */
1146 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1147 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1148 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1149 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1150 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* LA_KILL_SUID / LA_KILL_SGID: internal flags asking us to strip
 * setuid/setgid from the current mode. */
1158 if (la->la_valid & LA_KILL_SUID) {
1159 la->la_valid &= ~LA_KILL_SUID;
1160 if ((tmp_la->la_mode & S_ISUID) &&
1161 !(la->la_valid & LA_MODE)) {
1162 la->la_mode = tmp_la->la_mode;
1163 la->la_valid |= LA_MODE;
1165 la->la_mode &= ~S_ISUID;
1168 if (la->la_valid & LA_KILL_SGID) {
1169 la->la_valid &= ~LA_KILL_SGID;
1170 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1171 (S_ISGID | S_IXGRP)) &&
1172 !(la->la_valid & LA_MODE)) {
1173 la->la_mode = tmp_la->la_mode;
1174 la->la_valid |= LA_MODE;
1176 la->la_mode &= ~S_ISGID;
1179 /* Make sure a caller can chmod. */
1180 if (la->la_valid & LA_MODE) {
1181 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1182 (uc->mu_fsuid != tmp_la->la_uid) &&
1183 !mdd_capable(uc, CFS_CAP_FOWNER))
1186 if (la->la_mode == (cfs_umode_t) -1)
1187 la->la_mode = tmp_la->la_mode;
1189 la->la_mode = (la->la_mode & S_IALLUGO) |
1190 (tmp_la->la_mode & ~S_IALLUGO);
1192 /* Also check the setgid bit! */
1193 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1194 la->la_gid : tmp_la->la_gid) &&
1195 !mdd_capable(uc, CFS_CAP_FSETID))
1196 la->la_mode &= ~S_ISGID;
1198 la->la_mode = tmp_la->la_mode;
1201 /* Make sure a caller can chown. */
1202 if (la->la_valid & LA_UID) {
1203 if (la->la_uid == (uid_t) -1)
1204 la->la_uid = tmp_la->la_uid;
1205 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1206 (la->la_uid != tmp_la->la_uid)) &&
1207 !mdd_capable(uc, CFS_CAP_CHOWN))
1210 /* If the user or group of a non-directory has been
1211 * changed by a non-root user, remove the setuid bit.
1212 * 19981026 David C Niemi <niemi@tux.org>
1214 * Changed this to apply to all users, including root,
1215 * to avoid some races. This is the behavior we had in
1216 * 2.0. The check for non-root was definitely wrong
1217 * for 2.2 anyway, as it should have been using
1218 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1219 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1220 !S_ISDIR(tmp_la->la_mode)) {
1221 la->la_mode &= ~S_ISUID;
1222 la->la_valid |= LA_MODE;
1226 /* Make sure caller can chgrp. */
1227 if (la->la_valid & LA_GID) {
1228 if (la->la_gid == (gid_t) -1)
1229 la->la_gid = tmp_la->la_gid;
1230 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1231 ((la->la_gid != tmp_la->la_gid) &&
1232 !lustre_in_group_p(uc, la->la_gid))) &&
1233 !mdd_capable(uc, CFS_CAP_CHOWN))
1236 /* Likewise, if the user or group of a non-directory
1237 * has been changed by a non-root user, remove the
1238 * setgid bit UNLESS there is no group execute bit
1239 * (this would be a file marked for mandatory
1240 * locking). 19981026 David C Niemi <niemi@tux.org>
1242 * Removed the fsuid check (see the comment above) --
1244 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1245 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1246 la->la_mode &= ~S_ISGID;
1247 la->la_valid |= LA_MODE;
1251 /* For both Size-on-MDS case and truncate case,
1252 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1253 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1254 * For SOM case, it is true, the MAY_WRITE perm has been checked
1255 * when open, no need check again. For truncate case, it is false,
1256 * the MAY_WRITE perm should be checked here. */
1257 if (ma->ma_attr_flags & MDS_SOM) {
1258 /* For the "Size-on-MDS" setattr update, merge coming
1259 * attributes with the set in the inode. BUG 10641 */
1260 if ((la->la_valid & LA_ATIME) &&
1261 (la->la_atime <= tmp_la->la_atime))
1262 la->la_valid &= ~LA_ATIME;
1264 /* OST attributes do not have a priority over MDS attributes,
1265 * so drop times if ctime is equal. */
1266 if ((la->la_valid & LA_CTIME) &&
1267 (la->la_ctime <= tmp_la->la_ctime))
1268 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1270 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1271 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1272 (uc->mu_fsuid == tmp_la->la_uid)) &&
1273 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1274 rc = mdd_permission_internal_locked(env, obj,
1281 if (la->la_valid & LA_CTIME) {
1282 /* The pure setattr, it has the priority over what is
1283 * already set, do not drop it if ctime is equal. */
1284 if (la->la_ctime < tmp_la->la_ctime)
1285 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1293 /** Store a data change changelog record
1294 * If this fails, we must fail the whole transaction; we don't
1295 * want the change to commit without the log entry.
1296 * \param mdd_obj - mdd_object of change
1297 * \param handle - transacion handle
/* Skipped entirely when changelogs are off, the record type is
 * masked out, or a time-only update was already logged since the
 * changelog was started. */
1299 static int mdd_changelog_data_store(const struct lu_env *env,
1300 struct mdd_device *mdd,
1301 enum changelog_rec_type type,
1303 struct mdd_object *mdd_obj,
1304 struct thandle *handle)
1306 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1307 struct llog_changelog_rec *rec;
1308 struct thandle *th = NULL;
1314 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1316 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1319 LASSERT(mdd_obj != NULL);
1320 LASSERT(handle != NULL);
1322 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1323 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1324 /* Don't need multiple updates in this log */
1325 /* Don't check under lock - no big deal if we get an extra
1330 reclen = llog_data_len(sizeof(*rec));
1331 buf = mdd_buf_alloc(env, reclen);
1332 if (buf->lb_buf == NULL)
1334 rec = (struct llog_changelog_rec *)buf->lb_buf;
1336 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1337 rec->cr.cr_type = (__u32)type;
1338 rec->cr.cr_tfid = *tfid;
1339 rec->cr.cr_namelen = 0;
/* Remember when we last logged this object (see time-update check
 * above). */
1340 mdd_obj->mod_cltime = cfs_time_current_64();
1342 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1345 mdd_trans_stop(env, mdd, rc, th);
1348 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1349 rc, type, PFID(tfid));
/* Emit a standalone changelog record of \a type for \a obj in its own
 * transaction (declare, start, store, stop). */
int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
                  int flags, struct md_object *obj)
{
        struct thandle *handle;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        handle = mdd_trans_create(env, mdd);
                return(PTR_ERR(handle));
        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
        rc = mdd_trans_start(env, mdd, handle);
        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
1387 * Should be called with write lock held.
1389 * \see mdd_lma_set_locked().
1391 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1392 const struct md_attr *ma, struct thandle *handle)
1394 struct mdd_thread_info *info = mdd_env_info(env);
1396 struct lustre_mdt_attrs *lma =
1397 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1398 int lmasize = sizeof(struct lustre_mdt_attrs);
1403 /* Either HSM or SOM part is not valid, we need to read it before */
1404 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1405 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1409 lustre_lma_swab(lma);
1411 memset(lma, 0, lmasize);
1415 if (ma->ma_valid & MA_HSM) {
1416 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1417 lma->lma_compat |= LMAC_HSM;
1421 if (ma->ma_valid & MA_SOM) {
1422 LASSERT(ma->ma_som != NULL);
1423 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1424 lma->lma_compat &= ~LMAC_SOM;
1426 lma->lma_compat |= LMAC_SOM;
1427 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1428 lma->lma_som_size = ma->ma_som->msd_size;
1429 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1430 lma->lma_som_mountid = ma->ma_som->msd_mountid;
1435 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1437 lustre_lma_swab(lma);
1438 buf = mdd_buf_get(env, lma, lmasize);
1439 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-On-MDS data will be extracted from \a ma if they are valid;
 * if not, the LMA EA will first be read from disk, modified, and written
 * back.
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
{
        /* serialize against other EA updates on this object */
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
{
        struct mdd_device *mdd = mdo2mdd(obj);
        /* Build a bitmask of candidate record types from the changed attrs,
         * then mask against the types the changelog is configured to keep. */
        bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        bits = bits & mdd->mdd_cl.mc_mask;
        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
/* Declare every update an attr_set may perform: the attributes themselves,
 * the changelog record, LOV/LMA EA rewrites, a possible ACL_ACCESS rewrite
 * when the mode changes, and the unlink-style llog records used to journal
 * chown/chgrp per stripe. */
static int mdd_declare_attr_set(const struct lu_env *env,
                                struct mdd_device *mdd,
                                struct mdd_object *obj,
                                const struct md_attr *ma,
                                struct lov_mds_md *lmm,
                                struct thandle *handle)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
        if (ma->ma_valid & MA_LOV) {
                buf->lb_len = ma->ma_lmm_size;
                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
                buf->lb_len = sizeof(struct lustre_mdt_attrs);
                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
#ifdef CONFIG_FS_POSIX_ACL
        if (ma->ma_attr.la_valid & LA_MODE) {
                /* a mode change may rewrite the access ACL; probe its size */
                mdd_read_lock(env, obj, MOR_TGT_CHILD);
                rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
                mdd_read_unlock(env, obj);
                if (rc == -EOPNOTSUPP || rc == -ENODATA)
                rc = mdo_declare_xattr_set(env, obj, buf,
                                           XATTR_NAME_ACL_ACCESS, 0,
        /* basically the log is the same as in unlink case */
        if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
            le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
                CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
                       mdd->mdd_obd_dev->obd_name,
                       le32_to_cpu(lmm->lmm_magic),
                       PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
        stripe = le16_to_cpu(lmm->lmm_stripe_count);
        /* LOV_ALL_STRIPES means "stripe over every OST": use target count */
        if (stripe == LOV_ALL_STRIPES) {
                struct lov_desc *ldesc;
                ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
                LASSERT(ldesc != NULL);
                stripe = ldesc->ld_tgt_count;
        /* one unlink-style log record per stripe */
        for (i = 0; i < stripe; i++) {
                rc = mdd_declare_llog_record(env, mdd,
                                             sizeof(struct llog_unlink_rec),
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
#ifdef HAVE_QUOTA_SUPPORT
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
        /* work on a copy so mdd_fix_attr() can drop redundant valid bits */
        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        /* setattr on "close" only change atime, or do nothing */
        if (ma->ma_valid == MA_INODE &&
            ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
        /* chown/chgrp on a regular file needs the LOV EA so the ownership
         * change can be journalled for the OST objects */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
        handle = mdd_trans_create(env, mdd);
                RETURN(PTR_ERR(handle));
        rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
                                  lmm_size > 0 ? lmm : NULL, handle);
        /* permission changes may require sync operation */
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                handle->th_sync = !!mdd->mdd_sync_permission;
        rc = mdd_trans_start(env, mdd, handle);
        /* permission changes may require sync operation */
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                handle->th_sync |= mdd->mdd_sync_permission;
        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
#ifdef HAVE_QUOTA_SUPPORT
        /* ownership change: check quota for the new owner before applying */
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct obd_export *exp = md_quota(env)->mq_exp;
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                quota_opc = FSFILT_OP_SETATTR;
                mdd_quota_wrapper(la_copy, qnids);
                mdd_quota_wrapper(la_tmp, qoids);
                /* get file quota for new owner */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qnids, inode_pending, 1, NULL, 0,
                block_count = (la_tmp->la_blocks + 7) >> 3;
                mdd_data_get(env, mdd_obj, &data);
                /* get block quota for new owner */
                lquota_chkquota(mds_quota_interface_ref, obd,
                                exp, qnids, block_pending,
                                LQUOTA_FLAGS_BLK, data, 1);
        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                /* keep in-core flag cache in sync with the new flags */
                mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size){
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);
                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
        /* caller supplied a new LOV EA: validate and store it */
        if (rc == 0 && ma->ma_valid & MA_LOV) {
                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
        /* HSM / Size-on-MDS data goes into the LMA EA */
        if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
                mode = mdd_object_type(mdd_obj);
                rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
        rc = mdd_attr_set_changelog(env, obj, handle,
                                    ma->ma_attr.la_valid);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                /*set obd attr, if needed*/
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
#ifdef HAVE_QUOTA_SUPPORT
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
        /* Trigger dqrel/dqacq for original owner and new owner.
         * If failed, the next call for lquota_chkquota will
         * process it. */
        lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set xattr \a name on \a obj under write lock, inside an already-started
 * transaction \a handle. */
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);
/* Permission check before xattr updates: the object must not be immutable
 * or append-only, and the caller must be the owner or hold CAP_FOWNER. */
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        if (mdd_is_immutable(obj) || mdd_is_append(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare the xattr write itself plus, for "user." xattrs only, the
 * changelog record that will accompany it. */
static int mdd_declare_xattr_set(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
                                 const struct lu_buf *buf,
                                 struct thandle *handle)
{
        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
        /* Only record user xattr changes */
        if ((strncmp("user.", name, 5) == 0))
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
/**
 * Set an extended attribute on \a obj in its own transaction.
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        handle = mdd_trans_create(env, mdd);
                RETURN(PTR_ERR(handle));
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
            mdd->mdd_sync_permission == 1)
                handle->th_sync = 1;
        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
        rc = mdd_trans_start(env, mdd, handle);
        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
                handle->th_sync |= mdd->mdd_sync_permission;
        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
/* Declare the xattr removal plus, for "user." xattrs only, the changelog
 * record that will accompany it. */
static int mdd_declare_xattr_del(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
                                 struct thandle *handle)
{
        rc = mdo_declare_xattr_del(env, obj, name, handle);
        /* Only record user xattr changes */
        if ((strncmp("user.", name, 5) == 0))
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
/**
 * Delete extended attribute \a name from \a obj in its own transaction.
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        rc = mdd_xattr_sanity_check(env, mdd_obj);
        handle = mdd_trans_create(env, mdd);
                RETURN(PTR_ERR(handle));
        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
        rc = mdd_trans_start(env, mdd, handle);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);
        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
        mdd_trans_stop(env, mdd, rc, handle);
/* partial unlink: drop nlink(s) on \a obj and finish the unlink if the
 * link count reaches zero */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        /* XXX: this code won't be used ever:
         * DNE uses slightly different approach */
        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
        LASSERT(mdd_object_exists(mdd_obj) > 0);
        handle = mdd_trans_create(env, mdd);
        rc = mdd_trans_start(env, mdd, handle);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        mdo_ref_del(env, mdd_obj, handle);
        /* a directory loses one extra reference (its own ".") */
        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                mdo_ref_del(env, mdd_obj, handle);
        /* propagate the caller-supplied ctime */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* release quota only once the last link and last open are gone */
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        /* Trigger dqrel on the owner of child. If failed,
         * the next call for lquota_chkquota will process it */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial operation */
/* Validate the file type bits in \a ma before a partial object create. */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
{
        switch (ma->ma_attr.la_mode & S_IFMT) {
/* Partial create of the object body (no name insertion): quota checks,
 * type sanity, low-level create, LMV/ACL EA setup, initialization, then
 * return the resulting attributes in \a ma. */
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
        /* XXX: this code won't be used ever:
         * DNE uses slightly different approach */
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                switch (ma->ma_attr.la_mode & S_IFMT) {
                /* get block quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, block_pending, block_count,
                                NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        handle = mdd_trans_create(env, mdd);
                GOTO(out_pending, rc = PTR_ERR(handle));
        rc = mdd_trans_start(env, mdd, handle);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;
                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);
                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
#ifdef CONFIG_FS_POSIX_ACL
        /* remote ACL: initialize the ACL from the caller-supplied EA and
         * let it adjust the mode bits */
        if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
                buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                buf->lb_len = spec->u.sp_ea.eadatalen;
                if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                        rc = __mdd_acl_init(env, mdd_obj, buf,
                                            &ma->ma_attr.la_mode,
                        ma->ma_attr.la_valid |= LA_MODE;
                pfid = spec->u.sp_ea.fid;
        rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
        /* read back the final attributes for the caller */
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        lquota_pending_commit(mds_quota_interface_ref, obd, qids,
        /* Trigger dqacq on the owner of child. If failed,
         * the next call for lquota_chkquota will process it. */
        lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* partial link: add one hard-link reference to \a obj and update ctime */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        /* XXX: this code won't be used ever:
         * DNE uses slightly different approach */
        handle = mdd_trans_create(env, mdd);
        rc = mdd_trans_start(env, mdd, handle);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        mdo_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        /* propagate the caller-supplied ctime */
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;
        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
        /* NOTE(review): the transaction is stopped with result 0 rather than
         * rc -- confirm discarding rc here is intentional. */
        mdd_trans_stop(env, mdd, 0, handle);
/*
 * Map open flags to MAY_* access-mode bits.
 * do NOT or the MAY_*'s, you'll get the weakest
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);
                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
        if (flags & FMODE_READ)
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
        if (flags & MDS_FMODE_EXEC)
/* Checks applied at open time: object alive, not a symlink, directory not
 * opened for write, permission bits, append-only constraints, and O_NOATIME
 * ownership. */
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        if (mdd_is_dead_obj(obj))
        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (S_ISLNK(tmp_la->la_mode))
        mode = accmode(env, tmp_la, flag);
        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
        /* a freshly-created file was already permission-checked at create */
        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
        /* truncate is meaningless on special files */
        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;
        /* For writing append-only file must open it with append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                if (flag & MDS_OPEN_TRUNC)
        /*
         * Now, flag -- O_NOATIME does not be packed by client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);
                /* only the owner (or CAP_FOWNER) may suppress atime updates */
                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
/* Open \a obj: run the open sanity checks and bump the open count, both
 * under write lock. */
static int mdd_open(const struct lu_env *env, struct md_object *obj,
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        mdd_obj->mod_count++;
        mdd_write_unlock(env, mdd_obj);
/* Declare the unlink-log records and the destroy of \a obj. */
int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
                            struct md_attr *ma, struct thandle *handle)
{
        rc = mdd_declare_unlink_log(env, obj, ma, handle);
        return mdo_declare_destroy(env, obj, handle);
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma, struct thandle *handle)
{
        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean evth up.
                 * Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                /* log the unlink so OST objects can be cleaned up later */
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
        rc = mdo_destroy(env, obj, handle);
/* Declare orphan-index removal plus the object kill done by the final
 * close of an unlinked file. */
static int mdd_declare_close(const struct lu_env *env,
                             struct mdd_object *obj,
                             struct thandle *handle)
{
        rc = orph_declare_index_delete(env, obj, handle);
        return mdd_declare_object_kill(env, obj, ma, handle);
2369 * No permission check is needed.
2371 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2372 struct md_attr *ma, int mode)
2374 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2375 struct mdd_device *mdd = mdo2mdd(obj);
2376 struct thandle *handle = NULL;
2378 int is_orphan = 0, reset = 1;
2380 #ifdef HAVE_QUOTA_SUPPORT
2381 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2382 struct mds_obd *mds = &obd->u.mds;
2383 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2388 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2389 mdd_obj->mod_count--;
2391 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2392 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2393 "list\n", PFID(mdd_object_fid(mdd_obj)));
2397 /* check without any lock */
2398 if (mdd_obj->mod_count == 1 &&
2399 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2401 handle = mdd_trans_create(env, mdo2mdd(obj));
2403 RETURN(PTR_ERR(handle));
2405 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2409 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2413 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2418 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2419 if (handle == NULL && mdd_obj->mod_count == 1 &&
2420 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2421 mdd_write_unlock(env, mdd_obj);
2425 /* release open count */
2426 mdd_obj->mod_count --;
2428 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2429 /* remove link to object from orphan index */
2430 LASSERT(handle != NULL);
2431 rc = __mdd_orphan_del(env, mdd_obj, handle);
2433 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2434 "list, OSS objects to be destroyed.\n",
2435 PFID(mdd_object_fid(mdd_obj)));
2438 CERROR("Object "DFID" can not be deleted from orphan "
2439 "list, maybe cause OST objects can not be "
2440 "destroyed (err: %d).\n",
2441 PFID(mdd_object_fid(mdd_obj)), rc);
2442 /* If object was not deleted from orphan list, do not
2443 * destroy OSS objects, which will be done when next
2449 rc = mdd_iattr_get(env, mdd_obj, ma);
2450 /* Object maybe not in orphan list originally, it is rare case for
2451 * mdd_finish_unlink() failure. */
2452 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2453 #ifdef HAVE_QUOTA_SUPPORT
2454 if (mds->mds_quota) {
2455 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2456 mdd_quota_wrapper(&ma->ma_attr, qids);
2459 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2460 if (ma->ma_valid & MA_FLAGS &&
2461 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2462 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2464 if (handle == NULL) {
2465 handle = mdd_trans_create(env, mdo2mdd(obj));
2467 GOTO(out, rc = PTR_ERR(handle));
2469 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2474 rc = mdd_declare_changelog_store(env, mdd,
2479 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2484 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2490 CERROR("Error when prepare to delete Object "DFID" , "
2491 "which will cause OST objects can not be "
2492 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2498 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2500 mdd_write_unlock(env, mdd_obj);
2503 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2504 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2505 if (handle == NULL) {
2506 handle = mdd_trans_create(env, mdo2mdd(obj));
2508 GOTO(stop, rc = IS_ERR(handle));
2510 rc = mdd_declare_changelog_store(env, mdd, NULL,
2515 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2520 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2526 mdd_trans_stop(env, mdd, rc, handle);
2527 #ifdef HAVE_QUOTA_SUPPORT
2529 /* Trigger dqrel on the owner of child. If failed,
2530 * the next call for lquota_chkquota will process it */
2531 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Permission check is done when open,
 * no need check again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        /* only directories that support index operations can be paged */
        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage of at most \a nob bytes with directory entries read
 * through iterator \a it; records the start/end hashes in \a dp. */
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              struct lu_dirpage *dp, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
{
        struct lu_dirent *ent;
        struct lu_dirent *last = NULL;
        /* header first; entries are packed after it */
        memset(area, 0, sizeof (*dp));
        area += sizeof (*dp);
        nob -= sizeof (*dp);
        len = iops->key_size(env, it);
        /* IAM iterator can return record with zero len. */
        hash = iops->store(env, it);
        if (unlikely(first)) {
                /* the first entry's hash becomes the page's start hash */
                dp->ldp_hash_start = cpu_to_le64(hash);
        /* calculate max space required for lu_dirent */
        recsize = lu_dirent_calc_size(len, attr);
        if (nob >= recsize) {
                result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                if (result == -ESTALE)
                /* osd might not able to pack all attributes,
                 * so recheck rec length */
                recsize = le16_to_cpu(ent->lde_reclen);
                /* page full: ok unless not even one entry fitted */
                result = (last != NULL) ? 0 :-EINVAL;
        ent = (void *)ent + recsize;
        result = iops->next(env, it);
        if (result == -ESTALE)
        } while (result == 0);
        dp->ldp_hash_end = cpu_to_le64(hash);
        /* flag possible hash collision across the page boundary */
        if (last->lde_hash == dp->ldp_hash_end)
                dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
        last->lde_reclen = 0; /* end mark */
/* Read directory entries of \a obj into the pages described by \a rdpg,
 * starting at hash rdpg->rp_hash; returns the number of bytes filled. */
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_object *next = mdd_object_child(obj);
        const struct dt_it_ops *iops;
        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);
        if (rdpg->rp_count <= 0)
        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
        rc = iops->load(env, it, rdpg->rp_hash);
        /*
         * Iterator didn't find record with exactly the key requested.
         *
         * It is currently either
         *
         *     - positioned above record with key less than
         *     requested---skip it.
         *
         *     - or not positioned at all (is in IAM_IT_SKEWED
         *     state)---position it on the next item.
         */
        rc = iops->next(env, it);
        /*
         * At this point and across for-loop:
         *
         *     rc == 0 -> ok, proceed.
         *     rc > 0 -> end of directory.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                struct lu_dirpage *dp;
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                rc = mdd_dir_page_build(env, mdd, dp,
                                        min_t(int, nob, LU_PAGE_SIZE),
                                        iops, it, rdpg->rp_attrs);
                        /* end of directory reached within this page */
                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                } else if (rc < 0) {
                        CWARN("build page failed: %d!\n", rc);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                /* advance to the next LU page within the same CFS page */
                dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
                if ((unsigned long)dp & ~CFS_PAGE_MASK)
                struct lu_dirpage *dp;
                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                /*
                 * No pages were processed, mark this for first page
                 */
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                cfs_kunmap(rdpg->rp_pages[0]);
        /* clamp the byte count actually produced to what was asked for */
        rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
        iops->fini(env, it);
/* md_object readpage entry point: sanity checks, special handling for dead
 * (unlinked) directories, then delegate to __mdd_readpage(). */
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
                GOTO(out_unlock, rc);
        if (mdd_is_dead_obj(mdd_obj)) {
                struct lu_dirpage *dp;
                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));
                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);
                /* hand back a single page marked empty */
                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage*)cfs_kmap(pg);
                memset(dp, 0 , sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                GOTO(out_unlock, rc = LU_PAGE_SIZE);
        rc = __mdd_readpage(env, mdd_obj, rdpg);
        mdd_read_unlock(env, mdd_obj);
/* Sync \a obj to disk by delegating to the underlying OSD object. */
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
2797 const struct md_object_operations mdd_obj_ops = {
2798 .moo_permission = mdd_permission,
2799 .moo_attr_get = mdd_attr_get,
2800 .moo_attr_set = mdd_attr_set,
2801 .moo_xattr_get = mdd_xattr_get,
2802 .moo_xattr_set = mdd_xattr_set,
2803 .moo_xattr_list = mdd_xattr_list,
2804 .moo_xattr_del = mdd_xattr_del,
2805 .moo_object_create = mdd_object_create,
2806 .moo_ref_add = mdd_ref_add,
2807 .moo_ref_del = mdd_ref_del,
2808 .moo_open = mdd_open,
2809 .moo_close = mdd_close,
2810 .moo_readpage = mdd_readpage,
2811 .moo_readlink = mdd_readlink,
2812 .moo_changelog = mdd_changelog,
2813 .moo_capa_get = mdd_capa_get,
2814 .moo_object_sync = mdd_object_sync,
2815 .moo_path = mdd_path,
2816 .moo_file_lock = mdd_file_lock,
2817 .moo_file_unlock = mdd_file_unlock,